From: dan Date: Wed, 7 Jun 2017 15:55:59 +0000 (+0000) Subject: Add too/tserver.c - the implementation of a simple multi-threaded server X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=538ed8e376e5626289a30cf7ab35eb9fe5104906;p=thirdparty%2Fsqlite.git Add too/tserver.c - the implementation of a simple multi-threaded server designed for interactive testing of concurrency between connections used by different threads of the same process. FossilOrigin-Name: 05b4fc4340011847c8585bb822d339dd7b8351266a12b26fdc85edce38fc9dd3 --- diff --git a/manifest b/manifest index 5084cb38c3..a9ab99982a 100644 --- a/manifest +++ b/manifest @@ -1,5 +1,5 @@ -C Add\scode\sto\sthis\sbranch\sto\semit\sa\slog\smessage\safter\seach\scumulative\ssecond\nthat\sthe\sWRITER\slock\shas\sbeen\sheld. -D 2017-05-22T08:01:58.394 +C Add\stoo/tserver.c\s-\sthe\simplementation\sof\sa\ssimple\smulti-threaded\sserver\ndesigned\sfor\sinteractive\stesting\sof\sconcurrency\sbetween\sconnections\sused\sby\ndifferent\sthreads\sof\sthe\ssame\sprocess. +D 2017-06-07T15:55:59.971 F Makefile.in 1cc758ce3374a32425e4d130c2fe7b026b20de5b8843243de75f087c0a2661fb F Makefile.linux-gcc 7bc79876b875010e8c8f9502eb935ca92aa3c434 F Makefile.msc 6a8c838220f7c00820e1fc0ac1bccaaa8e5676067e1dbfa1bafa7a4ffecf8ae6 @@ -1557,6 +1557,7 @@ F tool/stack_usage.tcl f8e71b92cdb099a147dad572375595eae55eca43 F tool/symbols-mingw.sh 4dbcea7e74768305384c9fd2ed2b41bbf9f0414d F tool/symbols.sh c5a617b8c61a0926747a56c65f5671ef8ac0e148 F tool/tostr.tcl 96022f35ada2194f6f8ccf6fd95809e90ed277c4 +F tool/tserver.c 3b2dc1be6ba6d942d2988a86983a9142d2a95a105ce17ef4e32871cbb213b0ca F tool/varint.c 5d94cb5003db9dbbcbcc5df08d66f16071aee003 F tool/vdbe-compress.tcl 5926c71f9c12d2ab73ef35c29376e756eb68361c F tool/vdbe_profile.tcl 246d0da094856d72d2c12efec03250d71639d19f @@ -1584,7 +1585,7 @@ F vsixtest/vsixtest.tcl 6a9a6ab600c25a91a7acc6293828957a386a8a93 F vsixtest/vsixtest.vcxproj.data 2ed517e100c66dc455b492e1a33350c1b20fbcdc F vsixtest/vsixtest.vcxproj.filters 37e51ffedcdb064aad6ff33b6148725226cd608e F vsixtest/vsixtest_TemporaryKey.pfx e5b1b036facdb453873e7084e1cae9102ccc67a0 -P 9b7f80246f2b9921483ab23457865e783ee70b93f67bcecc0c16516447a05875 -R 9e4884a58e7a99008cd2855aaa793d84 +P a726d98122704e1e702cfa7ae61d680497df6a826d98082161e0823e115d40a5 +R d51fab1f343a3ef16d87bb149082f9d8 U dan -Z 7ef13cf9ec812325a52c181aa1146b6a +Z d72926023206a1ab0336fd84f20b8227 diff --git a/manifest.uuid b/manifest.uuid index 2f9dffabf7..735223e667 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -a726d98122704e1e702cfa7ae61d680497df6a826d98082161e0823e115d40a5 \ No newline at end of file +05b4fc4340011847c8585bb822d339dd7b8351266a12b26fdc85edce38fc9dd3 \ No newline at end of file diff --git a/tool/tserver.c b/tool/tserver.c new file mode 100644 index 0000000000..8c2e26c412 --- /dev/null +++ b/tool/tserver.c @@ -0,0 +1,431 @@ +/* +** 2017 June 7 +** +** The author disclaims copyright to this source code. In place of +** a legal notice, here is a blessing: +** +** May you do good and not evil. +** May you find forgiveness for yourself and forgive others. +** May you share freely, never taking more than you give. +** +************************************************************************* +** +** Simple multi-threaded server used for informal testing of concurrency +** between connections in different threads. Listens for tcp/ip connections +** on port 9999 of the 127.0.0.1 interface only. To build: +** +** gcc -g $(TOP)/tool/tserver.c sqlite3.o -lpthread -o tserver +** +** To run using "x.db" as the db file: +** +** ./tserver x.db +** +** To connect, open a client socket on port 9999 and start sending commands. +** Commands are either SQL - which must be terminated by a semi-colon, or +** dot-commands, which must be terminated by a newline. If an SQL statement +** is seen, it is prepared and added to an internal list. +** +** Dot-commands are: +** +** .list Display all SQL statements in the list. +** .quit Disconnect. +** .run Run all SQL statements in the list. +** .repeats N Configure the number of repeats per ".run". +** .seconds N Configure the number of seconds to ".run" for. +** +** Example input: +** +** BEGIN; +** INSERT INTO t1 VALUES(randomblob(10), randomblob(100)); +** INSERT INTO t1 VALUES(randomblob(10), randomblob(100)); +** INSERT INTO t1 VALUES(randomblob(10), randomblob(100)); +** COMMIT; +** .repeats 100000 +** .run +** +*/ +#define TSERVER_PORTNUMBER 9999 + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "sqlite3.h" + +/* Database used by this server */ +static char *zDatabaseName = 0; + +typedef struct ClientCtx ClientCtx; +struct ClientCtx { + sqlite3 *db; /* Database handle for this client */ + int fd; /* Client fd */ + int nRepeat; /* Number of times to repeat SQL */ + int nSecond; /* Number of seconds to run for */ + sqlite3_stmt **apPrepare; /* Array of prepared statements */ + int nPrepare; /* Valid size of apPrepare[] */ + int nAlloc; /* Allocated size of apPrepare[] */ +}; + +static int is_eol(int i){ + return (i=='\n' || i=='\r'); +} +static int is_whitespace(int i){ + return (i==' ' || i=='\t' || is_eol(i)); +} + +static void trim_string(const char **pzStr, int *pnStr){ + const char *zStr = *pzStr; + int nStr = *pnStr; + + while( nStr>0 && is_whitespace(zStr[0]) ){ + zStr++; + nStr--; + } + while( nStr>0 && is_whitespace(zStr[nStr-1]) ){ + nStr--; + } + + *pzStr = zStr; + *pnStr = nStr; +} + +static int send_message(ClientCtx *p, const char *zFmt, ...){ + char *zMsg; + va_list ap; /* Vararg list */ + va_start(ap, zFmt); + int res = -1; + + zMsg = sqlite3_vmprintf(zFmt, ap); + if( zMsg ){ + res = write(p->fd, zMsg, strlen(zMsg)); + } + sqlite3_free(zMsg); + va_end(ap); + + return (res<0); +} + +static int handle_some_sql(ClientCtx *p, const char *zSql, int nSql){ + const char *zTail = zSql; + int nTail = nSql; + int rc = SQLITE_OK; + + while( rc==SQLITE_OK ){ + if( p->nPrepare>=p->nAlloc ){ + int nByte = (p->nPrepare+32) * sizeof(sqlite3_stmt*); + sqlite3_stmt **apNew = sqlite3_realloc(p->apPrepare, nByte); + if( apNew ){ + p->apPrepare = apNew; + p->nAlloc = p->nPrepare+32; + }else{ + rc = SQLITE_NOMEM; + break; + } + } + rc = sqlite3_prepare_v2( + p->db, zTail, nTail, &p->apPrepare[p->nPrepare], &zTail + ); + if( rc!=SQLITE_OK ){ + send_message(p, "error - %s\n", sqlite3_errmsg(p->db)); + rc = 1; + break; + } + if( p->apPrepare[p->nPrepare]==0 ){ + break; + } + p->nPrepare++; + nTail = nSql - (zTail-zSql); + rc = send_message(p, "ok (%d SQL statements)\n", p->nPrepare); + } + + return rc; +} + +static sqlite3_int64 get_timer(void){ + struct timeval t; + gettimeofday(&t, 0); + return ((sqlite3_int64)t.tv_usec / 1000) + ((sqlite3_int64)t.tv_sec * 1000); +} + +static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){ + assert( zCmd[0]=='.' ); + int n; + int rc = 0; + const char *z = &zCmd[1]; + const char *zArg; + int nArg; + + for(n=0; n<(nCmd-1); n++){ + if( is_whitespace(z[n]) ) break; + } + + zArg = &z[n]; + nArg = nCmd-n; + trim_string(&zArg, &nArg); + + if( n>=1 && n<=4 && 0==strncmp(z, "list", n) ){ + int i; + for(i=0; rc==0 && inPrepare; i++){ + const char *zSql = sqlite3_sql(p->apPrepare[i]); + int nSql = strlen(zSql); + trim_string(&zSql, &nSql); + rc = send_message(p, "%d: %.*s\n", i, nSql, zSql); + } + } + + else if( n>=1 && n<=4 && 0==strncmp(z, "quit", n) ){ + rc = 1; + } + + else if( n>=2 && n<=7 && 0==strncmp(z, "repeats", n) ){ + if( nArg ){ + p->nRepeat = strtol(zArg, 0, 0); + } + rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat); + } + + else if( n>=2 && n<=3 && 0==strncmp(z, "run", n) ){ + int i, j; + int nBusy = 0; + sqlite3_int64 t0 = get_timer(); + sqlite3_int64 t1 = t0; + int nT1 = 0; + int nTBusy1 = 0; + + for(j=0; (p->nRepeat<=0 || jnRepeat) && rc==SQLITE_OK; j++){ + sqlite3_int64 t2; + + for(i=0; inPrepare && rc==SQLITE_OK; i++){ + sqlite3_stmt *pStmt = p->apPrepare[i]; + + /* Execute the statement */ + while( sqlite3_step(pStmt)==SQLITE_ROW ); + rc = sqlite3_reset(pStmt); + + if( (rc & 0xFF)==SQLITE_BUSY ){ + if( sqlite3_get_autocommit(p->db)==0 ){ + sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0); + } + nBusy++; + rc = SQLITE_OK; + break; + } + else if( rc!=SQLITE_OK ){ + send_message(p, "error - %s\n", sqlite3_errmsg(p->db)); + } + } + + t2 = get_timer(); + if( t2>=(t1+1000) ){ + int nMs = (t2 - t1); + int nDone = (j+1 - nBusy - nT1); + + rc = send_message( + p, "(%d done @ %d per second, %d busy)\n", + nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1 + ); + t1 = t2; + nT1 = j+1 - nBusy; + nTBusy1 = nBusy; + if( p->nSecond>=0 && (p->nSecond*1000)<=t1-t0 ) break; + } + } + + if( rc==SQLITE_OK ){ + send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j); + } + } + + else if( n>=1 && n<=7 && 0==strncmp(z, "seconds", n) ){ + if( nArg ){ + p->nSecond = strtol(zArg, 0, 0); + if( p->nSecond>0 ) p->nRepeat = 0; + } + rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat); + } + + else{ + send_message(p, + "unrecognized dot command: %.*s\n" + "should be \"list\", \"run\", \"repeats\", or \"seconds\"\n", n, z + ); + rc = 1; + } + + return rc; +} + +static void *handle_client(void *pArg){ + char zCmd[32*1024]; /* Read buffer */ + int nCmd = 0; /* Valid bytes in zCmd[] */ + int res; /* Result of read() call */ + int rc = SQLITE_OK; + + ClientCtx ctx; + memset(&ctx, 0, sizeof(ClientCtx)); + + ctx.fd = (int)(intptr_t)pArg; + ctx.nRepeat = 1; + rc = sqlite3_open(zDatabaseName, &ctx.db); + if( rc!=SQLITE_OK ){ + fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(ctx.db)); + return 0; + } + + while( rc==SQLITE_OK ){ + int i; + int iStart; + int nConsume; + int bQuote = 0; + res = read(ctx.fd, &zCmd[nCmd], sizeof(zCmd)-nCmd-1); + if( res<=0 ) break; + nCmd += res; + if( nCmd>=sizeof(zCmd)-1 ){ + fprintf(stderr, "oversized (>32KiB) message\n"); + res = 0; + break; + } + zCmd[nCmd] = '\0'; + + do { + nConsume = 0; + + /* Gobble up any whitespace */ + iStart = 0; + while( is_whitespace(zCmd[iStart]) ) iStart++; + + if( zCmd[iStart]=='.' ){ + /* This is a dot-command. Search for end-of-line. */ + for(i=iStart; i0 ){ + nCmd = nCmd-nConsume; + if( nCmd>0 ){ + memmove(zCmd, &zCmd[nConsume], nCmd); + } + } + }while( rc==SQLITE_OK && nConsume>0 ); + } + + fprintf(stdout, "Client %d disconnects\n", ctx.fd); + close(ctx.fd); + sqlite3_close(ctx.db); + return 0; +} + +int main(int argc, char *argv[]) { + sqlite3 *db; + int sfd; + int rc; + int yes = 1; + struct sockaddr_in server; + struct sockaddr_in client; + + /* Ignore SIGPIPE. Otherwise the server exits if a client disconnects + ** abruptly. */ + signal(SIGPIPE, SIG_IGN); + + if( argc!=2 ){ + fprintf(stderr, "Usage: %s DATABASE\n", argv[0]); + return 1; + } + zDatabaseName = argv[1]; + + rc = sqlite3_open(zDatabaseName, &db); + if( rc!=SQLITE_OK ){ + fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(db)); + return 1; + } + + rc = sqlite3_exec(db, "SELECT * FROM sqlite_master", 0, 0, 0); + if( rc!=SQLITE_OK ){ + fprintf(stderr, "sqlite3_exec(): %s\n", sqlite3_errmsg(db)); + return 1; + } + + sfd = socket(AF_INET, SOCK_STREAM, 0); + if( sfd<0 ){ + fprintf(stderr, "socket() failed\n"); + return 1; + } + + rc = setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + if( rc<0 ){ + perror("setsockopt"); + return 1; + } + + memset(&server, 0, sizeof(server)); + server.sin_family = AF_INET; + server.sin_addr.s_addr = inet_addr("127.0.0.1"); + server.sin_port = htons(TSERVER_PORTNUMBER); + + rc = bind(sfd, (struct sockaddr *)&server, sizeof(struct sockaddr)); + if( rc<0 ){ + fprintf(stderr, "bind() failed\n"); + return 1; + } + + rc = listen(sfd, 8); + if( rc<0 ){ + fprintf(stderr, "listen() failed\n"); + return 1; + } + + while( 1 ){ + pthread_t tid; + int cfd = accept(sfd, NULL, NULL); + if( cfd<0 ){ + perror("accept()"); + return 1; + } + + fprintf(stdout, "Client %d connects\n", cfd); + rc = pthread_create(&tid, NULL, handle_client, (void*)(intptr_t)cfd); + if( rc!=0 ){ + perror("pthread_create()"); + return 1; + } + + pthread_detach(tid); + } + + return 0; +}