From: robertc <> Date: Mon, 14 Oct 2002 14:16:58 +0000 (+0000) Subject: commloops from Adrian, C++ courtesy Rob X-Git-Tag: SQUID_3_0_PRE1~666 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c4b7a5a9a9712f84f4203155840609cf4a67103f;p=thirdparty%2Fsquid.git commloops from Adrian, C++ courtesy Rob --- diff --git a/src/Makefile.am b/src/Makefile.am index 30dc7bdb95..10f3abcf8e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,7 +1,7 @@ # # Makefile for the Squid Object Cache server # -# $Id: Makefile.am,v 1.36 2002/10/13 20:34:57 robertc Exp $ +# $Id: Makefile.am,v 1.37 2002/10/14 08:16:58 robertc Exp $ # # Uncomment and customize the following to suit your needs: # @@ -134,6 +134,7 @@ squid_SOURCES = \ clientStream.cc \ clientStream.h \ comm.cc \ + comm.h \ comm_select.cc \ comm_poll.cc \ comm_kqueue.cc \ diff --git a/src/Makefile.in b/src/Makefile.in index 27909d6ef1..e5430b232c 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -17,7 +17,7 @@ # # Makefile for the Squid Object Cache server # -# $Id: Makefile.in,v 1.251 2002/10/13 20:34:57 robertc Exp $ +# $Id: Makefile.in,v 1.252 2002/10/14 08:16:58 robertc Exp $ # # Uncomment and customize the following to suit your needs: # @@ -232,6 +232,7 @@ squid_SOURCES = \ clientStream.cc \ clientStream.h \ comm.cc \ + comm.h \ comm_select.cc \ comm_poll.cc \ comm_kqueue.cc \ diff --git a/src/asn.cc b/src/asn.cc index 594a3be7b8..d144ca7dfd 100644 --- a/src/asn.cc +++ b/src/asn.cc @@ -1,6 +1,6 @@ /* - * $Id: asn.cc,v 1.82 2002/10/13 20:34:57 robertc Exp $ + * $Id: asn.cc,v 1.83 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 53 AS Number handling * AUTHOR: Duane Wessels, Kostas Anagnostakis @@ -232,7 +232,7 @@ asHandleReply(void *data, StoreIOBuffer result) char *buf = asState->reqbuf; int leftoversz = -1; - debug(53, 3) ("asHandleReply: Called with size=%u\n", result.length); + debug(53, 3) ("asHandleReply: Called with size=%u\n", (unsigned int)result.length); debug(53, 3) ("asHandleReply: buffer='%s'\n", buf); /* First figure out whether we should abort the request */ @@ -244,7 +244,7 @@ asHandleReply(void *data, StoreIOBuffer result) asStateFree(asState); return; } else if (result.flags.error) { - debug(53, 1) ("asHandleReply: Called with Error set and size=%u\n", result.length); + debug(53, 1) ("asHandleReply: Called with Error set and size=%u\n", (unsigned int) result.length); asStateFree(asState); return; } else if (HTTP_OK != e->mem_obj->reply->sline.status) { diff --git a/src/client_side.cc b/src/client_side.cc index 7bd087746f..37c45684a7 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -1,6 +1,6 @@ /* - * $Id: client_side.cc,v 1.599 2002/10/13 20:34:59 robertc Exp $ + * $Id: client_side.cc,v 1.600 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 33 Client-side Routines * AUTHOR: Duane Wessels @@ -60,6 +60,7 @@ #include "IPInterception.h" #include "authenticate.h" #include "Store.h" +#include "comm.h" #if LINGERING_CLOSE @@ -93,6 +94,10 @@ typedef struct _clientSocketContext { struct { int deferred:1; /* This is a pipelined request waiting for the * current object to complete */ + int parsed_ok:1; /* Was this parsed correctly? */ + int mayUseConnection:1; /* This request may use the connection - + * don't read anymore requests for now + */ } flags; struct { clientStreamNode *node; @@ -111,14 +116,14 @@ static clientSocketContext *clientSocketContextNew(clientHttpRequest *); /* other */ static CWCB clientWriteComplete; static CWCB clientWriteBodyComplete; -static PF clientReadRequest; +static IOCB clientReadRequest; static PF connStateFree; static PF requestTimeout; static PF clientLifetimeTimeout; static void checkFailureRatio(err_type, hier_code); static clientSocketContext *parseHttpRequestAbort(ConnStateData * conn, const char *uri); -static clientSocketContext *parseHttpRequest(ConnStateData *, method_t *, int *, +static clientSocketContext *parseHttpRequest(ConnStateData *, method_t *, char **, size_t *); #if USE_IDENT static IDCB clientIdentDone; @@ -142,7 +147,6 @@ static MemObject *clientGetMemObject(clientHttpRequest * http); static void clientPrepareLogWithRequestDetails(request_t *, AccessLogEntry *); static void clientLogRequest(clientHttpRequest *); static void httpRequestFreeResources(clientHttpRequest *); -static void connEmptyOSReadBuffers(int fd); static int connIsUsable(ConnStateData * conn); static clientSocketContext *connGetCurrentContext(ConnStateData * conn); static void contextDeferRecipientForLater(clientSocketContext * context, clientStreamNode * node, HttpReply * rep, StoreIOBuffer recievedData); @@ -160,7 +164,7 @@ static char *skipLeadingSpace(char *aString); static char *findTrailingHTTPVersion(char *uriAndHTTPVersion); static void trimTrailingSpaces(char *aString, size_t len); static clientSocketContext *parseURIandHTTPVersion(char **url_p, http_version_t * http_ver_p, ConnStateData * conn); -static void setLogUri(clientHttpRequest * http, char *uri); +static void setLogUri(clientHttpRequest * http, char const *uri); static void prepareInternalUrl(clientHttpRequest * http, char *url); static void prepareForwardProxyUrl(clientHttpRequest * http, char *url); static void prepareAcceleratedUrl(clientHttpRequest * http, char *url, char *req_hdr); @@ -168,12 +172,12 @@ static int connGetAvailableBufferLength(ConnStateData const *conn); static void connMakeSpaceAvailable(ConnStateData * conn); static void connAddContextToQueue(ConnStateData * conn, clientSocketContext * context); static int connGetConcurrentRequestCount(ConnStateData * conn); -static int connReadWasError(ConnStateData * conn, int size); +static int connReadWasError(ConnStateData * conn, comm_err_t, int size); static int connFinishedWithConn(ConnStateData * conn, int size); static void connNoteUseOfBuffer(ConnStateData * conn, size_t byteCount); static int connKeepReadingIncompleteRequest(ConnStateData * conn); static void connCancelIncompleteRequests(ConnStateData * conn); -static ConnStateData *connStateCreate(struct sockaddr_in peer, struct sockaddr_in me, int fd); +static ConnStateData *connStateCreate(struct sockaddr_in *peer, struct sockaddr_in *me, int fd); static clientStreamNode *getClientReplyContext(clientSocketContext * context); static int connAreAllContextsForThisConnection(ConnStateData * connState); static void connFreeAllContexts(ConnStateData * connState); @@ -191,6 +195,24 @@ getClientReplyContext(clientSocketContext * context) return (clientStreamNode *)context->http->client_stream.tail->prev->data; } +/* + * This routine should be called to grow the inbuf and then + * call comm_read(). + */ +void +clientReadSomeData(ConnStateData *conn) +{ + size_t len; + + debug(33, 4) ("clientReadSomeData: FD %d: reading request...\n", conn->fd); + + connMakeSpaceAvailable(conn); + len = connGetAvailableBufferLength(conn) - 1; + + comm_read(conn->fd, conn->in.buf + conn->in.notYetUsed, len, clientReadRequest, conn); +} + + void clientSocketRemoveThisFromConnectionList(clientSocketContext * context, ConnStateData * conn) @@ -458,16 +480,6 @@ connFreeAllContexts(ConnStateData * connState) } } -void -connEmptyOSReadBuffers(int fd) -{ -#ifdef _SQUID_LINUX_ - /* prevent those nasty RST packets */ - char buf[SQUID_TCP_SO_RCVBUF]; - while (FD_READ_METHOD(fd, buf, SQUID_TCP_SO_RCVBUF) > 0); -#endif -} - /* This is a handler normally called by comm_close() */ static void connStateFree(int fd, void *data) @@ -485,7 +497,6 @@ connStateFree(int fd, void *data) memFreeBuf(connState->in.allocatedSize, connState->in.buf); pconnHistCount(0, connState->nrequests); cbdataFree(connState); - connEmptyOSReadBuffers(fd); } /* @@ -714,21 +725,12 @@ connReadNextRequest(ConnStateData * conn) */ commSetTimeout(conn->fd, Config.Timeout.persistent_request, requestTimeout, conn); - /* - * CYGWIN has a problem and is blocking on read() requests when there - * is no data present. - * This hack may hit performance a little, but it's better than - * blocking!. - */ conn->defer.until = 0; /* Kick it to read a new request */ -#ifdef _SQUID_CYGWIN_ - commSetSelect(conn->fd, COMM_SELECT_READ, clientReadRequest, conn, 0); -#else - clientReadRequest(conn->fd, conn); /* Read next request */ -#endif - /* - * Note, the FD may be closed at this point. - */ + /* Make sure we're still reading from the client side! */ + /* XXX this could take a bit of CPU time! aiee! -- adrian */ + assert(comm_has_pending_read(conn->fd)); + /* clientReadSomeData(conn); */ + /* Please don't do anything with the FD past here! */ } void @@ -839,7 +841,7 @@ parseHttpRequestAbort(ConnStateData * conn, const char *uri) http->start = current_time; http->req_sz = conn->in.notYetUsed; http->uri = xstrdup(uri); - http->log_uri = xstrndup(uri, MAX_URL); + setLogUri (http, uri); context = clientSocketContextNew(http); tempBuffer.data = context->reqbuf; tempBuffer.length = HTTP_REQBUF_SZ; @@ -958,7 +960,7 @@ clientParseHttpRequestLine(char *inbuf, size_t req_sz, ConnStateData * conn, } void -setLogUri(clientHttpRequest * http, char *uri) +setLogUri(clientHttpRequest * http, char const *uri) { if (!stringHasCntl(uri)) http->log_uri = xstrndup(uri, MAX_URL); @@ -1050,11 +1052,13 @@ prepareAcceleratedUrl(clientHttpRequest * http, char *url, char *req_hdr) * parseHttpRequest() * * Returns - * NULL on error or incomplete request - * a clientHttpRequest structure on success + * NULL on incomplete requests + * a clientSocketContext structure on success or failure. + * Sets result->flags.parsed_ok to 0 if failed to parse the request. + * Sets result->flags.parsed_ok to 1 if we have a good request. */ static clientSocketContext * -parseHttpRequest(ConnStateData * conn, method_t * method_p, int *status, +parseHttpRequest(ConnStateData * conn, method_t * method_p, char **prefix_p, size_t * req_line_sz_p) { char *inbuf = NULL; @@ -1072,11 +1076,9 @@ parseHttpRequest(ConnStateData * conn, method_t * method_p, int *status, /* pre-set these values to make aborting simpler */ *prefix_p = NULL; *method_p = METHOD_NONE; - *status = -1; if ((req_sz = headersEnd(conn->in.buf, conn->in.notYetUsed)) == 0) { debug(33, 5) ("Incomplete request, waiting for end of headers\n"); - *status = 0; return NULL; } assert(req_sz <= conn->in.notYetUsed); @@ -1101,7 +1103,6 @@ parseHttpRequest(ConnStateData * conn, method_t * method_p, int *status, header_sz = req_sz - (req_hdr - inbuf); if (0 == header_sz) { debug(33, 3) ("parseHttpRequest: header_sz == 0\n"); - *status = 0; xfree(inbuf); return NULL; } @@ -1151,7 +1152,7 @@ parseHttpRequest(ConnStateData * conn, method_t * method_p, int *status, setLogUri(http, http->uri); debug(33, 5) ("parseHttpRequest: Complete request received\n"); xfree(inbuf); - *status = 1; + result->flags.parsed_ok = 1; return result; } @@ -1202,8 +1203,12 @@ connGetConcurrentRequestCount(ConnStateData * conn) } int -connReadWasError(ConnStateData * conn, int size) +connReadWasError(ConnStateData * conn, comm_err_t flag, int size) { + if (flag != COMM_OK) { + debug(50, 2) ("connReadWasError: FD %d: got flag %d\n", conn->fd, flag); + return 1; + } if (size < 0) { if (!ignoreErrno(errno)) { debug(50, 2) ("connReadWasError: FD %d: %s\n", conn->fd, xstrerror()); @@ -1272,24 +1277,198 @@ connCancelIncompleteRequests(ConnStateData * conn) } static void -clientReadRequest(int fd, void *data) +clientMaybeReadData(ConnStateData *conn, int do_next_read) { - ConnStateData *conn = (ConnStateData *)data; - int parser_return_code = 0; + if (do_next_read) { + conn->flags.readMoreRequests = 1; + clientReadSomeData(conn); + } +} + +static void +clientAfterReadingRequests(int fd, ConnStateData *conn, int do_next_read) +{ + fde *F = &fd_table[fd]; + + /* Check if a half-closed connection was aborted in the middle */ + if (F->flags.socket_eof) { + if (conn->in.notYetUsed != conn->body.size_left) { /* != 0 when no +request body */ + /* Partial request received. Abort client connection! */ + debug(33, 3) ("clientReadRequest: FD %d aborted, partial request\n",+ fd); + comm_close(fd); + return; + } + } + + clientMaybeReadData (conn, do_next_read); +} + + +static void +clientProcessRequest(ConnStateData *conn, clientSocketContext *context, method_t method, char *prefix, size_t req_line_sz) +{ + clientHttpRequest *http = context->http; request_t *request = NULL; - int size; + /* We have an initial client stream in place should it be needed */ + /* setup our private context */ + connNoteUseOfBuffer(conn, http->req_sz); + + connAddContextToQueue(conn, context); + + if (context->flags.parsed_ok == 0) { + clientStreamNode *node = getClientReplyContext(context); + debug(33, 1) ("clientReadRequest: Invalid Request\n"); + clientSetReplyToError(node->data, + ERR_INVALID_REQ, HTTP_BAD_REQUEST, method, NULL, + &conn->peer.sin_addr, NULL, conn->in.buf, NULL); + assert(context->http->out.offset == 0); + clientPullData(context); + conn->flags.readMoreRequests = 0; + return; + } + + if ((request = urlParse(method, http->uri)) == NULL) { + clientStreamNode *node = getClientReplyContext(context); + debug(33, 5) ("Invalid URL: %s\n", http->uri); + clientSetReplyToError(node->data, + ERR_INVALID_URL, HTTP_BAD_REQUEST, method, http->uri, + &conn->peer.sin_addr, NULL, NULL, NULL); + assert(context->http->out.offset == 0); + clientPullData(context); + conn->flags.readMoreRequests = 0; + return; + } else { + + /* compile headers */ + /* we should skip request line! */ + if (!httpRequestParseHeader(request, prefix + req_line_sz)) + debug(33, 1) ("Failed to parse request headers: %s\n%s\n", + http->uri, prefix); + /* continue anyway? */ + } + + request->flags.accelerated = http->flags.accel; + if (!http->flags.internal) { + if (internalCheck(strBuf(request->urlpath))) { + if (internalHostnameIs(request->host) && + request->port == getMyPort()) { + http->flags.internal = 1; + } else if (internalStaticCheck(strBuf(request->urlpath))) { + xstrncpy(request->host, internalHostname(), + SQUIDHOSTNAMELEN); + request->port = getMyPort(); + http->flags.internal = 1; + } + } + } + + /* + * cache the Content-length value in request_t. + */ + request->content_length = httpHeaderGetInt(&request->header, + HDR_CONTENT_LENGTH); + request->flags.internal = http->flags.internal; + setLogUri (http, urlCanonicalClean(request)); + request->client_addr = conn->peer.sin_addr; + request->my_addr = conn->me.sin_addr; + request->my_port = ntohs(conn->me.sin_port); + request->http_ver = http->http_ver; + if (!urlCheckRequest(request) || + httpHeaderHas(&request->header, HDR_TRANSFER_ENCODING)) { + clientStreamNode *node = getClientReplyContext(context); + clientSetReplyToError(node->data, ERR_UNSUP_REQ, + HTTP_NOT_IMPLEMENTED, request->method, NULL, + &conn->peer.sin_addr, request, NULL, NULL); + assert(context->http->out.offset == 0); + clientPullData(context); + conn->flags.readMoreRequests = 0; + return; + } + + + if (!clientIsContentLengthValid(request)) { + clientStreamNode *node = getClientReplyContext(context); + clientSetReplyToError(node->data, ERR_INVALID_REQ, + HTTP_LENGTH_REQUIRED, request->method, NULL, + &conn->peer.sin_addr, request, NULL, NULL); + assert(context->http->out.offset == 0); + clientPullData(context); + conn->flags.readMoreRequests = 0; + return; + } + + http->request = requestLink(request); + clientSetKeepaliveFlag(http); + /* Do we expect a request-body? */ + if (request->content_length > 0) { + conn->body.size_left = request->content_length; + request->body_connection = conn; + /* Is it too large? */ + if (!clientIsRequestBodyValid(request->content_length) || + clientIsRequestBodyTooLargeForPolicy(request->content_length)) { + clientStreamNode *node = getClientReplyContext(context); + clientSetReplyToError(node->data, ERR_TOO_BIG, + HTTP_REQUEST_ENTITY_TOO_LARGE, METHOD_NONE, NULL, + &conn->peer.sin_addr, http->request, NULL, NULL); + assert(context->http->out.offset == 0); + clientPullData(context); + conn->flags.readMoreRequests = 0; + return; + } + } + + /* If this is a CONNECT, don't schedule a read - ssl.c will handle it */ + if (http->request->method == METHOD_CONNECT) + context->flags.mayUseConnection = 1; + clientAccessCheck(http); +} + +static void +connStripBufferWhitespace (ConnStateData *conn) +{ + while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) { + xmemmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1); + --conn->in.notYetUsed; + } +} + +static int +connOkToAddRequest(ConnStateData *conn) +{ + int result = connGetConcurrentRequestCount(conn) < (Config.onoff.pipeline_prefetch ? 2 : 1); + if (!result) { + debug(33, 3) ("clientReadRequest: FD %d max concurrent requests reached\n", + conn->fd); + debug(33, 5) ("clientReadRequest: FD %d defering new request until one is done\n", + conn->fd); + } + return result; +} + +static void +connSetDefer (ConnStateData *conn, size_t milliSeconds) +{ + conn->defer.until = squid_curtime + milliSeconds; + conn->defer.n++; +} + +static void +clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, +void *data) +{ + ConnStateData *conn = (ConnStateData *)data; method_t method; char *prefix = NULL; - fde *F = &fd_table[fd]; - int len; clientSocketContext *context; - debug(33, 4) ("clientReadRequest: FD %d: reading request...\n", fd); - connMakeSpaceAvailable(conn); - len = connGetAvailableBufferLength(conn) - 1; - commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0); - statCounter.syscalls.sock.reads++; - /* TODO: read should callback */ - size = FD_READ_METHOD(fd, conn->in.buf + conn->in.notYetUsed, len); + int do_next_read = 1; /* the default _is_ to read data! - adrian */ + + assert (fd == conn->fd); + + /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */ + if (flag == COMM_ERR_CLOSING) { + return; + } /* * Don't reset the timeout value here. The timeout value will be * set to Config.Timeout.request by httpAccept() and @@ -1297,31 +1476,36 @@ clientReadRequest(int fd, void *data) * whole, not individual read() calls. Plus, it breaks our * lame half-close detection */ - if (size > 0) { - fd_bytes(fd, size, FD_READ); - kb_incr(&statCounter.client_http.kbytes_in, size); - conn->in.notYetUsed += size; - conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */ - } else if (size == 0) { - if (connFinishedWithConn(conn, size)) { - comm_close(fd); - return; - } - /* It might be half-closed, we can't tell */ - debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd); - F->flags.socket_eof = 1; - conn->defer.until = squid_curtime + 1; - conn->defer.n++; - fd_note(fd, "half-closed"); - /* There is one more close check at the end, to detect aborted - * (partial) requests. At this point we can't tell if the request - * is partial. - */ - /* Continue to process previously read data */ - } else if (connReadWasError(conn, size)) { + if (connReadWasError(conn, flag, size)) { comm_close(fd); return; } + + if (flag == COMM_OK) { + if (size > 0) { + kb_incr(&statCounter.client_http.kbytes_in, size); + conn->in.notYetUsed += size; + conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string +*/ + } else if (size == 0) { + debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd); + if (connFinishedWithConn(conn, size)) { + comm_close(fd); + return; + } + /* It might be half-closed, we can't tell */ + fd_table[fd].flags.socket_eof = 1; + connSetDefer (conn, 1); + fd_note(fd, "half-closed"); + /* There is one more close check at the end, to detect aborted + * (partial) requests. At this point we can't tell if the request + * is partial. + */ + /* Continue to process previously read data */ + } + } + + /* Process request body if any */ if (conn->in.notYetUsed > 0 && conn->body.callback != NULL) clientProcessBody(conn); @@ -1331,151 +1515,50 @@ clientReadRequest(int fd, void *data) while (conn->in.notYetUsed > 0 && conn->body.size_left == 0) { size_t req_line_sz; - /* Skip leading ( or trail from previous request) whitespace */ - while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) { - xmemmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1); - --conn->in.notYetUsed; + connStripBufferWhitespace (conn); + if (conn->in.notYetUsed == 0) { + clientAfterReadingRequests(fd, conn, do_next_read); + return; } - conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */ - if (conn->in.notYetUsed == 0) - break; /* Limit the number of concurrent requests to 2 */ - if (connGetConcurrentRequestCount(conn) >= (Config.onoff.pipeline_prefetch ? 2 : 1)) { - debug(33, 3) ("clientReadRequest: FD %d max concurrent requests reached\n", - fd); - debug(33, 5) ("clientReadRequest: FD %d defering new request until one is done\n", - fd); - conn->defer.until = squid_curtime + 100; /* Reset when a request is complete */ - conn->defer.n++; + if (!connOkToAddRequest(conn)) { + /* Reset when a request is complete */ + connSetDefer (conn, 100); + clientMaybeReadData (conn, do_next_read); return; } - conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */ + /* Should not be needed anymore */ + /* Terminate the string */ + conn->in.buf[conn->in.notYetUsed] = '\0'; /* Process request */ context = parseHttpRequest(conn, - &method, &parser_return_code, &prefix, &req_line_sz); - if (!context) + &method, &prefix, &req_line_sz); + /* partial or incomplete request */ + if (!context) { safe_free(prefix); - if (context) { - clientHttpRequest *http = context->http; - /* We have an initial client stream in place should it be needed */ - /* setup our private context */ - assert (http->req_sz >= 0); - connNoteUseOfBuffer(conn, http->req_sz); + if (!connKeepReadingIncompleteRequest(conn)) + connCancelIncompleteRequests(conn); + break; /* conn->in.notYetUsed > 0 && conn->body.size_left == 0 */ + } - connAddContextToQueue(conn, context); + /* status -1 or 1 */ + if (context) { commSetTimeout(fd, Config.Timeout.lifetime, clientLifetimeTimeout, - http); - if (parser_return_code < 0) { - clientStreamNode *node = getClientReplyContext(context); - debug(33, 1) ("clientReadRequest: FD %d Invalid Request\n", fd); - clientSetReplyToError(node->data, - ERR_INVALID_REQ, HTTP_BAD_REQUEST, method, NULL, - &conn->peer.sin_addr, NULL, conn->in.buf, NULL); - assert(context->http->out.offset == 0); - clientPullData(context); - safe_free(prefix); - break; - } - if ((request = urlParse(method, http->uri)) == NULL) { - clientStreamNode *node = getClientReplyContext(context); - debug(33, 5) ("Invalid URL: %s\n", http->uri); - clientSetReplyToError(node->data, - ERR_INVALID_URL, HTTP_BAD_REQUEST, method, http->uri, - &conn->peer.sin_addr, NULL, NULL, NULL); - assert(context->http->out.offset == 0); - clientPullData(context); - safe_free(prefix); - break; - } else { - /* compile headers */ - /* we should skip request line! */ - if (!httpRequestParseHeader(request, prefix + req_line_sz)) - debug(33, 1) ("Failed to parse request headers: %s\n%s\n", - http->uri, prefix); - /* continue anyway? */ - } - request->flags.accelerated = http->flags.accel; - if (!http->flags.internal) { - if (internalCheck(strBuf(request->urlpath))) { - if (internalHostnameIs(request->host) && - request->port == getMyPort()) { - http->flags.internal = 1; - } else if (internalStaticCheck(strBuf(request->urlpath))) { - xstrncpy(request->host, internalHostname(), - SQUIDHOSTNAMELEN); - request->port = getMyPort(); - http->flags.internal = 1; - } - } - } - /* - * cache the Content-length value in request_t. - */ - request->content_length = httpHeaderGetInt(&request->header, - HDR_CONTENT_LENGTH); - request->flags.internal = http->flags.internal; + context->http); + + clientProcessRequest(conn, context, method, prefix, req_line_sz); + safe_free(prefix); - safe_free(http->log_uri); - http->log_uri = xstrdup(urlCanonicalClean(request)); - request->client_addr = conn->peer.sin_addr; - request->my_addr = conn->me.sin_addr; - request->my_port = ntohs(conn->me.sin_port); - request->http_ver = http->http_ver; - if (!urlCheckRequest(request) || - httpHeaderHas(&request->header, HDR_TRANSFER_ENCODING)) { - clientStreamNode *node = getClientReplyContext(context); - clientSetReplyToError(node->data, ERR_UNSUP_REQ, - HTTP_NOT_IMPLEMENTED, request->method, NULL, - &conn->peer.sin_addr, request, NULL, NULL); - assert(context->http->out.offset == 0); - clientPullData(context); - break; - } - if (!clientIsContentLengthValid(request)) { - clientStreamNode *node = getClientReplyContext(context); - clientSetReplyToError(node->data, ERR_INVALID_REQ, - HTTP_LENGTH_REQUIRED, request->method, NULL, - &conn->peer.sin_addr, request, NULL, NULL); - assert(context->http->out.offset == 0); - clientPullData(context); + if (!conn->flags.readMoreRequests) { + conn->flags.readMoreRequests = 1; break; } - http->request = requestLink(request); - clientSetKeepaliveFlag(http); - /* Do we expect a request-body? */ - if (request->content_length > 0) { - conn->body.size_left = request->content_length; - request->body_connection = conn; - /* Is it too large? */ - if (!clientIsRequestBodyValid(request->content_length) || - clientIsRequestBodyTooLargeForPolicy(request->content_length)) { - clientStreamNode *node = getClientReplyContext(context); - clientSetReplyToError(node->data, ERR_TOO_BIG, - HTTP_REQUEST_ENTITY_TOO_LARGE, METHOD_NONE, NULL, - &conn->peer.sin_addr, http->request, NULL, NULL); - assert(context->http->out.offset == 0); - clientPullData(context); - break; - } - } - clientAccessCheck(http); + if (context->flags.mayUseConnection) + do_next_read = 0; continue; /* while offset > 0 && body.size_left == 0 */ - } else if (parser_return_code == 0) { - if (!connKeepReadingIncompleteRequest(conn)) - connCancelIncompleteRequests(conn); - break; } } /* while offset > 0 && conn->body.size_left == 0 */ - /* Check if a half-closed connection was aborted in the middle */ - if (F->flags.socket_eof) { - if (conn->in.notYetUsed != conn->body.size_left) { /* != 0 when no request body */ - /* Partial request received. Abort client connection! */ - debug(33, 3) ("clientReadRequest: FD %d aborted, partial request\n", - fd); - comm_close(fd); - return; - } - } + clientAfterReadingRequests(fd, conn, do_next_read); } /* file_read like function, for reading body content */ @@ -1685,60 +1768,59 @@ httpAcceptDefer(int fdunused, void *dataunused) } ConnStateData * -connStateCreate(struct sockaddr_in peer, struct sockaddr_in me, int fd) +connStateCreate(struct sockaddr_in *peer, struct sockaddr_in *me, int fd) { ConnStateData *result = cbdataAlloc(ConnStateData); - result->peer = peer; - result->log_addr = peer.sin_addr; + result->peer = *peer; + result->log_addr = peer->sin_addr; result->log_addr.s_addr &= Config.Addrs.client_netmask.s_addr; - result->me = me; + result->me = *me; result->fd = fd; result->in.buf = (char *)memAllocBuf(CLIENT_REQ_BUF_SZ, &result->in.allocatedSize); + result->flags.readMoreRequests = 1; return result; } /* Handle a new connection on HTTP socket. */ void -httpAccept(int sock, void *data) +httpAccept(int sock, int newfd, struct sockaddr_in *me, struct sockaddr_in *peer, + comm_err_t flag, int xerrno, void *data) { int *N = &incoming_sockets_accepted; - int fd = -1; ConnStateData *connState = NULL; - struct sockaddr_in peer; - struct sockaddr_in me; - int max = INCOMING_HTTP_MAX; #if USE_IDENT static aclCheck_t identChecklist; #endif - commSetSelect(sock, COMM_SELECT_READ, httpAccept, NULL, 0); - while (max-- && !httpAcceptDefer(sock, NULL)) { - memset(&peer, '\0', sizeof(struct sockaddr_in)); - memset(&me, '\0', sizeof(struct sockaddr_in)); - if ((fd = comm_accept(sock, &peer, &me)) < 0) { - if (!ignoreErrno(errno)) - debug(50, 1) ("httpAccept: FD %d: accept failure: %s\n", + /* kick off another one for later */ + comm_accept(sock, httpAccept, NULL); + + /* XXX we're not considering httpAcceptDefer yet! */ + + if (flag != COMM_OK) { + errno = xerrno; + debug(50, 1) ("httpAccept: FD %d: accept failure: %s\n", sock, xstrerror()); - break; + return; } - debug(33, 4) ("httpAccept: FD %d: accepted\n", fd); - connState = connStateCreate(peer, me, fd); - comm_add_close_handler(fd, connStateFree, connState); + + debug(33, 4) ("httpAccept: FD %d: accepted\n", newfd); + connState = connStateCreate(peer, me, newfd); + comm_add_close_handler(newfd, connStateFree, connState); if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(peer.sin_addr, FQDN_LOOKUP_IF_MISS); - commSetTimeout(fd, Config.Timeout.request, requestTimeout, connState); + fqdncache_gethostbyaddr(peer->sin_addr, FQDN_LOOKUP_IF_MISS); + commSetTimeout(newfd, Config.Timeout.request, requestTimeout, connState); #if USE_IDENT - identChecklist.src_addr = peer.sin_addr; - identChecklist.my_addr = me.sin_addr; - identChecklist.my_port = ntohs(me.sin_port); + identChecklist.src_addr = peer->sin_addr; + identChecklist.my_addr = me->sin_addr; + identChecklist.my_port = ntohs(me->sin_port); if (aclCheckFast(Config.accessList.identLookup, &identChecklist)) - identStart(&me, &peer, clientIdentDone, connState); + identStart(me, peer, clientIdentDone, connState); #endif - commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, connState, 0); - commSetDefer(fd, clientReadDefer, connState); - clientdbEstablished(peer.sin_addr, 1); + clientReadSomeData(connState); + commSetDefer(newfd, clientReadDefer, connState); + clientdbEstablished(peer->sin_addr, 1); assert(N); (*N)++; - } } #if USE_SSL @@ -1781,7 +1863,7 @@ clientNegotiateSSL(int fd, void *data) debug(83, 5) ("clientNegotiateSSL: FD %d has no certificate.\n", fd); } - commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0); + clientReadSomeData(conn); } struct _https_port_data { @@ -1792,61 +1874,56 @@ CBDATA_TYPE(https_port_data); /* handle a new HTTPS connection */ static void -httpsAccept(int sock, void *data) +httpsAccept(int sock, int newfd, struct sockaddr_in *me, struct sockaddr_in *peer, + comm_err_t flag, int xerrno, void *data) { int *N = &incoming_sockets_accepted; https_port_data *https_port = (https_port_data *)data; SSL_CTX *sslContext = https_port->sslContext; - int fd = -1; ConnStateData *connState = NULL; - struct sockaddr_in peer; - struct sockaddr_in me; - int max = INCOMING_HTTP_MAX; SSL *ssl; int ssl_error; #if USE_IDENT static aclCheck_t identChecklist; #endif - commSetSelect(sock, COMM_SELECT_READ, httpsAccept, https_port, 0); - while (max-- && !httpAcceptDefer(sock, NULL)) { - memset(&peer, '\0', sizeof(struct sockaddr_in)); - memset(&me, '\0', sizeof(struct sockaddr_in)); - if ((fd = comm_accept(sock, &peer, &me)) < 0) { - if (!ignoreErrno(errno)) - debug(50, 1) ("httpsAccept: FD %d: accept failure: %s\n", - sock, xstrerror()); - break; - } - if ((ssl = SSL_new(sslContext)) == NULL) { - ssl_error = ERR_get_error(); - debug(83, 1) ("httpsAccept: Error allocating handle: %s\n", - ERR_error_string(ssl_error, NULL)); - break; - } - SSL_set_fd(ssl, fd); - fd_table[fd].ssl = ssl; - fd_table[fd].read_method = &ssl_read_method; - fd_table[fd].write_method = &ssl_write_method; - debug(50, 5) ("httpsAccept: FD %d accepted, starting SSL negotiation.\n", fd); - - connState = connStateCreate(peer, me, fd); - /* XXX account connState->in.buf */ - comm_add_close_handler(fd, connStateFree, connState); - if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(peer.sin_addr, FQDN_LOOKUP_IF_MISS); - commSetTimeout(fd, Config.Timeout.request, requestTimeout, connState); + + if (flag != COMM_OK) { + errno = xerrno; + debug(50, 1) ("httpsAccept: FD %d: accept failure: %s\n", + sock, xstrerror()); + return; + } + + if ((ssl = SSL_new(sslContext)) == NULL) { + ssl_error = ERR_get_error(); + debug(83, 1) ("httpsAccept: Error allocating handle: %s\n", + ERR_error_string(ssl_error, NULL)); + return; + } + + SSL_set_fd(ssl, newfd); + fd_table[newfd].ssl = ssl; + fd_table[newfd].read_method = &ssl_read_method; + fd_table[newfd].write_method = &ssl_write_method; + debug(50, 5) ("httpsAccept: FD %d accepted, starting SSL negotiation.\n", newfd); + + connState = connStateCreate(peer, me, newfd); + /* XXX account connState->in.buf */ + comm_add_close_handler(newfd, connStateFree, connState); + if (Config.onoff.log_fqdn) + fqdncache_gethostbyaddr(peer->sin_addr, FQDN_LOOKUP_IF_MISS); + commSetTimeout(newfd, Config.Timeout.request, requestTimeout, connState); #if USE_IDENT - identChecklist.src_addr = peer.sin_addr; - identChecklist.my_addr = me.sin_addr; - identChecklist.my_port = ntohs(me.sin_port); - if (aclCheckFast(Config.accessList.identLookup, &identChecklist)) - identStart(&me, &peer, clientIdentDone, connState); + identChecklist.src_addr = peer->sin_addr; + identChecklist.my_addr = me->sin_addr; + identChecklist.my_port = ntohs(me->sin_port); + if (aclCheckFast(Config.accessList.identLookup, &identChecklist)) + identStart(me, peer, clientIdentDone, connState); #endif - commSetSelect(fd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0); - commSetDefer(fd, clientReadDefer, connState); - clientdbEstablished(peer.sin_addr, 1); - (*N)++; - } + commSetSelect(newfd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0); + commSetDefer(newfd, clientReadDefer, connState); + clientdbEstablished(peer->sin_addr, 1); + (*N)++; } #endif /* USE_SSL */ @@ -1915,7 +1992,7 @@ clientHttpConnectionsOpen(void) if (fd < 0) continue; comm_listen(fd); - commSetSelect(fd, COMM_SELECT_READ, httpAccept, NULL, 0); + comm_accept(fd, httpAccept, NULL); /* * We need to set a defer handler here so that we don't * peg the CPU with select() when we hit the FD limit. @@ -1954,7 +2031,7 @@ clientHttpsConnectionsOpen(void) sslCreateContext(s->cert, s->key, s->version, s->cipher, s->options); comm_listen(fd); - commSetSelect(fd, COMM_SELECT_READ, httpsAccept, https_port, 0); + comm_accept(fd, httpsAccept, NULL); commSetDefer(fd, httpAcceptDefer, NULL); debug(1, 1) ("Accepting HTTPS connections at %s, port %d, FD %d.\n", inet_ntoa(s->s.sin_addr), (int) ntohs(s->s.sin_port), fd); diff --git a/src/client_side_reply.cc b/src/client_side_reply.cc index 77bdc08d39..107242affa 100644 --- a/src/client_side_reply.cc +++ b/src/client_side_reply.cc @@ -1,6 +1,6 @@ /* - * $Id: client_side_reply.cc,v 1.16 2002/10/13 20:35:00 robertc Exp $ + * $Id: client_side_reply.cc,v 1.17 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 88 Client-side Reply Routines * AUTHOR: Robert Collins (Originally Duane Wessels in client_side.c) @@ -526,7 +526,7 @@ clientCacheHit(void *data, StoreIOBuffer result) StoreEntry *e = http->entry; MemObject *mem; request_t *r = http->request; - debug(88, 3) ("clientCacheHit: %s, %ud bytes\n", http->uri, result.length); + debug(88, 3) ("clientCacheHit: %s, %ud bytes\n", http->uri, (unsigned int)result.length); if (http->entry == NULL) { debug(88, 3) ("clientCacheHit: request aborted\n"); return; @@ -1566,7 +1566,7 @@ clientSendMoreData(void *data, StoreIOBuffer result) context->flags.storelogiccomplete = 1; debug(88, 5) ("clientSendMoreData: %s, %d bytes (%u new bytes)\n", - http->uri, (int) size, result.length); + http->uri, (int) size, (unsigned int)result.length); assert(size <= HTTP_REQBUF_SZ || context->flags.headersSent); assert(http->request != NULL); /* ESI TODO: remove this assert once everything is stable */ diff --git a/src/comm.cc b/src/comm.cc index 17557f0583..6095a6812c 100644 --- a/src/comm.cc +++ b/src/comm.cc @@ -1,6 +1,6 @@ /* - * $Id: comm.cc,v 1.336 2002/10/13 20:35:00 robertc Exp $ + * $Id: comm.cc,v 1.337 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 5 Socket Functions * AUTHOR: Harvest Derived @@ -34,6 +34,8 @@ */ #include "squid.h" +#include "StoreIOBuffer.h" +#include "comm.h" #if defined(_SQUID_CYGWIN_) #include @@ -74,8 +76,480 @@ static int commResetFD(ConnectStateData * cs); static int commRetryConnect(ConnectStateData * cs); CBDATA_TYPE(ConnectStateData); + +struct _fdc_t { + int active; + dlink_list CommCallbackList; + struct { + char *buf; + int size; + IOCB *handler; + void *handler_data; + } read; + struct { + struct sockaddr_in me; + struct sockaddr_in pn; + IOACB *handler; + void *handler_data; + } accept; + struct CommFiller { + StoreIOBuffer requestedData; + size_t amountDone; + IOFCB *handler; + void *handler_data; + } fill; + +}; +typedef struct _fdc_t fdc_t; + +typedef enum { + COMM_CB_READ = 1, + COMM_CB_WRITE, + COMM_CB_ACCEPT, + COMM_CB_FILL +} comm_callback_t; + +struct _CommCallbackData { + comm_callback_t type; + dlink_node fd_node; + dlink_node h_node; + int fd; + int newfd; /* for accept() */ + char *buf; + int retval; + union { + IOCB *r_callback; + IOACB *a_callback; + IOFCB *f_callback; + } c; + void *callback_data; + comm_err_t errcode; + int xerrno; + int seqnum; + struct sockaddr_in me; + struct sockaddr_in pn; + StoreIOBuffer sb; +}; +typedef struct _CommCallbackData CommCallbackData; + +struct _fd_debug_t { + char *close_file; + int close_line; +}; +typedef struct _fd_debug_t fd_debug_t; + static MemPool *comm_write_pool = NULL; static MemPool *conn_close_pool = NULL; +static MemPool *comm_callback_pool = NULL; +fdc_t *fdc_table = NULL; +fd_debug_t *fdd_table = NULL; +dlink_list CommCallbackList; +static int CommCallbackSeqnum = 1; + + +/* New and improved stuff */ + +/* + * return whether there are entries in the callback queue + */ +int +comm_existsiocallback(void) +{ + return CommCallbackList.head == NULL; +} + +/* + * add an IO callback + * + * IO callbacks are added when we want to notify someone that some IO + * has finished but we don't want to risk re-entering a non-reentrant + * code block. + */ +static void +comm_addreadcallback(int fd, IOCB *callback, char *buf, size_t retval, comm_err_t errcode, + int xerrno, void *callback_data) +{ + CommCallbackData *cio; + + assert(fdc_table[fd].active == 1); + + /* Allocate a new struct */ + cio = (CommCallbackData *)memPoolAlloc(comm_callback_pool); + + /* Throw our data into it */ + cio->fd = fd; + cio->retval = retval; + cio->xerrno = xerrno; + cio->errcode = errcode; + cio->c.r_callback = callback; + cio->callback_data = callback_data; + cio->seqnum = CommCallbackSeqnum; + cio->buf = buf; + cio->type = COMM_CB_READ; + + /* Add it to the end of the list */ + dlinkAddTail(cio, &(cio->h_node), &CommCallbackList); + + /* and add it to the end of the fd list */ + dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList)); + +} + + +static void +comm_addacceptcallback(int fd, int newfd, IOACB *callback, struct sockaddr_in *pn, + struct sockaddr_in *me, comm_err_t errcode, int xerrno, void *callback_data) +{ + CommCallbackData *cio; + + assert(fdc_table[fd].active == 1); + + /* Allocate a new struct */ + cio = (CommCallbackData *)memPoolAlloc(comm_callback_pool); + + /* Throw our data into it */ + cio->fd = fd; + cio->xerrno = xerrno; + cio->errcode = errcode; + cio->c.a_callback = callback; + cio->callback_data = callback_data; + cio->seqnum = CommCallbackSeqnum; + cio->type = COMM_CB_ACCEPT; + cio->newfd = newfd; + cio->pn = *pn; + cio->me = *me; + + /* Add it to the end of the list */ + dlinkAddTail(cio, &(cio->h_node), &CommCallbackList); + + /* and add it to the end of the fd list */ + dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList)); + +} + +static void +comm_add_fill_callback(int fd, size_t retval, comm_err_t errcode, int xerrno) +{ + CommCallbackData *cio; + + assert(fdc_table[fd].active == 1); + + /* Allocate a new struct */ + cio = (CommCallbackData *)memPoolAlloc(comm_callback_pool); + + /* Throw our data into it */ + cio->fd = fd; + cio->xerrno = xerrno; + cio->errcode = errcode; + cio->c.f_callback = fdc_table[fd].fill.handler; + cio->callback_data = fdc_table[fd].fill.handler_data; + cio->seqnum = CommCallbackSeqnum; + cio->type = COMM_CB_FILL; + /* retval not used */ + cio->retval = -1; + cio->sb = fdc_table[fd].fill.requestedData; + cio->sb.length = retval; + /* Clear out fd state */ + fdc_table[fd].fill.handler = NULL; + fdc_table[fd].fill.handler_data = NULL; + + /* Add it to the end of the list */ + dlinkAddTail(cio, &(cio->h_node), &CommCallbackList); + + /* and add it to the end of the fd list */ + dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList)); +} + + + + +static void +comm_call_io_callback(CommCallbackData *cio) +{ + switch(cio->type) { + case COMM_CB_READ: + cio->c.r_callback(cio->fd, cio->buf, cio->retval, cio->errcode, cio->xerrno, + cio->callback_data); + break; + case COMM_CB_WRITE: + fatal("write comm hasn't been implemented yet!"); + break; + case COMM_CB_ACCEPT: + cio->c.a_callback(cio->fd, cio->newfd, &cio->me, &cio->pn, cio->errcode, + cio->xerrno, cio->callback_data); + break; + case COMM_CB_FILL: + cio->c.f_callback(cio->fd, cio->sb, cio->errcode, + cio->xerrno, cio->callback_data); + break; + default: + fatal("unknown comm io callback type!"); + break; + }; +} + + +/* + * call the IO callbacks + * + * This should be called before comm_select() so code can attempt to + * initiate some IO. + * + * When io callbacks are added, they are added with the current + * sequence number. The sequence number is incremented in this routine - + * since callbacks are added to the _tail_ of the list, when we hit a + * callback with a seqnum _not_ what it was when we entered this routine, + * we can stop. + */ +void +comm_calliocallback(void) +{ + CommCallbackData *cio; + dlink_node *node; + int oldseqnum = CommCallbackSeqnum; + + /* Call our callbacks until we hit NULL or the seqnum changes */ + while (CommCallbackList.head != NULL) { + node = (dlink_node *)CommCallbackList.head; + cio = (CommCallbackData *)node->data; + + /* If seqnum isn't the same, its time to die */ + if (cio->seqnum != oldseqnum) + break; /* we've hit newly-added events */ + + assert(fdc_table[cio->fd].active == 1); + + dlinkDelete(&cio->h_node, &CommCallbackList); + dlinkDelete(&cio->fd_node, &(fdc_table[cio->fd].CommCallbackList)); + comm_call_io_callback(cio); + memPoolFree(comm_callback_pool, cio); + } +} + + +/* + * Queue a callback + */ +static void +comm_read_callback(int fd, int retval, comm_err_t errcode, int xerrno) +{ + fdc_t *Fc = &fdc_table[fd]; + + assert(Fc->read.handler != NULL); + + comm_addreadcallback(fd, Fc->read.handler, Fc->read.buf, retval, errcode, xerrno, + Fc->read.handler_data); + Fc->read.handler = NULL; + Fc->read.handler_data = NULL; +} + +/* + * Attempt a read + * + * If the read attempt succeeds or fails, call the callback. + * Else, wait for another IO notification. + */ +static void +comm_read_try(int fd, void *data) +{ + fdc_t *Fc = &fdc_table[fd]; + int retval; + + /* make sure we actually have a callback */ + assert(Fc->read.handler != NULL); + + /* Attempt a read */ + statCounter.syscalls.sock.reads++; + retval = FD_READ_METHOD(fd, Fc->read.buf, Fc->read.size); + if (retval < 0 && !ignoreErrno(errno)) { + comm_read_callback(fd, -1, COMM_ERROR, errno); + return; + }; + + /* See if we read anything */ + /* Note - read 0 == socket EOF, which is a valid read */ + if (retval >= 0) { + fd_bytes(fd, retval, FD_READ); + comm_read_callback(fd, retval, COMM_OK, 0); + return; + } + + /* Nope, register for some more IO */ + commSetSelect(fd, COMM_SELECT_READ, comm_read_try, NULL, 0); +} + +/* + * Queue a read. handler/handler_data are called when the read + * completes, on error, or on file descriptor close. + */ +void +comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data) +{ + /* Make sure we're not reading anything and we're not closing */ + assert(fdc_table[fd].active == 1); + assert(fdc_table[fd].read.handler == NULL); + assert(!fd_table[fd].flags.closing); + + /* Queue a read */ + fdc_table[fd].read.buf = buf; + fdc_table[fd].read.size = size; + fdc_table[fd].read.handler = handler; + fdc_table[fd].read.handler_data = handler_data; + +#if OPTIMISTIC_IO + comm_read_try(fd, NULL); +#else + /* Register intrest in a FD read */ + commSetSelect(fd, COMM_SELECT_READ, comm_read_try, NULL, 0); +#endif +} + +static void +comm_fill_read(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) +{ + /* TODO use a reference to the table entry, or use C++ :] */ + StoreIOBuffer *sb; + _fdc_t::CommFiller *fill; + assert(fdc_table[fd].active == 1); + + if (flag != COMM_OK) { + /* Error! */ + /* XXX This was -1 below, but -1 can't be used for size_t parameters. + * The callback should set -1 to the client if needed based on the flags + */ + comm_add_fill_callback(fd, 0, flag, xerrno); + return; + } + /* flag is COMM_OK */ + /* We handle EOFs as read lengths of 0! Its eww, but its consistent */ + fill = &fdc_table[fd].fill; + fill->amountDone += len; + sb = &fdc_table[fd].fill.requestedData; + assert(fill->amountDone <= sb->length); + comm_add_fill_callback(fd, fill->amountDone, COMM_OK, 0); +} + +/* + * Try filling a StoreIOBuffer with some data, and call a callback when successful + */ +void +comm_fill_immediate(int fd, StoreIOBuffer sb, IOFCB *callback, void *data) +{ + assert(fdc_table[fd].fill.handler == NULL); + /* prevent confusion */ + assert (sb.offset == 0); + + /* If we don't have any data, record details and schedule a read */ + fdc_table[fd].fill.handler = callback; + fdc_table[fd].fill.handler_data = data; + fdc_table[fd].fill.requestedData = sb; + fdc_table[fd].fill.amountDone = 0; + + comm_read(fd, sb.data, sb.length, comm_fill_read, NULL); +} + + +/* + * Empty the read buffers + * + * This is a magical routine that empties the read buffers. + * Under some platforms (Linux) if a buffer has data in it before + * you call close(), the socket will hang and take quite a while + * to timeout. + */ +static void +comm_empty_os_read_buffers(int fd) +{ +#ifdef _SQUID_LINUX_ + /* prevent those nasty RST packets */ + char buf[SQUID_TCP_SO_RCVBUF]; + if (fd_table[fd].flags.nonblocking == 1) + while (FD_READ_METHOD(fd, buf, SQUID_TCP_SO_RCVBUF) > 0); +#endif +} + + +/* + * Return whether a file descriptor has any pending read request callbacks + * + * Assumptions: the fd is open (ie, its not closing) + */ +int +comm_has_pending_read_callback(int fd) +{ + dlink_node *node; + CommCallbackData *cd; + + assert(fd_table[fd].flags.open == 1); + assert(fdc_table[fd].active == 1); + + /* + * XXX I don't like having to walk the list! + * Instead, if this routine is called often enough, we should + * also maintain a linked list of _read_ events - we can just + * check if the list head a HEAD.. + * - adrian + */ + node = fdc_table[fd].CommCallbackList.head; + while (node != NULL) { + cd = (CommCallbackData *)node->data; + if (cd->type == COMM_CB_READ) + return 1; + node = node->next; + } + + /* Not found */ + return 0; +} + +/* + * return whether a file descriptor has a read handler + * + * Assumptions: the fd is open + */ +int +comm_has_pending_read(int fd) +{ + assert(fd_table[fd].flags.open == 1); + assert(fdc_table[fd].active == 1); + + return (fdc_table[fd].read.handler != NULL); +} + +/* + * Cancel a pending read. Assert that we have the right parameters, + * and that there are no pending read events! + */ +void +comm_read_cancel(int fd, IOCB *callback, void *data) +{ + assert(fd_table[fd].flags.open == 1); + assert(fdc_table[fd].active == 1); + + assert(fdc_table[fd].read.handler == callback); + assert(fdc_table[fd].read.handler_data == data); + + assert(!comm_has_pending_read_callback(fd)); + + /* Ok, we can be reasonably sure we won't lose any data here! */ + + /* Delete the callback */ + fdc_table[fd].read.handler = NULL; + fdc_table[fd].read.handler_data = NULL; +} + + +void +fdc_open(int fd, unsigned int type, char *desc) +{ + assert(fdc_table[fd].active == 0); + + fdc_table[fd].active = 1; + fd_open(fd, type, desc); +} + + +/* Older stuff */ static void CommWriteStateCallbackAndFree(int fd, comm_err_t code) @@ -207,6 +681,10 @@ comm_openex(int sock_type, /* update fdstat */ debug(5, 5) ("comm_open: FD %d is a new socket\n", new_socket); fd_open(new_socket, FD_SOCKET, note); + fdd_table[new_socket].close_file = NULL; + fdd_table[new_socket].close_line = 0; + assert(fdc_table[new_socket].active == 0); + fdc_table[new_socket].active = 1; F = &fd_table[new_socket]; F->local_addr = addr; F->tos = tos; @@ -243,26 +721,6 @@ comm_openex(int sock_type, return new_socket; } -/* - * NOTE: set the listen queue to Squid_MaxFD/4 and rely on the kernel to - * impose an upper limit. Solaris' listen(3n) page says it has - * no limit on this parameter, but sys/socket.h sets SOMAXCONN - * to 5. HP-UX currently has a limit of 20. SunOS is 5 and - * OSF 3.0 is 8. - */ -int -comm_listen(int sock) -{ - int x; - if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) { - debug(50, 0) ("comm_listen: listen(%d, %d): %s\n", - Squid_MaxFD >> 2, - sock, xstrerror()); - return x; - } - return sock; -} - void commConnectStart(int fd, const char *host, u_short port, CNCB * callback, void *data) { @@ -526,7 +984,7 @@ comm_connect_addr(int sock, const struct sockaddr_in *address) /* Wait for an incoming connection on FD. FD should be a socket returned * from comm_listen. */ int -comm_accept(int fd, struct sockaddr_in *pn, struct sockaddr_in *me) +comm_old_accept(int fd, struct sockaddr_in *pn, struct sockaddr_in *me) { int sock; struct sockaddr_in P; @@ -539,13 +997,13 @@ comm_accept(int fd, struct sockaddr_in *pn, struct sockaddr_in *me) if ((sock = accept(fd, (struct sockaddr *) &P, &Slen)) < 0) { PROF_stop(comm_accept); if (ignoreErrno(errno)) { - debug(50, 5) ("comm_accept: FD %d: %s\n", fd, xstrerror()); + debug(50, 5) ("comm_old_accept: FD %d: %s\n", fd, xstrerror()); return COMM_NOMESSAGE; } else if (ENFILE == errno || EMFILE == errno) { - debug(50, 3) ("comm_accept: FD %d: %s\n", fd, xstrerror()); + debug(50, 3) ("comm_old_accept: FD %d: %s\n", fd, xstrerror()); return COMM_ERROR; } else { - debug(50, 1) ("comm_accept: FD %d: %s\n", fd, xstrerror()); + debug(50, 1) ("comm_old_accept: FD %d: %s\n", fd, xstrerror()); return COMM_ERROR; } } @@ -559,6 +1017,9 @@ comm_accept(int fd, struct sockaddr_in *pn, struct sockaddr_in *me) commSetCloseOnExec(sock); /* fdstat update */ fd_open(sock, FD_SOCKET, "HTTP Request"); + fdd_table[sock].close_file = NULL; + fdd_table[sock].close_line = 0; + fdc_table[sock].active = 1; F = &fd_table[sock]; xstrncpy(F->ipaddr, inet_ntoa(P.sin_addr), 16); F->remote_port = htons(P.sin_port); @@ -638,21 +1099,35 @@ comm_reset_close(int fd) comm_close(fd); } + +/* + * Close the socket fd. + * + * + call write handlers with ERR_CLOSING + * + call read handlers with ERR_CLOSING + * + call closing handlers + */ void -comm_close(int fd) +_comm_close(int fd, char *file, int line) { fde *F = NULL; + dlink_node *node; + CommCallbackData *cio; debug(5, 5) ("comm_close: FD %d\n", fd); assert(fd >= 0); assert(fd < Squid_MaxFD); F = &fd_table[fd]; + fdd_table[fd].close_file = file; + fdd_table[fd].close_line = line; if (F->flags.closing) return; if (shutting_down && (!F->flags.open || F->type == FD_FILE)) return; assert(F->flags.open); + /* The following fails because ipc.c is doing calls to pipe() to create sockets! */ + /* assert(fdc_table[fd].active == 1); */ assert(F->type != FD_FILE); PROF_start(comm_close); F->flags.closing = 1; @@ -662,6 +1137,19 @@ comm_close(int fd) #endif commSetTimeout(fd, -1, NULL, NULL); CommWriteStateCallbackAndFree(fd, COMM_ERR_CLOSING); + + /* Delete any pending io callbacks */ + while (fdc_table[fd].CommCallbackList.head != NULL) { + node = fdc_table[fd].CommCallbackList.head; + cio = (CommCallbackData *)node->data; + assert(fd == cio->fd); /* just paranoid */ + dlinkDelete(&cio->h_node, &CommCallbackList); + dlinkDelete(&cio->fd_node, &(fdc_table[cio->fd].CommCallbackList)); + + comm_call_io_callback(cio); + memPoolFree(comm_callback_pool, cio); + } + commCallCloseHandlers(fd); if (F->uses) /* assume persistent connect count */ pconnHistCount(1, F->uses); @@ -671,8 +1159,11 @@ comm_close(int fd) F->ssl = NULL; } #endif + comm_empty_os_read_buffers(fd); fd_close(fd); /* update fdstat */ close(fd); + fdc_table[fd].active = 0; + bzero(&fdc_table[fd], sizeof(fdc_t)); statCounter.syscalls.sock.closes++; PROF_stop(comm_close); } @@ -784,7 +1275,7 @@ commSetNonBlocking(int fd) int nonblocking = TRUE; if (fd_table[fd].type != FD_PIPE) { if (ioctl(fd, FIONBIO, &nonblocking) < 0) { - debug(50, 0) ("commSetNonBlocking: FD %d: %s %u\n", fd, xstrerror(), fd_table[fd].type); + debug(50, 0) ("commSetNonBlocking: FD %d: %s %D\n", fd, xstrerror(), fd_table[fd].type); return COMM_ERROR; } } else { @@ -852,13 +1343,17 @@ commSetTcpNoDelay(int fd) void comm_init(void) { - fd_table = (fde *)xcalloc(Squid_MaxFD, sizeof(fde)); + fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde)); + fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t)); + fdc_table = (fdc_t *)xcalloc(Squid_MaxFD, sizeof(fdc_t)); /* XXX account fd_table */ /* Keep a few file descriptors free so that we don't run out of FD's * after accepting a client but before it opens a socket or a file. * Since Squid_MaxFD can be as high as several thousand, don't waste them */ RESERVED_FD = XMIN(100, Squid_MaxFD / 4); CBDATA_INIT_TYPE(ConnectStateData); + + comm_callback_pool = memPoolCreate("comm callbacks", sizeof(CommCallbackData)); comm_write_pool = memPoolCreate("CommWriteStateData", sizeof(CommWriteStateData)); conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler)); } @@ -935,6 +1430,9 @@ void comm_write(int fd, const char *buf, int size, CWCB * handler, void *handler_data, FREE * free_func) { CommWriteStateData *state = fd_table[fd].rwstate; + + assert(!fd_table[fd].flags.closing); + debug(5, 5) ("comm_write: FD %d: sz %d: hndl %p: data %p.\n", fd, size, handler, handler_data); if (NULL != state) { @@ -959,6 +1457,7 @@ comm_write_mbuf(int fd, MemBuf mb, CWCB * handler, void *handler_data) comm_write(fd, mb.buf, mb.size, handler, handler_data, memBufFreeFunc(&mb)); } + /* * hm, this might be too general-purpose for all the places we'd * like to use it. @@ -1048,3 +1547,96 @@ commDeferRead(int fd) return 0; return F->defer_check(fd, F->defer_data); } + + +/* + * New-style listen and accept routines + * + * Listen simply registers our interest in an FD for listening, + * and accept takes a callback to call when an FD has been + * accept()ed. + */ +int +comm_listen(int sock) +{ + int x; + if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) { + debug(50, 0) ("comm_listen: listen(%d, %d): %s\n", + Squid_MaxFD >> 2, + sock, xstrerror()); + return x; + } + return sock; +} + + +/* + * This callback is called whenever a filedescriptor is ready + * to dupe itself and fob off an accept()ed connection + */ +static void +comm_accept_try(int fd, void *data) +{ + int newfd; + fdc_t *Fc; + + assert(fdc_table[fd].active == 1); + + Fc = &(fdc_table[fd]); + + /* Accept a new connection */ + newfd = comm_old_accept(fd, &Fc->accept.pn, &Fc->accept.me); + + if (newfd < 0) { + /* Issues - check them */ + if (newfd == COMM_NOMESSAGE) { + /* register interest again */ + commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); + return; + } + /* Problem! */ + comm_addacceptcallback(fd, -1, Fc->accept.handler, &Fc->accept.pn, &Fc->accept.me, COMM_ERROR, errno, Fc->accept.handler_data); + Fc->accept.handler = NULL; + Fc->accept.handler_data = NULL; + return; + } + + /* setup our new filedescriptor in fd_table */ + /* and set it up in fdc_table */ + + /* queue a completed callback with the new FD */ + comm_addacceptcallback(fd, newfd, Fc->accept.handler, &Fc->accept.pn, &Fc->accept.me, COMM_OK, 0, Fc->accept.handler_data); + Fc->accept.handler = NULL; + Fc->accept.handler_data = NULL; + +} + + +/* + * Notes: + * + the current interface will queue _one_ accept per io loop. + * this isn't very optimal and should be revisited at a later date. + */ +void +comm_accept(int fd, IOACB *handler, void *handler_data) +{ + fdc_t *Fc; + + assert(fd_table[fd].flags.open == 1); + assert(fdc_table[fd].active == 1); + + /* make sure we're not pending! */ + assert(fdc_table[fd].accept.handler == NULL); + + /* Record our details */ + Fc = &fdc_table[fd]; + Fc->accept.handler = handler; + Fc->accept.handler_data = handler_data; + + /* Kick off the accept */ +#if OPTIMISTIC_IO + comm_accept_try(fd, NULL); +#else + commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0); +#endif +} diff --git a/src/comm.h b/src/comm.h new file mode 100644 index 0000000000..92061c1649 --- /dev/null +++ b/src/comm.h @@ -0,0 +1,15 @@ +#ifndef __COMM_H__ +#define __COMM_H__ + +#include "StoreIOBuffer.h" + +typedef void IOFCB(int fd, StoreIOBuffer recievedData, comm_err_t flag, int xerrno, void *data); +/* fill sb with up to length data from fd */ +extern void comm_fill_immediate(int fd, StoreIOBuffer sb, IOFCB *callback, void *data); + +extern int comm_has_pending_read_callback(int fd); +extern int comm_has_pending_read(int fd); +extern void comm_read_cancel(int fd, IOCB *callback, void *data); +extern void fdc_open(int fd, unsigned int type, char *desc); + +#endif diff --git a/src/ftp.cc b/src/ftp.cc index 6e801cf6d1..095e22e99b 100644 --- a/src/ftp.cc +++ b/src/ftp.cc @@ -1,6 +1,6 @@ /* - * $Id: ftp.cc,v 1.330 2002/10/14 07:35:45 hno Exp $ + * $Id: ftp.cc,v 1.331 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 9 File Transfer Protocol (FTP) * AUTHOR: Harvest Derived @@ -147,12 +147,12 @@ typedef void (FTPSM) (FtpStateData *); /* Local functions */ static CNCB ftpPasvCallback; -static PF ftpDataRead; +static IOCB ftpDataRead; static PF ftpDataWrite; static CWCB ftpDataWriteCallback; static PF ftpStateFree; static PF ftpTimeout; -static PF ftpReadControlReply; +static IOCB ftpReadControlReply; static CWCB ftpWriteCommandCallback; static void ftpLoginParser(const char *, FtpStateData *, int escaped); static wordlist *ftpParseControlReply(char *, size_t, int *, int *); @@ -873,10 +873,9 @@ ftpDataComplete(FtpStateData * ftpState) } static void -ftpDataRead(int fd, void *data) +ftpDataRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { FtpStateData *ftpState = (FtpStateData *)data; - int len; int j; int bin; StoreEntry *entry = ftpState->entry; @@ -886,20 +885,18 @@ ftpDataRead(int fd, void *data) delay_id delayId = delayMostBytesAllowed(mem); #endif assert(fd == ftpState->data.fd); + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us + */ + if (flag == COMM_ERR_CLOSING) { + return; + } + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(ftpState->ctrl.fd); return; } - errno = 0; - read_sz = ftpState->data.size - ftpState->data.offset; -#if DELAY_POOLS - read_sz = delayBytesWanted(delayId, 1, read_sz); -#endif - memset(ftpState->data.buf + ftpState->data.offset, '\0', read_sz); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, ftpState->data.buf + ftpState->data.offset, read_sz); - if (len > 0) { - fd_bytes(fd, len, FD_READ); + + if (flag == COMM_OK && len > 0) { #if DELAY_POOLS delayBytesIn(delayId, len); #endif @@ -907,8 +904,8 @@ ftpDataRead(int fd, void *data) kb_incr(&statCounter.server.ftp.kbytes_in, len); ftpState->data.offset += len; } - debug(9, 5) ("ftpDataRead: FD %d, Read %d bytes\n", fd, len); - if (len > 0) { + debug(9, 5) ("ftpDataRead: FD %d, Read %d bytes\n", fd, (unsigned int)len); + if (flag == COMM_OK && len > 0) { IOStats.Ftp.reads++; for (j = len - 1, bin = 0; j; bin++) j >>= 1; @@ -917,14 +914,15 @@ ftpDataRead(int fd, void *data) if (ftpState->flags.isdir && !ftpState->flags.html_header_sent && len >= 0) { ftpListingStart(ftpState); } - if (len < 0) { + if (flag != COMM_OK || len < 0) { debug(50, ignoreErrno(errno) ? 3 : 1) ("ftpDataRead: read error: %s\n", xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, - COMM_SELECT_READ, - ftpDataRead, - ftpState, - Config.Timeout.read); + /* XXX what about Config.Timeout.read? */ + read_sz = ftpState->data.size - ftpState->data.offset; +#if DELAY_POOLS + read_sz = delayBytesWanted(delay_id, 1, read_sz); +#endif + comm_read(fd, ftpState->data.buf + ftpState->data.offset, read_sz, ftpDataRead, data); } else { ftpFailed(ftpState, ERR_READ_ERROR); /* ftpFailed closes ctrl.fd and frees ftpState */ @@ -939,12 +937,12 @@ ftpDataRead(int fd, void *data) storeAppend(entry, ftpState->data.buf, len); ftpState->data.offset = 0; } - commSetSelect(fd, - COMM_SELECT_READ, - ftpDataRead, - ftpState, - Config.Timeout.read); - } + /* XXX what about Config.Timeout.read? */ + read_sz = ftpState->data.size - ftpState->data.offset; +#if DELAY_POOLS + read_sz = delayBytesWanted(delay_id, 1, read_sz); +#endif + comm_read(fd, ftpState->data.buf + ftpState->data.offset, read_sz, ftpDataRead, data);} } /* @@ -1222,11 +1220,8 @@ ftpScheduleReadControlReply(FtpStateData * ftpState, int buffered_ok) /* We've already read some reply data */ ftpHandleControlReply(ftpState); } else { - commSetSelect(ftpState->ctrl.fd, - COMM_SELECT_READ, - ftpReadControlReply, - ftpState, - Config.Timeout.read); + /* XXX What about Config.Timeout.read? */ + comm_read(ftpState->ctrl.fd, ftpState->ctrl.buf + ftpState->ctrl.offset, ftpState->ctrl.size - ftpState->ctrl.offset, ftpReadControlReply, ftpState); /* * Cancel the timeout on the Data socket (if any) and * establish one on the control socket. @@ -1239,28 +1234,30 @@ ftpScheduleReadControlReply(FtpStateData * ftpState, int buffered_ok) } static void -ftpReadControlReply(int fd, void *data) +ftpReadControlReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { FtpStateData *ftpState = (FtpStateData *)data; StoreEntry *entry = ftpState->entry; - size_t len; debug(9, 5) ("ftpReadControlReply\n"); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us +*/ + if (flag == COMM_ERR_CLOSING) { + return; + } + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(ftpState->ctrl.fd); return; } assert(ftpState->ctrl.offset < (off_t)ftpState->ctrl.size); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, - ftpState->ctrl.buf + ftpState->ctrl.offset, - ftpState->ctrl.size - ftpState->ctrl.offset); - if (len > 0) { + if (flag == COMM_OK && len > 0) { fd_bytes(fd, len, FD_READ); kb_incr(&statCounter.server.all.kbytes_in, len); kb_incr(&statCounter.server.ftp.kbytes_in, len); } - debug(9, 5) ("ftpReadControlReply: FD %d, Read %d bytes\n", fd, len); - if (len < 0) { + debug(9, 5) ("ftpReadControlReply: FD %d, Read %d bytes\n", fd, (int)len); + if (flag != COMM_OK || len < 0) { debug(50, ignoreErrno(errno) ? 3 : 1) ("ftpReadControlReply: read error: %s\n", xstrerror()); if (ignoreErrno(errno)) { ftpScheduleReadControlReply(ftpState, 0); @@ -1894,42 +1891,39 @@ ftpReadPort(FtpStateData * ftpState) /* "read" handler to accept data connection */ static void -ftpAcceptDataConnection(int fd, void *data) +ftpAcceptDataConnection(int fd, int newfd, struct sockaddr_in *me, struct sockaddr_in *my_peer, + comm_err_t flag, int xerrno, void *data) { FtpStateData *ftpState = (FtpStateData *)data; - struct sockaddr_in my_peer, me; debug(9, 3) ("ftpAcceptDataConnection\n"); if (EBIT_TEST(ftpState->entry->flags, ENTRY_ABORTED)) { comm_close(ftpState->ctrl.fd); return; } - fd = comm_accept(fd, &my_peer, &me); + if (Config.Ftp.sanitycheck) { - char *ipaddr = inet_ntoa(my_peer.sin_addr); + char *ipaddr = inet_ntoa(my_peer->sin_addr); if (strcmp(fd_table[ftpState->ctrl.fd].ipaddr, ipaddr) != 0) { - debug(9, 1) ("FTP data connection from unexpected server (%s:%d), expecting %s\n", ipaddr, (int) ntohs(my_peer.sin_port), fd_table[ftpState->ctrl.fd].ipaddr); - comm_close(fd); - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpAcceptDataConnection, - ftpState, - 0); + debug(9, 1) ("FTP data connection from unexpected server (%s:%d), expecting %s\n", ipaddr, (int) ntohs(my_peer->sin_port), fd_table[ftpState->ctrl.fd].ipaddr); + comm_close(newfd); + comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); return; } } - if (fd < 0) { - debug(9, 1) ("ftpHandleDataAccept: comm_accept(%d): %s", fd, xstrerror()); + if (flag != COMM_OK) { + errno = xerrno; + debug(9, 1) ("ftpHandleDataAccept: comm_accept(%d): %s", newfd, xstrerror()); /* XXX Need to set error message */ ftpFail(ftpState); return; } /* Replace the Listen socket with the accepted data socket */ comm_close(ftpState->data.fd); - debug(9, 3) ("ftpAcceptDataConnection: Connected data socket on FD %d\n", fd); - ftpState->data.fd = fd; - ftpState->data.port = ntohs(my_peer.sin_port); - ftpState->data.host = xstrdup(inet_ntoa(my_peer.sin_addr)); + debug(9, 3) ("ftpAcceptDataConnection: Connected data socket on FD %d\n", newfd); + ftpState->data.fd = newfd; + ftpState->data.port = ntohs(my_peer->sin_port); + ftpState->data.host = xstrdup(inet_ntoa(my_peer->sin_addr)); commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL); commSetTimeout(ftpState->data.fd, Config.Timeout.read, ftpTimeout, ftpState); @@ -2008,11 +2002,7 @@ ftpReadStor(FtpStateData * ftpState) } else if (code == 150) { /* Accept data channel */ debug(9, 3) ("ftpReadStor: accepting data channel\n"); - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpAcceptDataConnection, - ftpState, - 0); + comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); } else { debug(9, 3) ("ftpReadStor: Unexpected reply code %03d\n", code); ftpFail(ftpState); @@ -2098,11 +2088,9 @@ ftpReadList(FtpStateData * ftpState) if (code == 125 || (code == 150 && ftpState->data.host)) { /* Begin data transfer */ ftpAppendSuccessHeader(ftpState); - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpDataRead, - ftpState, - Config.Timeout.read); + /* XXX what about Config.Timeout.read? */ + assert(ftpState->data.offset == 0); + comm_read(ftpState->data.fd, ftpState->data.buf, ftpState->data.size, ftpDataRead, ftpState); commSetDefer(ftpState->data.fd, fwdCheckDeferRead, ftpState->entry); ftpState->state = READING_DATA; /* @@ -2114,11 +2102,7 @@ ftpReadList(FtpStateData * ftpState) return; } else if (code == 150) { /* Accept data channel */ - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpAcceptDataConnection, - ftpState, - 0); + comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); /* * Cancel the timeout on the Control socket and establish one * on the data socket @@ -2152,11 +2136,13 @@ ftpReadRetr(FtpStateData * ftpState) /* Begin data transfer */ debug(9, 3) ("ftpReadRetr: reading data channel\n"); ftpAppendSuccessHeader(ftpState); - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpDataRead, - ftpState, - Config.Timeout.read); + /* XXX what about Config.Timeout.read? */ + size_t read_sz = ftpState->data.size - ftpState->data.offset; +#if DELAY_POOLS + read_sz = delayBytesWanted(delay_id, 1, read_sz); +#endif + comm_read(ftpState->data.fd, ftpState->data.buf + ftpState->data.offset, + read_sz, ftpDataRead, ftpState); commSetDefer(ftpState->data.fd, fwdCheckDeferRead, ftpState->entry); ftpState->state = READING_DATA; /* @@ -2168,11 +2154,7 @@ ftpReadRetr(FtpStateData * ftpState) ftpState); } else if (code == 150) { /* Accept data channel */ - commSetSelect(ftpState->data.fd, - COMM_SELECT_READ, - ftpAcceptDataConnection, - ftpState, - 0); + comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState); /* * Cancel the timeout on the Control socket and establish one * on the data socket diff --git a/src/gopher.cc b/src/gopher.cc index 5228f165e4..1d01d2d037 100644 --- a/src/gopher.cc +++ b/src/gopher.cc @@ -1,6 +1,6 @@ /* - * $Id: gopher.cc,v 1.174 2002/10/14 07:35:45 hno Exp $ + * $Id: gopher.cc,v 1.175 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 10 Gopher * AUTHOR: Harvest Derived @@ -87,6 +87,7 @@ typedef struct gopher_ds { int fd; request_t *req; FwdState *fwdState; + char replybuf[BUFSIZ]; } GopherStateData; static PF gopherStateFree; @@ -98,7 +99,7 @@ static void gopher_request_parse(const request_t * req, static void gopherEndHTML(GopherStateData *); static void gopherToHTML(GopherStateData *, char *inbuf, int len); static PF gopherTimeout; -static PF gopherReadReply; +static IOCB gopherReadReply; static CWCB gopherSendComplete; static PF gopherSendRequest; @@ -615,51 +616,54 @@ gopherTimeout(int fd, void *data) /* This will be called when data is ready to be read from fd. Read until * error or connection closed. */ static void -gopherReadReply(int fd, void *data) +gopherReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { GopherStateData *gopherState = (GopherStateData *)data; StoreEntry *entry = gopherState->entry; - char *buf = NULL; - int len; int clen; int bin; - size_t read_sz; + size_t read_sz = BUFSIZ; + int do_next_read = 0; #if DELAY_POOLS delay_id delayId = delayMostBytesAllowed(entry->mem_obj); #endif + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + + assert(buf == gopherState->replybuf); if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(fd); return; } + errno = 0; - buf = (char *)memAllocate(MEM_4K_BUF); - read_sz = 4096 - 1; /* leave room for termination */ #if DELAY_POOLS read_sz = delayBytesWanted(delayId, 1, read_sz); #endif + /* leave one space for \0 in gopherToHTML */ - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, buf, read_sz); - if (len > 0) { - fd_bytes(fd, len, FD_READ); + if (flag == COMM_OK && len > 0) { #if DELAY_POOLS delayBytesIn(delayId, len); #endif kb_incr(&statCounter.server.all.kbytes_in, len); kb_incr(&statCounter.server.other.kbytes_in, len); } - debug(10, 5) ("gopherReadReply: FD %d read len=%d\n", fd, len); - if (len > 0) { + debug(10, 5) ("gopherReadReply: FD %d read len=%d\n", fd, (int)len); + if (flag == COMM_OK && len > 0) { commSetTimeout(fd, Config.Timeout.read, NULL, NULL); IOStats.Gopher.reads++; for (clen = len - 1, bin = 0; clen; bin++) clen >>= 1; IOStats.Gopher.read_hist[bin]++; } - if (len < 0) { + if (flag != COMM_OK || len < 0) { debug(50, 1) ("gopherReadReply: error reading: %s\n", xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, COMM_SELECT_READ, gopherReadReply, gopherState, 0); + do_next_read = 1; } else if (entry->mem_obj->inmem_hi == 0) { ErrorState *err; err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR); @@ -667,8 +671,10 @@ gopherReadReply(int fd, void *data) err->url = xstrdup(storeUrl(entry)); errorAppendEntry(entry, err); comm_close(fd); + do_next_read = 0; } else { comm_close(fd); + do_next_read = 0; } } else if (len == 0 && entry->mem_obj->inmem_hi == 0) { ErrorState *err; @@ -677,6 +683,7 @@ gopherReadReply(int fd, void *data) err->url = xstrdup(gopherState->request); errorAppendEntry(entry, err); comm_close(fd); + do_next_read = 0; } else if (len == 0) { /* Connection closed; retrieval done. */ /* flush the rest of data in temp buf if there is one. */ @@ -686,18 +693,17 @@ gopherReadReply(int fd, void *data) storeBufferFlush(entry); fwdComplete(gopherState->fwdState); comm_close(fd); + do_next_read = 0; } else { if (gopherState->conversion != gopher_ds::NORMAL) { gopherToHTML(gopherState, buf, len); } else { storeAppend(entry, buf, len); } - commSetSelect(fd, - COMM_SELECT_READ, - gopherReadReply, - gopherState, 0); + do_next_read = 1; } - memFree(buf, MEM_4K_BUF); + if (do_next_read) + comm_read(fd, buf, read_sz, gopherReadReply, gopherState); return; } @@ -757,7 +763,8 @@ gopherSendComplete(int fd, char *buf, size_t size, comm_err_t errflag, void *dat gopherState->conversion = gopher_ds::NORMAL; } /* Schedule read reply. */ - commSetSelect(fd, COMM_SELECT_READ, gopherReadReply, gopherState, 0); + /* XXX this read isn't being bound by delay pools! */ + comm_read(fd, gopherState->replybuf, BUFSIZ, gopherReadReply, gopherState); commSetDefer(fd, fwdCheckDeferRead, entry); if (buf) memFree(buf, MEM_4K_BUF); /* Allocated by gopherSendRequest. */ @@ -847,6 +854,6 @@ gopherStart(FwdState * fwdState) } gopherState->fd = fd; gopherState->fwdState = fwdState; - commSetSelect(fd, COMM_SELECT_WRITE, gopherSendRequest, gopherState, 0); + gopherSendRequest(fd, gopherState); commSetTimeout(fd, Config.Timeout.read, gopherTimeout, gopherState); } diff --git a/src/helper.cc b/src/helper.cc index 80a76aa6e7..b48f003f6d 100644 --- a/src/helper.cc +++ b/src/helper.cc @@ -1,6 +1,6 @@ /* - * $Id: helper.cc,v 1.48 2002/10/13 20:35:01 robertc Exp $ + * $Id: helper.cc,v 1.49 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 84 Helper process maintenance * AUTHOR: Harvest Derived? @@ -38,8 +38,8 @@ #define HELPER_MAX_ARGS 64 -static PF helperHandleRead; -static PF helperStatefulHandleRead; +static IOCB helperHandleRead; +static IOCB helperStatefulHandleRead; static PF helperServerFree; static PF helperStatefulServerFree; static void Enqueue(helper * hlp, helper_request *); @@ -697,21 +697,23 @@ helperStatefulServerFree(int fd, void *data) static void -helperHandleRead(int fd, void *data) +helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { - int len; char *t = NULL; helper_server *srv = (helper_server *)data; helper_request *r; helper *hlp = srv->parent; assert(fd == srv->rfd); assert(cbdataReferenceValid(data)); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset); - fd_bytes(fd, len, FD_READ); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + debug(84, 5) ("helperHandleRead: %d bytes from %s #%d.\n", - len, hlp->id_name, srv->index + 1); - if (len <= 0) { + (int)len, hlp->id_name, srv->index + 1); + if (flag != COMM_OK || len <= 0) { if (len < 0) debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror()); comm_close(fd); @@ -723,7 +725,7 @@ helperHandleRead(int fd, void *data) if (r == NULL) { /* someone spoke without being spoken to */ debug(84, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n", - hlp->id_name, srv->index + 1, len); + hlp->id_name, srv->index + 1, (int)len); srv->offset = 0; } else if ((t = strchr(srv->buf, '\n'))) { /* end of reply found */ @@ -752,26 +754,28 @@ helperHandleRead(int fd, void *data) } else helperKickQueue(hlp); } else { - commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0); + comm_read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset, helperHandleRead, data); } } static void -helperStatefulHandleRead(int fd, void *data) +helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { - int len; char *t = NULL; helper_stateful_server *srv = (helper_stateful_server *)data; helper_stateful_request *r; statefulhelper *hlp = srv->parent; assert(fd == srv->rfd); assert(cbdataReferenceValid(data)); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset); - fd_bytes(fd, len, FD_READ); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + debug(84, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n", - len, hlp->id_name, srv->index + 1); - if (len <= 0) { + (int)len, hlp->id_name, srv->index + 1); + if (flag != COMM_OK || len <= 0) { if (len < 0) debug(50, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror()); comm_close(fd); @@ -783,7 +787,7 @@ helperStatefulHandleRead(int fd, void *data) if (r == NULL) { /* someone spoke without being spoken to */ debug(84, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n", - hlp->id_name, srv->index + 1, len); + hlp->id_name, srv->index + 1, (int)len); srv->offset = 0; } else if ((t = strchr(srv->buf, '\n'))) { /* end of reply found */ @@ -852,7 +856,8 @@ helperStatefulHandleRead(int fd, void *data) helperStatefulKickQueue(hlp); } } else { - commSetSelect(srv->rfd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0); + comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset, + helperStatefulHandleRead, srv); } } @@ -1024,10 +1029,7 @@ helperDispatch(helper_server * srv, helper_request * r) NULL, /* Handler */ NULL, /* Handler-data */ NULL); /* free */ - commSetSelect(srv->rfd, - COMM_SELECT_READ, - helperHandleRead, - srv, 0); + comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset, helperHandleRead, srv); debug(84, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n", hlp->id_name, srv->index + 1, (int) strlen(r->buf)); srv->stats.uses++; @@ -1078,10 +1080,8 @@ helperStatefulDispatch(helper_stateful_server * srv, helper_stateful_request * r NULL, /* Handler */ NULL, /* Handler-data */ NULL); /* free */ - commSetSelect(srv->rfd, - COMM_SELECT_READ, - helperStatefulHandleRead, - srv, 0); + comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset, + helperStatefulHandleRead, srv); debug(84, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n", hlp->id_name, srv->index + 1, (int) strlen(r->buf)); srv->stats.uses++; diff --git a/src/http.cc b/src/http.cc index e97dcf0c9a..68a1b747ab 100644 --- a/src/http.cc +++ b/src/http.cc @@ -1,6 +1,6 @@ /* - * $Id: http.cc,v 1.396 2002/10/14 07:35:46 hno Exp $ + * $Id: http.cc,v 1.397 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 11 Hypertext Transfer Protocol (HTTP) * AUTHOR: Harvest Derived @@ -51,7 +51,7 @@ static const char *const crlf = "\r\n"; static CWCB httpSendComplete; static CWCB httpSendRequestEntity; -static PF httpReadReply; +static IOCB httpReadReply; static void httpSendRequest(HttpStateData *); static PF httpStateFree; static PF httpTimeout; @@ -555,16 +555,22 @@ httpPconnTransferDone(HttpStateData * httpState) * error or connection closed. */ /* XXX this function is too long! */ static void -httpReadReply(int fd, void *data) +httpReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno,void *data) +{ + HttpStateData *httpState = static_cast(data); + httpState->readReply (fd, buf, len, flag, xerrno, data); +} + +void +HttpStateData::readReply (int fd, char *buf, size_t len, comm_err_t flag, int xerrno,void *data) { HttpStateData *httpState = static_cast(data); - LOCAL_ARRAY(char, buf, SQUID_TCP_SO_RCVBUF); StoreEntry *entry = httpState->entry; const request_t *request = httpState->request; - int len; int bin; int clen; - size_t read_sz; + read_sz = SQUID_TCP_SO_RCVBUF; + do_next_read = 0; #if DELAY_POOLS delay_id delayId; @@ -574,21 +580,30 @@ httpReadReply(int fd, void *data) else delayId = delayMostBytesAllowed(entry->mem_obj); #endif + + + assert(buf == httpState->buf); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us +*/ + if (flag == COMM_ERR_CLOSING) { + return; + } + + + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { - comm_close(fd); + maybeReadData(); return; } - /* check if we want to defer reading */ + errno = 0; - read_sz = SQUID_TCP_SO_RCVBUF; + /* prepare the read size for the next read (if any) */ #if DELAY_POOLS read_sz = delayBytesWanted(delayId, 1, read_sz); #endif - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, buf, read_sz); - debug(11, 5) ("httpReadReply: FD %d: len %d.\n", fd, len); - if (len > 0) { - fd_bytes(fd, len, FD_READ); + debug(11, 5) ("httpReadReply: FD %d: len %d.\n", fd, (int)len); + if (flag == COMM_OK && len > 0) { #if DELAY_POOLS delayBytesIn(delayId, len); #endif @@ -600,40 +615,44 @@ httpReadReply(int fd, void *data) clen >>= 1; IOStats.Http.read_hist[bin]++; } - if (!httpState->reply_hdr && len > 0) { + if (!httpState->reply_hdr && flag == COMM_OK && len > 0) { /* Skip whitespace */ while (len > 0 && xisspace(*buf)) xmemmove(buf, buf + 1, len--); if (len == 0) { /* Continue to read... */ - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); + do_next_read = 1; + maybeReadData(); return; } } - if (len < 0) { + if (flag != COMM_OK || len < 0) { debug(50, 2) ("httpReadReply: FD %d: read failure: %s.\n", fd, xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); + do_next_read = 1; } else if (entry->mem_obj->inmem_hi == 0) { ErrorState *err; err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR); err->request = requestLink((request_t *) request); err->xerrno = errno; fwdFail(httpState->fwd, err); + do_next_read = 0; comm_close(fd); } else { + do_next_read = 0; comm_close(fd); } - } else if (len == 0 && entry->mem_obj->inmem_hi == 0) { + } else if (flag == COMM_OK && len == 0 && entry->mem_obj->inmem_hi == 0) { ErrorState *err; err = errorCon(ERR_ZERO_SIZE_OBJECT, HTTP_SERVICE_UNAVAILABLE); err->xerrno = errno; err->request = requestLink((request_t *) request); fwdFail(httpState->fwd, err); httpState->eof = 1; + do_next_read = 0; comm_close(fd); - } else if (len == 0) { + } else if (flag == COMM_OK && len == 0) { /* Connection closed; retrieval done. */ httpState->eof = 1; if (httpState->reply_hdr_state < 2) @@ -647,6 +666,7 @@ httpReadReply(int fd, void *data) httpState->processReplyHeader(buf, len); else { fwdComplete(httpState->fwd); + do_next_read = 0; comm_close(fd); } } else { @@ -684,7 +704,7 @@ HttpStateData::processReplyData(const char *buf, int len) /* yes we have to clear all these! */ commSetDefer(fd, NULL, NULL); commSetTimeout(fd, -1, NULL, NULL); - commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); + do_next_read = 0; #if DELAY_POOLS delayClearNoDelay(fd); #endif @@ -696,8 +716,16 @@ HttpStateData::processReplyData(const char *buf, int len) httpStateFree(fd, this); } else { /* Wait for EOF condition */ - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, this, 0); + do_next_read = 1; } + maybeReadData(); +} + +void +HttpStateData::maybeReadData() +{ + if (do_next_read) + comm_read(fd, buf, read_sz, httpReadReply, this); } /* This will be called when request write is complete. Schedule read of @@ -729,7 +757,8 @@ httpSendComplete(int fd, char *bufnotused, size_t size, comm_err_t errflag, void return; } else { /* Schedule read reply. */ - commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0); + /* XXX we're not taking into account delay pools on this read! */ + comm_read(fd, httpState->buf, SQUID_TCP_SO_RCVBUF, httpReadReply, httpState); /* * Set the read timeout here because it hasn't been set yet. * We only set the read timeout after the request has been diff --git a/src/http.h b/src/http.h index c4e9e559b5..8b11dc9354 100644 --- a/src/http.h +++ b/src/http.h @@ -1,6 +1,6 @@ /* - * $Id: http.h,v 1.1 2002/10/13 20:35:01 robertc Exp $ + * $Id: http.h,v 1.2 2002/10/14 08:16:58 robertc Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -35,11 +35,14 @@ #define SQUID_HTTP_H #include "StoreIOBuffer.h" +#include "comm.h" class HttpStateData { public: void processReplyHeader(const char *, int); void processReplyData(const char *, int); + IOCB readReply; + void maybeReadData(); StoreEntry *entry; request_t *request; @@ -52,6 +55,9 @@ class HttpStateData { int fd; http_state_flags flags; FwdState *fwd; + int do_next_read; + size_t read_sz; + char buf[SQUID_TCP_SO_RCVBUF]; }; #endif /* SQUID_HTTP_H */ diff --git a/src/ident.cc b/src/ident.cc index 9a46973f87..1ee5842345 100644 --- a/src/ident.cc +++ b/src/ident.cc @@ -1,6 +1,6 @@ /* - * $Id: ident.cc,v 1.60 2002/09/15 06:23:29 adrian Exp $ + * $Id: ident.cc,v 1.61 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 30 Ident (RFC 931) * AUTHOR: Duane Wessels @@ -52,9 +52,10 @@ typedef struct _IdentStateData { struct sockaddr_in me; struct sockaddr_in my_peer; IdentClient *clients; + char buf[4096]; } IdentStateData; -static PF identReadReply; +static IOCB identReadReply; static PF identClose; static PF identTimeout; static CNCB identConnectDone; @@ -125,23 +126,20 @@ identConnectDone(int fd, comm_err_t status, void *data) ntohs(state->my_peer.sin_port), ntohs(state->me.sin_port)); comm_write_mbuf(fd, mb, NULL, state); - commSetSelect(fd, COMM_SELECT_READ, identReadReply, state, 0); + comm_read(fd, state->buf, BUFSIZ, identReadReply, state); commSetTimeout(fd, Config.Timeout.ident, identTimeout, state); } static void -identReadReply(int fd, void *data) +identReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { IdentStateData *state = data; - LOCAL_ARRAY(char, buf, BUFSIZ); char *ident = NULL; char *t = NULL; - int len = -1; - buf[0] = '\0'; - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, buf, BUFSIZ - 1); - fd_bytes(fd, len, FD_READ); - if (len <= 0) { + + assert (buf == state->buf); + + if (if (flag != COMM_OK || len <= 0) { comm_close(fd); return; } diff --git a/src/ipc.cc b/src/ipc.cc index a5cca7c221..a82b81d9a0 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -1,6 +1,6 @@ /* - * $Id: ipc.cc,v 1.29 2002/10/13 20:35:02 robertc Exp $ + * $Id: ipc.cc,v 1.30 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 54 Interprocess Communication * AUTHOR: Duane Wessels @@ -34,6 +34,7 @@ */ #include "squid.h" +#include "comm.h" static const char *hello_string = "hi there\n"; #define HELLO_BUF_SZ 32 @@ -119,10 +120,10 @@ ipcCreate(int type, const char *prog, const char *const args[], const char *name debug(50, 0) ("ipcCreate: pipe: %s\n", xstrerror()); return -1; } - fd_open(prfd = p2c[0], FD_PIPE, "IPC FIFO Parent Read"); - fd_open(cwfd = p2c[1], FD_PIPE, "IPC FIFO Child Write"); - fd_open(crfd = c2p[0], FD_PIPE, "IPC FIFO Child Read"); - fd_open(pwfd = c2p[1], FD_PIPE, "IPC FIFO Parent Write"); + fdc_open(prfd = p2c[0], FD_PIPE, "IPC FIFO Parent Read"); + fdc_open(cwfd = p2c[1], FD_PIPE, "IPC FIFO Child Write"); + fdc_open(crfd = c2p[0], FD_PIPE, "IPC FIFO Child Read"); + fdc_open(pwfd = c2p[1], FD_PIPE, "IPC FIFO Parent Write"); #if HAVE_SOCKETPAIR && defined(AF_UNIX) } else if (type == IPC_UNIX_STREAM) { int fds[2]; @@ -135,16 +136,16 @@ ipcCreate(int type, const char *prog, const char *const args[], const char *name setsockopt(fds[0], SOL_SOCKET, SO_RCVBUF, (void *) &buflen, sizeof(buflen)); setsockopt(fds[1], SOL_SOCKET, SO_SNDBUF, (void *) &buflen, sizeof(buflen)); setsockopt(fds[1], SOL_SOCKET, SO_RCVBUF, (void *) &buflen, sizeof(buflen)); - fd_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX STREAM Parent"); - fd_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX STREAM Parent"); + fdc_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX STREAM Parent"); + fdc_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX STREAM Parent"); } else if (type == IPC_UNIX_DGRAM) { int fds[2]; if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) < 0) { debug(50, 0) ("ipcCreate: socketpair: %s\n", xstrerror()); return -1; } - fd_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX DGRAM Parent"); - fd_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX DGRAM Parent"); + fdc_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX DGRAM Parent"); + fdc_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX DGRAM Parent"); #endif } else { assert(IPC_NONE); diff --git a/src/main.cc b/src/main.cc index a2221c761f..c984f94ca1 100644 --- a/src/main.cc +++ b/src/main.cc @@ -1,6 +1,6 @@ /* - * $Id: main.cc,v 1.355 2002/10/13 20:35:02 robertc Exp $ + * $Id: main.cc,v 1.356 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 1 Startup and Main Loop * AUTHOR: Harvest Derived @@ -718,6 +718,7 @@ main(int argc, char **argv) eventRun(); if ((loop_delay = eventNextTime()) < 0) loop_delay = 0; + comm_calliocallback(); switch (comm_select(loop_delay)) { case COMM_OK: errcount = 0; /* reset if successful */ diff --git a/src/pconn.cc b/src/pconn.cc index 4e0482ac09..88f0fc3e03 100644 --- a/src/pconn.cc +++ b/src/pconn.cc @@ -1,6 +1,6 @@ /* - * $Id: pconn.cc,v 1.33 2002/10/13 20:35:02 robertc Exp $ + * $Id: pconn.cc,v 1.34 2002/10/14 08:16:58 robertc Exp $ * * DEBUG: section 48 Persistent Connections * AUTHOR: Duane Wessels @@ -35,12 +35,14 @@ #include "squid.h" #include "Store.h" +#include "comm.h" struct _pconn { hash_link hash; /* must be first */ int *fds; int nfds_alloc; int nfds; + char buf[4096]; }; typedef struct _pconn pconn; @@ -49,7 +51,7 @@ typedef struct _pconn pconn; int client_pconn_hist[PCONN_HIST_SZ]; int server_pconn_hist[PCONN_HIST_SZ]; -static PF pconnRead; +static IOCB pconnRead; static PF pconnTimeout; static const char *pconnKey(const char *host, u_short port); static hash_table *table = NULL; @@ -97,18 +99,37 @@ pconnDelete(struct _pconn *p) cbdataFree(p); } -static void -pconnRemoveFD(struct _pconn *p, int fd) +static int +pconnFindFDIndex (struct _pconn *p, int fd) { - int i; - for (i = 0; i < p->nfds; i++) { - if (p->fds[i] == fd) - break; + int result; + for (result = 0; result < p->nfds; ++result) { + if (p->fds[result] == fd) + return result; } + return p->nfds; +} + +static void +pconnRemoveFDByIndex (struct _pconn *p, int index) +{ + for (; index < p->nfds - 1; index++) + p->fds[index] = p->fds[index + 1]; +} + +static void +pconnPreventHandingOutFD(struct _pconn *p, int fd) +{ + int i = pconnFindFDIndex (p, fd); assert(i < p->nfds); debug(48, 3) ("pconnRemoveFD: found FD %d at index %d\n", fd, i); - for (; i < p->nfds - 1; i++) - p->fds[i] = p->fds[i + 1]; + pconnRemoveFDByIndex(p, i); +} + +static void +pconnRemoveFD(struct _pconn *p, int fd) +{ + pconnPreventHandingOutFD(p, fd); if (--p->nfds == 0) pconnDelete(p); } @@ -124,15 +145,16 @@ pconnTimeout(int fd, void *data) } static void -pconnRead(int fd, void *data) +pconnRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { - LOCAL_ARRAY(char, buf, 256); struct _pconn *p = (_pconn *)data; - int n; assert(table != NULL); - statCounter.syscalls.sock.reads++; - n = FD_READ_METHOD(fd, buf, 256); - debug(48, 3) ("pconnRead: %d bytes from FD %d, %s\n", n, fd, + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + + debug(48, 3) ("pconnRead: %d bytes from FD %d, %s\n", (int) len, fd, hashKeyStr(&p->hash)); pconnRemoveFD(p, fd); comm_close(fd); @@ -220,13 +242,23 @@ pconnPush(int fd, const char *host, u_short port) xfree(old); } p->fds[p->nfds++] = fd; - commSetSelect(fd, COMM_SELECT_READ, pconnRead, p, 0); + comm_read(fd, p->buf, BUFSIZ, pconnRead, p); commSetTimeout(fd, Config.Timeout.pconn, pconnTimeout, p); snprintf(desc, FD_DESC_SZ, "%s idle connection", host); fd_note(fd, desc); debug(48, 3) ("pconnPush: pushed FD %d for %s\n", fd, key); } +/* + * return a pconn fd for host:port, or -1 if none are available + * + * XXX this routine isn't terribly efficient - if there's a pending + * read event (which signifies the fd will close in the next IO loop!) + * we ignore the FD and move onto the next one. This means, as an example, + * if we have a lot of FDs open to a very popular server and we get a bunch + * of requests JUST as they timeout (say, it shuts down) we'll be wasting + * quite a bit of CPU. Just keep it in mind. + */ int pconnPop(const char *host, u_short port) { @@ -240,11 +272,18 @@ pconnPop(const char *host, u_short port) if (hptr != NULL) { p = (struct _pconn *) hptr; assert(p->nfds > 0); - fd = p->fds[0]; - pconnRemoveFD(p, fd); - commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); - commSetTimeout(fd, -1, NULL, NULL); + for (int i = 0; i < p->nfds; i++) { + fd = p->fds[0]; + /* If there are pending read callbacks - we're about to close it, so don't issue it! */ + if (!comm_has_pending_read_callback(fd)) { + pconnRemoveFD(p, fd); + comm_read_cancel(fd, pconnRead, p); + commSetTimeout(fd, -1, NULL, NULL); + return fd; + } + } } + /* Nothing (valid!) found */ return fd; } diff --git a/src/protos.h b/src/protos.h index 743f0670cd..6348c2db46 100644 --- a/src/protos.h +++ b/src/protos.h @@ -1,6 +1,6 @@ /* - * $Id: protos.h,v 1.453 2002/10/13 23:48:24 hno Exp $ + * $Id: protos.h,v 1.454 2002/10/14 08:16:58 robertc Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -152,11 +152,19 @@ SQUIDCEXTERN void *clientReplyNewContext(clientHttpRequest *); SQUIDCEXTERN int clientHttpRequestStatus(int fd, clientHttpRequest const *http); SQUIDCEXTERN void clientSetReplyToError(void *, err_type, http_status, method_t, char const *, struct in_addr *, request_t *, char *, auth_user_request_t * auth_user_request); +/* comm.c */ +extern int comm_existsiocallback(void); +extern void comm_calliocallback(void); +extern void comm_read(int fd, char *buf, int len, IOCB *handler, void *data); + +extern void comm_accept(int fd, IOACB *handler, void *handler_data); +extern int comm_listen(int fd); SQUIDCEXTERN int commSetNonBlocking(int fd); SQUIDCEXTERN int commUnsetNonBlocking(int fd); SQUIDCEXTERN void commSetCloseOnExec(int fd); -SQUIDCEXTERN int comm_accept(int fd, struct sockaddr_in *, struct sockaddr_in *); -SQUIDCEXTERN void comm_close(int fd); +extern int comm_old_accept(int fd, struct sockaddr_in *, struct sockaddr_in *); +extern void _comm_close(int fd, char *file, int line); +#define comm_close(fd) (_comm_close((fd), __FILE__, __LINE__)) SQUIDCEXTERN void comm_reset_close(int fd); #if LINGERING_CLOSE SQUIDCEXTERN void comm_lingering_close(int fd); @@ -164,7 +172,6 @@ SQUIDCEXTERN void comm_lingering_close(int fd); SQUIDCEXTERN void commConnectStart(int fd, const char *, u_short, CNCB *, void *); SQUIDCEXTERN int comm_connect_addr(int sock, const struct sockaddr_in *); SQUIDCEXTERN void comm_init(void); -SQUIDCEXTERN int comm_listen(int sock); SQUIDCEXTERN int comm_open(int, int, struct in_addr, u_short port, int, const char *note); SQUIDCEXTERN int comm_openex(int, int, struct in_addr, u_short, int, unsigned char TOS, const char *); SQUIDCEXTERN u_short comm_local_port(int fd); @@ -531,8 +538,6 @@ SQUIDCEXTERN void icmpPing(struct in_addr to); SQUIDCEXTERN void icmpSourcePing(struct in_addr to, const icp_common_t *, const char *url); SQUIDCEXTERN void icmpDomainPing(struct in_addr to, const char *domain); -SQUIDCEXTERN PF httpAccept; - #ifdef SQUID_SNMP SQUIDCEXTERN PF snmpHandleUdp; SQUIDCEXTERN void snmpInit(void); diff --git a/src/ssl.cc b/src/ssl.cc index bb5a722a1d..6b88976b8c 100644 --- a/src/ssl.cc +++ b/src/ssl.cc @@ -1,6 +1,6 @@ /* - * $Id: ssl.cc,v 1.125 2002/10/14 07:35:46 hno Exp $ + * $Id: ssl.cc,v 1.126 2002/10/14 08:16:59 robertc Exp $ * * DEBUG: section 26 Secure Sockets Layer Proxy * AUTHOR: Duane Wessels @@ -59,16 +59,15 @@ static CNCB sslConnectDone; static ERCB sslErrorComplete; static PF sslServerClosed; static PF sslClientClosed; -static PF sslReadClient; -static PF sslReadServer; +static IOCB sslReadClient; +static IOCB sslReadServer; static PF sslTimeout; -static PF sslWriteClient; -static PF sslWriteServer; +static CWCB sslWriteClientDone; +static CWCB sslWriteServerDone; static PSC sslPeerSelectComplete; static void sslStateFree(SslStateData * sslState); static void sslConnected(int fd, void *); static void sslProxyConnected(int fd, void *); -static void sslSetSelect(SslStateData * sslState); #if DELAY_POOLS static DEFER sslDeferServerRead; #endif @@ -129,81 +128,24 @@ sslDeferServerRead(int fdnotused, void *data) } #endif -static void -sslSetSelect(SslStateData * sslState) -{ - size_t read_sz = SQUID_TCP_SO_RCVBUF; - assert(sslState->server.fd > -1 || sslState->client.fd > -1); - if (sslState->client.fd > -1) { - if (sslState->server.len > 0) { - commSetSelect(sslState->client.fd, - COMM_SELECT_WRITE, - sslWriteClient, - sslState, - 0); - } - if ((size_t)sslState->client.len < read_sz) { - commSetSelect(sslState->client.fd, - COMM_SELECT_READ, - sslReadClient, - sslState, - Config.Timeout.read); - } - } else if (sslState->client.len == 0) { - comm_close(sslState->server.fd); - } - if (sslState->server.fd > -1) { - if (sslState->client.len > 0) { - commSetSelect(sslState->server.fd, - COMM_SELECT_WRITE, - sslWriteServer, - sslState, - 0); - } -#if DELAY_POOLS - /* - * If this was allowed to return 0, there would be a possibility - * of the socket becoming "hung" with data accumulating but no - * write handler (server.len==0) and no read handler (!(0<0)) and - * no data flowing in the other direction. Hence the argument of - * 1 as min. - */ - read_sz = delayBytesWanted(sslState->delayId, 1, read_sz); -#endif - if ((size_t)sslState->server.len < read_sz) { - /* Have room to read more */ - commSetSelect(sslState->server.fd, - COMM_SELECT_READ, - sslReadServer, - sslState, - Config.Timeout.read); - } - } else if (sslState->client.fd == -1) { - /* client already closed, nothing more to do */ - } else if (sslState->server.len == 0) { - comm_close(sslState->client.fd); - } -} /* Read from server side and queue it for writing to the client */ static void -sslReadServer(int fd, void *data) +sslReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { SslStateData *sslState = (SslStateData *)data; - int len; - size_t read_sz = SQUID_TCP_SO_RCVBUF - sslState->server.len; + assert(fd == sslState->server.fd); - debug(26, 3) ("sslReadServer: FD %d, reading %d bytes at offset %d\n", - fd, (int) read_sz, sslState->server.len); errno = 0; #if DELAY_POOLS read_sz = delayBytesWanted(sslState->delayId, 1, read_sz); #endif - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, sslState->server.buf + sslState->server.len, read_sz); - debug(26, 3) ("sslReadServer: FD %d, read %d bytes\n", fd, len); + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (errcode == COMM_ERR_CLOSING) { + return; + } + debug(26, 3) ("sslReadServer: FD %d, read %d bytes\n", fd, (int)len); if (len > 0) { - fd_bytes(fd, len, FD_READ); #if DELAY_POOLS delayBytesIn(sslState->delayId, len); #endif @@ -219,29 +161,29 @@ sslReadServer(int fd, void *data) comm_close(fd); } else if (len == 0) { comm_close(sslState->server.fd); - } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + /* Only close the remote end if we've finished queueing data to it */ + if (sslState->server.len == 0 && sslState->client.fd != -1) { + comm_close(sslState->client.fd); + } + } else if (cbdataReferenceValid(sslState)) + comm_write(sslState->client.fd, sslState->server.buf, len, sslWriteClientDone, sslState, NULL); cbdataInternalUnlock(sslState); /* ??? */ } /* Read from client side and queue it for writing to the server */ static void -sslReadClient(int fd, void *data) +sslReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { SslStateData *sslState = (SslStateData *)data; - int len; assert(fd == sslState->client.fd); - debug(26, 3) ("sslReadClient: FD %d, reading %d bytes at offset %d\n", - fd, SQUID_TCP_SO_RCVBUF - sslState->client.len, - sslState->client.len); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, - sslState->client.buf + sslState->client.len, - SQUID_TCP_SO_RCVBUF - sslState->client.len); - debug(26, 3) ("sslReadClient: FD %d, read %d bytes\n", fd, len); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (errcode == COMM_ERR_CLOSING) { + return; + } + + debug(26, 3) ("sslReadClient: FD %d, read %d bytes\n", fd, (int) len); if (len > 0) { - fd_bytes(fd, len, FD_READ); kb_incr(&statCounter.client_http.kbytes_in, len); sslState->client.len += len; } @@ -249,100 +191,106 @@ sslReadClient(int fd, void *data) if (len < 0) { int level = 1; #ifdef ECONNRESET - if (errno == ECONNRESET) + if (xerrno == ECONNRESET) level = 2; #endif - if (ignoreErrno(errno)) + if (ignoreErrno(xerrno)) level = 3; + /* XXX xstrerror() should be changed to take errno as an arg! */ + errno = xerrno; debug(50, level) ("sslReadClient: FD %d: read failure: %s\n", fd, xstrerror()); - if (!ignoreErrno(errno)) + if (!ignoreErrno(xerrno)) comm_close(fd); } else if (len == 0) { - comm_close(fd); - } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + comm_close(sslState->client.fd); + /* Only close the remote end if we've finished queueing data to it */ + if (sslState->client.len == 0 && sslState->server.fd != -1) { + comm_close(sslState->server.fd); + } + } else if (cbdataReferenceValid(sslState)) + comm_write(sslState->server.fd, sslState->client.buf, len, sslWriteServerDone, sslState, NULL); cbdataInternalUnlock(sslState); /* ??? */ } /* Writes data from the client buffer to the server side */ static void -sslWriteServer(int fd, void *data) +sslWriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, void *data) { SslStateData *sslState = (SslStateData *)data; - int len; assert(fd == sslState->server.fd); - debug(26, 3) ("sslWriteServer: FD %d, %d bytes to write\n", - fd, sslState->client.len); - statCounter.syscalls.sock.writes++; - len = FD_WRITE_METHOD(fd, - sslState->client.buf, - sslState->client.len); - debug(26, 3) ("sslWriteServer: FD %d, %d bytes written\n", fd, len); + debug(26, 3) ("sslWriteServer: FD %d, %d bytes written\n", fd, (int)len); + /* Valid data */ if (len > 0) { - fd_bytes(fd, len, FD_WRITE); kb_incr(&statCounter.server.all.kbytes_out, len); kb_incr(&statCounter.server.other.kbytes_out, len); - assert(len <= sslState->client.len); - sslState->client.len -= len; - if (sslState->client.len > 0) { - /* we didn't write the whole thing */ - xmemmove(sslState->client.buf, - sslState->client.buf + len, - sslState->client.len); - } + assert(len == (size_t)sslState->client.len); + sslState->client.len = 0; + } + /* EOF */ + if (len == 0) { + comm_close(sslState->server.fd); + return; + } + /* If the other end has closed, so should we */ + if (sslState->client.fd == -1) { + comm_close(sslState->server.fd); + return; } cbdataInternalLock(sslState); /* ??? should be locked by the caller... */ + /* Error? */ if (len < 0) { debug(50, ignoreErrno(errno) ? 3 : 1) ("sslWriteServer: FD %d: write failure: %s.\n", fd, xstrerror()); if (!ignoreErrno(errno)) comm_close(fd); } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + if (cbdataReferenceValid(sslState)) { + assert(sslState->client.len == 0); + comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF, + sslReadClient, sslState); + } cbdataInternalUnlock(sslState); /* ??? */ } /* Writes data from the server buffer to the client side */ static void -sslWriteClient(int fd, void *data) +sslWriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, void *data) { SslStateData *sslState = (SslStateData *)data; - int len; assert(fd == sslState->client.fd); - debug(26, 3) ("sslWriteClient: FD %d, %d bytes to write\n", - fd, sslState->server.len); - statCounter.syscalls.sock.writes++; - len = FD_WRITE_METHOD(fd, - sslState->server.buf, - sslState->server.len); - debug(26, 3) ("sslWriteClient: FD %d, %d bytes written\n", fd, len); + debug(26, 3) ("sslWriteClient: FD %d, %d bytes written\n", fd, (int)len); if (len > 0) { - fd_bytes(fd, len, FD_WRITE); kb_incr(&statCounter.client_http.kbytes_out, len); - assert(len <= sslState->server.len); - sslState->server.len -= len; + assert(len == (size_t)sslState->server.len); + sslState->server.len =0; /* increment total object size */ if (sslState->size_ptr) *sslState->size_ptr += len; - if (sslState->server.len > 0) { - /* we didn't write the whole thing */ - xmemmove(sslState->server.buf, - sslState->server.buf + len, - sslState->server.len); - } + } + /* EOF */ + if (len == 0) { + comm_close(sslState->client.fd); + return; + } + /* If the other end has closed, so should we */ + if (sslState->server.fd == -1) { + comm_close(sslState->client.fd); + return; } cbdataInternalLock(sslState); /* ??? should be locked by the caller... */ + /* Error? */ if (len < 0) { debug(50, ignoreErrno(errno) ? 3 : 1) ("sslWriteClient: FD %d: write failure: %s.\n", fd, xstrerror()); if (!ignoreErrno(errno)) comm_close(fd); } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + if (cbdataReferenceValid(sslState)) { + assert(sslState->server.len == 0); + comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF, + sslReadServer, sslState); + } cbdataInternalUnlock(sslState); /* ??? */ } @@ -357,15 +305,52 @@ sslTimeout(int fd, void *data) comm_close(sslState->server.fd); } +static void +sslConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, void *data) +{ + SslStateData *sslState = (SslStateData *)data; + if (flag != COMM_OK) { + sslErrorComplete(fd, data, 0); + return; + } + if (cbdataReferenceValid(sslState)) { + assert(sslState->server.len == 0); + comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF, + sslReadServer, sslState); + comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF, + sslReadClient, sslState); + } +} + + +/* + * handle the write completion from a proxy request to an upstream proxy + */ +static void +sslProxyConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, void *data) +{ + SslStateData *sslState = (SslStateData *)data; + if (flag != COMM_OK) { + sslErrorComplete(fd, data, 0); + return; + } + if (cbdataReferenceValid(sslState)) { + assert(sslState->server.len == 0); + comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF, + sslReadServer, sslState); + comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF, + sslReadClient, sslState); + } +} + static void sslConnected(int fd, void *data) { SslStateData *sslState = (SslStateData *)data; debug(26, 3) ("sslConnected: FD %d sslState=%p\n", fd, sslState); *sslState->status_ptr = HTTP_OK; - xstrncpy(sslState->server.buf, conn_established, SQUID_TCP_SO_RCVBUF); - sslState->server.len = strlen(conn_established); - sslSetSelect(sslState); + comm_write(sslState->client.fd, conn_established, strlen(conn_established), + sslConnectedWriteDone, sslState, NULL); } static void @@ -417,8 +402,9 @@ sslConnectDone(int fdnotused, comm_err_t status, void *data) } else { if (sslState->servers->_peer) sslProxyConnected(sslState->server.fd, sslState); - else + else { sslConnected(sslState->server.fd, sslState); + } commSetTimeout(sslState->server.fd, Config.Timeout.read, sslTimeout, @@ -549,15 +535,13 @@ sslProxyConnected(int fd, void *data) httpHeaderClean(&hdr_out); packerClean(&p); memBufAppend(&mb, "\r\n", 2); - xstrncpy(sslState->client.buf, mb.buf, SQUID_TCP_SO_RCVBUF); - debug(26, 3) ("sslProxyConnected: Sending {%s}\n", sslState->client.buf); - sslState->client.len = mb.size; - memBufClean(&mb); + + comm_write_mbuf(sslState->server.fd, mb, sslProxyConnectedWriteDone, sslState); + commSetTimeout(sslState->server.fd, Config.Timeout.read, sslTimeout, sslState); - sslSetSelect(sslState); } static void diff --git a/src/stmem.cc b/src/stmem.cc index f34d1d488d..7e1f839c42 100644 --- a/src/stmem.cc +++ b/src/stmem.cc @@ -1,6 +1,6 @@ /* - * $Id: stmem.cc,v 1.72 2002/10/13 20:35:03 robertc Exp $ + * $Id: stmem.cc,v 1.73 2002/10/14 08:16:59 robertc Exp $ * * DEBUG: section 19 Store Memory Primitives * AUTHOR: Harvest Derived @@ -129,7 +129,7 @@ stmemCopy(const mem_hdr * mem, off_t offset, char *buf, size_t size) mem_node *p = mem->head; off_t t_off = mem->origin_offset; size_t bytes_to_go = size; - debug(19, 6) ("memCopy: offset %ld: size %u\n", (long int) offset, size); + debug(19, 6) ("memCopy: offset %ld: size %u\n", (long int) offset, (int)size); if (p == NULL) return 0; /* RC: the next assert is useless */ diff --git a/src/store_client.cc b/src/store_client.cc index 80598255a8..7ea9160d6d 100644 --- a/src/store_client.cc +++ b/src/store_client.cc @@ -1,6 +1,6 @@ /* - * $Id: store_client.cc,v 1.116 2002/10/14 07:35:46 hno Exp $ + * $Id: store_client.cc,v 1.117 2002/10/14 08:16:59 robertc Exp $ * * DEBUG: section 20 Storage Manager Client-Side Interface * AUTHOR: Duane Wessels @@ -194,7 +194,7 @@ storeClientCopy(store_client * sc, debug(20, 3) ("storeClientCopy: %s, from %lu, for length %d, cb %p, cbdata %p\n", storeKeyText((const cache_key *)e->hash.key), (unsigned long) copyInto.offset, - copyInto.length, + (int) copyInto.length, callback, data); assert(sc != NULL); diff --git a/src/structs.h b/src/structs.h index 805757e9b9..8c9b5daaec 100644 --- a/src/structs.h +++ b/src/structs.h @@ -1,6 +1,6 @@ /* - * $Id: structs.h,v 1.434 2002/10/13 23:48:24 hno Exp $ + * $Id: structs.h,v 1.435 2002/10/14 08:16:59 robertc Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -997,6 +997,9 @@ struct _ConnStateData { int n; time_t until; } defer; + struct { + int readMoreRequests:1; + } flags; }; struct _ipcache_addrs { diff --git a/src/tunnel.cc b/src/tunnel.cc index 4fe649c053..d8f9a427b7 100644 --- a/src/tunnel.cc +++ b/src/tunnel.cc @@ -1,6 +1,6 @@ /* - * $Id: tunnel.cc,v 1.125 2002/10/14 07:35:46 hno Exp $ + * $Id: tunnel.cc,v 1.126 2002/10/14 08:16:59 robertc Exp $ * * DEBUG: section 26 Secure Sockets Layer Proxy * AUTHOR: Duane Wessels @@ -59,16 +59,15 @@ static CNCB sslConnectDone; static ERCB sslErrorComplete; static PF sslServerClosed; static PF sslClientClosed; -static PF sslReadClient; -static PF sslReadServer; +static IOCB sslReadClient; +static IOCB sslReadServer; static PF sslTimeout; -static PF sslWriteClient; -static PF sslWriteServer; +static CWCB sslWriteClientDone; +static CWCB sslWriteServerDone; static PSC sslPeerSelectComplete; static void sslStateFree(SslStateData * sslState); static void sslConnected(int fd, void *); static void sslProxyConnected(int fd, void *); -static void sslSetSelect(SslStateData * sslState); #if DELAY_POOLS static DEFER sslDeferServerRead; #endif @@ -129,81 +128,24 @@ sslDeferServerRead(int fdnotused, void *data) } #endif -static void -sslSetSelect(SslStateData * sslState) -{ - size_t read_sz = SQUID_TCP_SO_RCVBUF; - assert(sslState->server.fd > -1 || sslState->client.fd > -1); - if (sslState->client.fd > -1) { - if (sslState->server.len > 0) { - commSetSelect(sslState->client.fd, - COMM_SELECT_WRITE, - sslWriteClient, - sslState, - 0); - } - if ((size_t)sslState->client.len < read_sz) { - commSetSelect(sslState->client.fd, - COMM_SELECT_READ, - sslReadClient, - sslState, - Config.Timeout.read); - } - } else if (sslState->client.len == 0) { - comm_close(sslState->server.fd); - } - if (sslState->server.fd > -1) { - if (sslState->client.len > 0) { - commSetSelect(sslState->server.fd, - COMM_SELECT_WRITE, - sslWriteServer, - sslState, - 0); - } -#if DELAY_POOLS - /* - * If this was allowed to return 0, there would be a possibility - * of the socket becoming "hung" with data accumulating but no - * write handler (server.len==0) and no read handler (!(0<0)) and - * no data flowing in the other direction. Hence the argument of - * 1 as min. - */ - read_sz = delayBytesWanted(sslState->delayId, 1, read_sz); -#endif - if ((size_t)sslState->server.len < read_sz) { - /* Have room to read more */ - commSetSelect(sslState->server.fd, - COMM_SELECT_READ, - sslReadServer, - sslState, - Config.Timeout.read); - } - } else if (sslState->client.fd == -1) { - /* client already closed, nothing more to do */ - } else if (sslState->server.len == 0) { - comm_close(sslState->client.fd); - } -} /* Read from server side and queue it for writing to the client */ static void -sslReadServer(int fd, void *data) +sslReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { SslStateData *sslState = (SslStateData *)data; - int len; - size_t read_sz = SQUID_TCP_SO_RCVBUF - sslState->server.len; + assert(fd == sslState->server.fd); - debug(26, 3) ("sslReadServer: FD %d, reading %d bytes at offset %d\n", - fd, (int) read_sz, sslState->server.len); errno = 0; #if DELAY_POOLS read_sz = delayBytesWanted(sslState->delayId, 1, read_sz); #endif - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, sslState->server.buf + sslState->server.len, read_sz); - debug(26, 3) ("sslReadServer: FD %d, read %d bytes\n", fd, len); + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (errcode == COMM_ERR_CLOSING) { + return; + } + debug(26, 3) ("sslReadServer: FD %d, read %d bytes\n", fd, (int)len); if (len > 0) { - fd_bytes(fd, len, FD_READ); #if DELAY_POOLS delayBytesIn(sslState->delayId, len); #endif @@ -219,29 +161,29 @@ sslReadServer(int fd, void *data) comm_close(fd); } else if (len == 0) { comm_close(sslState->server.fd); - } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + /* Only close the remote end if we've finished queueing data to it */ + if (sslState->server.len == 0 && sslState->client.fd != -1) { + comm_close(sslState->client.fd); + } + } else if (cbdataReferenceValid(sslState)) + comm_write(sslState->client.fd, sslState->server.buf, len, sslWriteClientDone, sslState, NULL); cbdataInternalUnlock(sslState); /* ??? */ } /* Read from client side and queue it for writing to the server */ static void -sslReadClient(int fd, void *data) +sslReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) { SslStateData *sslState = (SslStateData *)data; - int len; assert(fd == sslState->client.fd); - debug(26, 3) ("sslReadClient: FD %d, reading %d bytes at offset %d\n", - fd, SQUID_TCP_SO_RCVBUF - sslState->client.len, - sslState->client.len); - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, - sslState->client.buf + sslState->client.len, - SQUID_TCP_SO_RCVBUF - sslState->client.len); - debug(26, 3) ("sslReadClient: FD %d, read %d bytes\n", fd, len); + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (errcode == COMM_ERR_CLOSING) { + return; + } + + debug(26, 3) ("sslReadClient: FD %d, read %d bytes\n", fd, (int) len); if (len > 0) { - fd_bytes(fd, len, FD_READ); kb_incr(&statCounter.client_http.kbytes_in, len); sslState->client.len += len; } @@ -249,100 +191,106 @@ sslReadClient(int fd, void *data) if (len < 0) { int level = 1; #ifdef ECONNRESET - if (errno == ECONNRESET) + if (xerrno == ECONNRESET) level = 2; #endif - if (ignoreErrno(errno)) + if (ignoreErrno(xerrno)) level = 3; + /* XXX xstrerror() should be changed to take errno as an arg! */ + errno = xerrno; debug(50, level) ("sslReadClient: FD %d: read failure: %s\n", fd, xstrerror()); - if (!ignoreErrno(errno)) + if (!ignoreErrno(xerrno)) comm_close(fd); } else if (len == 0) { - comm_close(fd); - } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + comm_close(sslState->client.fd); + /* Only close the remote end if we've finished queueing data to it */ + if (sslState->client.len == 0 && sslState->server.fd != -1) { + comm_close(sslState->server.fd); + } + } else if (cbdataReferenceValid(sslState)) + comm_write(sslState->server.fd, sslState->client.buf, len, sslWriteServerDone, sslState, NULL); cbdataInternalUnlock(sslState); /* ??? */ } /* Writes data from the client buffer to the server side */ static void -sslWriteServer(int fd, void *data) +sslWriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, void *data) { SslStateData *sslState = (SslStateData *)data; - int len; assert(fd == sslState->server.fd); - debug(26, 3) ("sslWriteServer: FD %d, %d bytes to write\n", - fd, sslState->client.len); - statCounter.syscalls.sock.writes++; - len = FD_WRITE_METHOD(fd, - sslState->client.buf, - sslState->client.len); - debug(26, 3) ("sslWriteServer: FD %d, %d bytes written\n", fd, len); + debug(26, 3) ("sslWriteServer: FD %d, %d bytes written\n", fd, (int)len); + /* Valid data */ if (len > 0) { - fd_bytes(fd, len, FD_WRITE); kb_incr(&statCounter.server.all.kbytes_out, len); kb_incr(&statCounter.server.other.kbytes_out, len); - assert(len <= sslState->client.len); - sslState->client.len -= len; - if (sslState->client.len > 0) { - /* we didn't write the whole thing */ - xmemmove(sslState->client.buf, - sslState->client.buf + len, - sslState->client.len); - } + assert(len == (size_t)sslState->client.len); + sslState->client.len = 0; + } + /* EOF */ + if (len == 0) { + comm_close(sslState->server.fd); + return; + } + /* If the other end has closed, so should we */ + if (sslState->client.fd == -1) { + comm_close(sslState->server.fd); + return; } cbdataInternalLock(sslState); /* ??? should be locked by the caller... */ + /* Error? */ if (len < 0) { debug(50, ignoreErrno(errno) ? 3 : 1) ("sslWriteServer: FD %d: write failure: %s.\n", fd, xstrerror()); if (!ignoreErrno(errno)) comm_close(fd); } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + if (cbdataReferenceValid(sslState)) { + assert(sslState->client.len == 0); + comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF, + sslReadClient, sslState); + } cbdataInternalUnlock(sslState); /* ??? */ } /* Writes data from the server buffer to the client side */ static void -sslWriteClient(int fd, void *data) +sslWriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, void *data) { SslStateData *sslState = (SslStateData *)data; - int len; assert(fd == sslState->client.fd); - debug(26, 3) ("sslWriteClient: FD %d, %d bytes to write\n", - fd, sslState->server.len); - statCounter.syscalls.sock.writes++; - len = FD_WRITE_METHOD(fd, - sslState->server.buf, - sslState->server.len); - debug(26, 3) ("sslWriteClient: FD %d, %d bytes written\n", fd, len); + debug(26, 3) ("sslWriteClient: FD %d, %d bytes written\n", fd, (int)len); if (len > 0) { - fd_bytes(fd, len, FD_WRITE); kb_incr(&statCounter.client_http.kbytes_out, len); - assert(len <= sslState->server.len); - sslState->server.len -= len; + assert(len == (size_t)sslState->server.len); + sslState->server.len =0; /* increment total object size */ if (sslState->size_ptr) *sslState->size_ptr += len; - if (sslState->server.len > 0) { - /* we didn't write the whole thing */ - xmemmove(sslState->server.buf, - sslState->server.buf + len, - sslState->server.len); - } + } + /* EOF */ + if (len == 0) { + comm_close(sslState->client.fd); + return; + } + /* If the other end has closed, so should we */ + if (sslState->server.fd == -1) { + comm_close(sslState->client.fd); + return; } cbdataInternalLock(sslState); /* ??? should be locked by the caller... */ + /* Error? */ if (len < 0) { debug(50, ignoreErrno(errno) ? 3 : 1) ("sslWriteClient: FD %d: write failure: %s.\n", fd, xstrerror()); if (!ignoreErrno(errno)) comm_close(fd); } - if (cbdataReferenceValid(sslState)) - sslSetSelect(sslState); + if (cbdataReferenceValid(sslState)) { + assert(sslState->server.len == 0); + comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF, + sslReadServer, sslState); + } cbdataInternalUnlock(sslState); /* ??? */ } @@ -357,15 +305,52 @@ sslTimeout(int fd, void *data) comm_close(sslState->server.fd); } +static void +sslConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, void *data) +{ + SslStateData *sslState = (SslStateData *)data; + if (flag != COMM_OK) { + sslErrorComplete(fd, data, 0); + return; + } + if (cbdataReferenceValid(sslState)) { + assert(sslState->server.len == 0); + comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF, + sslReadServer, sslState); + comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF, + sslReadClient, sslState); + } +} + + +/* + * handle the write completion from a proxy request to an upstream proxy + */ +static void +sslProxyConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, void *data) +{ + SslStateData *sslState = (SslStateData *)data; + if (flag != COMM_OK) { + sslErrorComplete(fd, data, 0); + return; + } + if (cbdataReferenceValid(sslState)) { + assert(sslState->server.len == 0); + comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF, + sslReadServer, sslState); + comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF, + sslReadClient, sslState); + } +} + static void sslConnected(int fd, void *data) { SslStateData *sslState = (SslStateData *)data; debug(26, 3) ("sslConnected: FD %d sslState=%p\n", fd, sslState); *sslState->status_ptr = HTTP_OK; - xstrncpy(sslState->server.buf, conn_established, SQUID_TCP_SO_RCVBUF); - sslState->server.len = strlen(conn_established); - sslSetSelect(sslState); + comm_write(sslState->client.fd, conn_established, strlen(conn_established), + sslConnectedWriteDone, sslState, NULL); } static void @@ -417,8 +402,9 @@ sslConnectDone(int fdnotused, comm_err_t status, void *data) } else { if (sslState->servers->_peer) sslProxyConnected(sslState->server.fd, sslState); - else + else { sslConnected(sslState->server.fd, sslState); + } commSetTimeout(sslState->server.fd, Config.Timeout.read, sslTimeout, @@ -549,15 +535,13 @@ sslProxyConnected(int fd, void *data) httpHeaderClean(&hdr_out); packerClean(&p); memBufAppend(&mb, "\r\n", 2); - xstrncpy(sslState->client.buf, mb.buf, SQUID_TCP_SO_RCVBUF); - debug(26, 3) ("sslProxyConnected: Sending {%s}\n", sslState->client.buf); - sslState->client.len = mb.size; - memBufClean(&mb); + + comm_write_mbuf(sslState->server.fd, mb, sslProxyConnectedWriteDone, sslState); + commSetTimeout(sslState->server.fd, Config.Timeout.read, sslTimeout, sslState); - sslSetSelect(sslState); } static void diff --git a/src/typedefs.h b/src/typedefs.h index b2b4d8b836..d488cca067 100644 --- a/src/typedefs.h +++ b/src/typedefs.h @@ -1,6 +1,6 @@ /* - * $Id: typedefs.h,v 1.139 2002/10/13 20:35:06 robertc Exp $ + * $Id: typedefs.h,v 1.140 2002/10/14 08:16:59 robertc Exp $ * * * SQUID Web Proxy Cache http://www.squid-cache.org/ @@ -198,6 +198,9 @@ typedef struct _delaySpec delaySpec; typedef void CWCB(int fd, char *, size_t size, comm_err_t flag, void *data); typedef void CNCB(int fd, comm_err_t status, void *); +typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void *data); +typedef void IOACB(int fd, int nfd, struct sockaddr_in *me, struct sockaddr_in + *pn, comm_err_t flag, int xerrno, void *data); typedef void FREE(void *); typedef void CBDUNL(void *); diff --git a/src/urn.cc b/src/urn.cc index d715a7d9ae..e33d4fb0e3 100644 --- a/src/urn.cc +++ b/src/urn.cc @@ -1,6 +1,6 @@ /* - * $Id: urn.cc,v 1.76 2002/10/13 20:35:06 robertc Exp $ + * $Id: urn.cc,v 1.77 2002/10/14 08:16:59 robertc Exp $ * * DEBUG: section 52 URN Parsing * AUTHOR: Kostas Anagnostakis @@ -279,7 +279,7 @@ urnHandleReply(void *data, StoreIOBuffer result) char *buf = urnState->reqbuf; StoreIOBuffer tempBuffer; - debug(52, 3) ("urnHandleReply: Called with size=%u.\n", result.length); + debug(52, 3) ("urnHandleReply: Called with size=%u.\n", (unsigned int)result.length); if (EBIT_TEST(urlres_e->flags, ENTRY_ABORTED)) { goto error; } diff --git a/src/wais.cc b/src/wais.cc index 9d70edd6fb..1791253c81 100644 --- a/src/wais.cc +++ b/src/wais.cc @@ -1,6 +1,6 @@ /* - * $Id: wais.cc,v 1.141 2002/10/14 07:35:46 hno Exp $ + * $Id: wais.cc,v 1.142 2002/10/14 08:16:59 robertc Exp $ * * DEBUG: section 24 WAIS Relay * AUTHOR: Harvest Derived @@ -44,11 +44,12 @@ typedef struct { char url[MAX_URL]; request_t *request; FwdState *fwd; + char buf[BUFSIZ]; } WaisStateData; static PF waisStateFree; static PF waisTimeout; -static PF waisReadReply; +static IOCB waisReadReply; static CWCB waisSendComplete; static PF waisSendRequest; @@ -82,53 +83,53 @@ waisTimeout(int fd, void *data) /* This will be called when data is ready to be read from fd. Read until * error or connection closed. */ static void -waisReadReply(int fd, void *data) +waisReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { WaisStateData *waisState = (WaisStateData *)data; - LOCAL_ARRAY(char, buf, 4096); StoreEntry *entry = waisState->entry; - int len; int clen; int bin; size_t read_sz; #if DELAY_POOLS delay_id delayId = delayMostBytesAllowed(entry->mem_obj); #endif + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { comm_close(fd); return; } errno = 0; - read_sz = 4096; -#if DELAY_POOLS - read_sz = delayBytesWanted(delayId, 1, read_sz); -#endif - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, buf, read_sz); - if (len > 0) { - fd_bytes(fd, len, FD_READ); + read_sz = BUFSIZ; + if (flag == COMM_OK && len > 0) { #if DELAY_POOLS delayBytesIn(delayId, len); #endif kb_incr(&statCounter.server.all.kbytes_in, len); kb_incr(&statCounter.server.other.kbytes_in, len); } - debug(24, 5) ("waisReadReply: FD %d read len:%d\n", fd, len); - if (len > 0) { +#if DELAY_POOLS + read_sz = delayBytesWanted(delay_id, 1, read_sz); +#endif + debug(24, 5) ("waisReadReply: FD %d read len:%d\n", fd, (int)len); + if (flag == COMM_OK && len > 0) { commSetTimeout(fd, Config.Timeout.read, NULL, NULL); IOStats.Wais.reads++; for (clen = len - 1, bin = 0; clen; bin++) clen >>= 1; IOStats.Wais.read_hist[bin]++; } - if (len < 0) { + if (flag != COMM_OK || len < 0) { debug(50, 1) ("waisReadReply: FD %d: read failure: %s.\n", fd, xstrerror()); - if (ignoreErrno(errno)) { + if (ignoreErrno(xerrno)) { /* reinstall handlers */ /* XXX This may loop forever */ - commSetSelect(fd, COMM_SELECT_READ, - waisReadReply, waisState, 0); + comm_read(fd, waisState->buf, read_sz, waisReadReply, waisState); } else { ErrorState *err; EBIT_CLR(entry->flags, ENTRY_CACHABLE); @@ -139,24 +140,21 @@ waisReadReply(int fd, void *data) errorAppendEntry(entry, err); comm_close(fd); } - } else if (len == 0 && entry->mem_obj->inmem_hi == 0) { + } else if (flag == COMM_OK && len == 0 && entry->mem_obj->inmem_hi == 0) { ErrorState *err; err = errorCon(ERR_ZERO_SIZE_OBJECT, HTTP_SERVICE_UNAVAILABLE); err->xerrno = errno; err->request = requestLink(waisState->request); errorAppendEntry(entry, err); comm_close(fd); - } else if (len == 0) { + } else if (flag == COMM_OK && len == 0) { /* Connection closed; retrieval done. */ entry->expires = squid_curtime; fwdComplete(waisState->fwd); comm_close(fd); } else { storeAppend(entry, buf, len); - commSetSelect(fd, - COMM_SELECT_READ, - waisReadReply, - waisState, 0); + comm_read(fd, waisState->buf, read_sz, waisReadReply, waisState); } } @@ -185,10 +183,7 @@ waisSendComplete(int fd, char *bufnotused, size_t size, comm_err_t errflag, void comm_close(fd); } else { /* Schedule read reply. */ - commSetSelect(fd, - COMM_SELECT_READ, - waisReadReply, - waisState, 0); + comm_read(fd, waisState->buf, BUFSIZ, waisReadReply, waisState); commSetDefer(fd, fwdCheckDeferRead, entry); } } @@ -241,6 +236,6 @@ waisStart(FwdState * fwd) waisState->fwd = fwd; comm_add_close_handler(waisState->fd, waisStateFree, waisState); storeLockObject(entry); - commSetSelect(fd, COMM_SELECT_WRITE, waisSendRequest, waisState, 0); commSetTimeout(fd, Config.Timeout.read, waisTimeout, waisState); + waisSendRequest(fd, waisState); } diff --git a/src/whois.cc b/src/whois.cc index abd6d29d98..52ea765775 100644 --- a/src/whois.cc +++ b/src/whois.cc @@ -1,6 +1,6 @@ /* - * $Id: whois.cc,v 1.19 2002/10/13 20:35:06 robertc Exp $ + * $Id: whois.cc,v 1.20 2002/10/14 08:16:59 robertc Exp $ * * DEBUG: section 75 WHOIS protocol * AUTHOR: Duane Wessels, Kostas Anagnostakis @@ -42,11 +42,12 @@ typedef struct { StoreEntry *entry; request_t *request; FwdState *fwd; + char buf[BUFSIZ]; } WhoisState; static PF whoisClose; static PF whoisTimeout; -static PF whoisReadReply; +static IOCB whoisReadReply; /* PUBLIC */ @@ -70,7 +71,7 @@ whoisStart(FwdState * fwd) buf = (char *)xmalloc(l); snprintf(buf, l, "%s\r\n", strBuf(p->request->urlpath) + 1); comm_write(fd, buf, strlen(buf), NULL, p, xfree); - commSetSelect(fd, COMM_SELECT_READ, whoisReadReply, p, 0); + comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p); commSetTimeout(fd, Config.Timeout.read, whoisTimeout, p); } @@ -85,39 +86,43 @@ whoisTimeout(int fd, void *data) } static void -whoisReadReply(int fd, void *data) +whoisReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { WhoisState *p = (WhoisState *)data; StoreEntry *entry = p->entry; - char *buf = (char *)memAllocate(MEM_4K_BUF); MemObject *mem = entry->mem_obj; - int len; - statCounter.syscalls.sock.reads++; - len = FD_READ_METHOD(fd, buf, 4095); + int do_next_read = 0; + + /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */ + if (flag == COMM_ERR_CLOSING) { + return; + } + buf[len] = '\0'; - debug(75, 3) ("whoisReadReply: FD %d read %d bytes\n", fd, len); + debug(75, 3) ("whoisReadReply: FD %d read %d bytes\n", fd, (int)len); debug(75, 5) ("{%s}\n", buf); - if (len > 0) { + if (flag == COMM_OK && len > 0) { if (0 == mem->inmem_hi) mem->reply->sline.status = HTTP_OK; - fd_bytes(fd, len, FD_READ); kb_incr(&statCounter.server.all.kbytes_in, len); kb_incr(&statCounter.server.http.kbytes_in, len); storeAppend(entry, buf, len); - commSetSelect(fd, COMM_SELECT_READ, whoisReadReply, p, Config.Timeout.read); - } else if (len < 0) { + do_next_read = 1; + } else if (flag != COMM_OK || len < 0) { debug(50, 2) ("whoisReadReply: FD %d: read failure: %s.\n", fd, xstrerror()); if (ignoreErrno(errno)) { - commSetSelect(fd, COMM_SELECT_READ, whoisReadReply, p, Config.Timeout.read); + do_next_read = 1; } else if (mem->inmem_hi == 0) { ErrorState *err; err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR); err->xerrno = errno; fwdFail(p->fwd, err); comm_close(fd); + do_next_read = 0; } else { comm_close(fd); + do_next_read = 0; } } else { storeTimestampsSet(entry); @@ -127,8 +132,10 @@ whoisReadReply(int fd, void *data) fwdComplete(p->fwd); debug(75, 3) ("whoisReadReply: Done: %s\n", storeUrl(entry)); comm_close(fd); + do_next_read = 0; } - memFree(buf, MEM_4K_BUF); + if (do_next_read) + comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p); } static void