#
# Makefile for the Squid Object Cache server
#
-# $Id: Makefile.am,v 1.36 2002/10/13 20:34:57 robertc Exp $
+# $Id: Makefile.am,v 1.37 2002/10/14 08:16:58 robertc Exp $
#
# Uncomment and customize the following to suit your needs:
#
clientStream.cc \
clientStream.h \
comm.cc \
+ comm.h \
comm_select.cc \
comm_poll.cc \
comm_kqueue.cc \
#
# Makefile for the Squid Object Cache server
#
-# $Id: Makefile.in,v 1.251 2002/10/13 20:34:57 robertc Exp $
+# $Id: Makefile.in,v 1.252 2002/10/14 08:16:58 robertc Exp $
#
# Uncomment and customize the following to suit your needs:
#
clientStream.cc \
clientStream.h \
comm.cc \
+ comm.h \
comm_select.cc \
comm_poll.cc \
comm_kqueue.cc \
/*
- * $Id: asn.cc,v 1.82 2002/10/13 20:34:57 robertc Exp $
+ * $Id: asn.cc,v 1.83 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 53 AS Number handling
* AUTHOR: Duane Wessels, Kostas Anagnostakis
char *buf = asState->reqbuf;
int leftoversz = -1;
- debug(53, 3) ("asHandleReply: Called with size=%u\n", result.length);
+ debug(53, 3) ("asHandleReply: Called with size=%u\n", (unsigned int)result.length);
debug(53, 3) ("asHandleReply: buffer='%s'\n", buf);
/* First figure out whether we should abort the request */
asStateFree(asState);
return;
} else if (result.flags.error) {
- debug(53, 1) ("asHandleReply: Called with Error set and size=%u\n", result.length);
+ debug(53, 1) ("asHandleReply: Called with Error set and size=%u\n", (unsigned int) result.length);
asStateFree(asState);
return;
} else if (HTTP_OK != e->mem_obj->reply->sline.status) {
/*
- * $Id: client_side.cc,v 1.599 2002/10/13 20:34:59 robertc Exp $
+ * $Id: client_side.cc,v 1.600 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 33 Client-side Routines
* AUTHOR: Duane Wessels
#include "IPInterception.h"
#include "authenticate.h"
#include "Store.h"
+#include "comm.h"
#if LINGERING_CLOSE
struct {
int deferred:1; /* This is a pipelined request waiting for the
* current object to complete */
+ int parsed_ok:1; /* Was this parsed correctly? */
+ int mayUseConnection:1; /* This request may use the connection -
+ * don't read anymore requests for now
+ */
} flags;
struct {
clientStreamNode *node;
/* other */
static CWCB clientWriteComplete;
static CWCB clientWriteBodyComplete;
-static PF clientReadRequest;
+static IOCB clientReadRequest;
static PF connStateFree;
static PF requestTimeout;
static PF clientLifetimeTimeout;
static void checkFailureRatio(err_type, hier_code);
static clientSocketContext *parseHttpRequestAbort(ConnStateData * conn,
const char *uri);
-static clientSocketContext *parseHttpRequest(ConnStateData *, method_t *, int *,
+static clientSocketContext *parseHttpRequest(ConnStateData *, method_t *,
char **, size_t *);
#if USE_IDENT
static IDCB clientIdentDone;
static void clientPrepareLogWithRequestDetails(request_t *, AccessLogEntry *);
static void clientLogRequest(clientHttpRequest *);
static void httpRequestFreeResources(clientHttpRequest *);
-static void connEmptyOSReadBuffers(int fd);
static int connIsUsable(ConnStateData * conn);
static clientSocketContext *connGetCurrentContext(ConnStateData * conn);
static void contextDeferRecipientForLater(clientSocketContext * context, clientStreamNode * node, HttpReply * rep, StoreIOBuffer recievedData);
static char *findTrailingHTTPVersion(char *uriAndHTTPVersion);
static void trimTrailingSpaces(char *aString, size_t len);
static clientSocketContext *parseURIandHTTPVersion(char **url_p, http_version_t * http_ver_p, ConnStateData * conn);
-static void setLogUri(clientHttpRequest * http, char *uri);
+static void setLogUri(clientHttpRequest * http, char const *uri);
static void prepareInternalUrl(clientHttpRequest * http, char *url);
static void prepareForwardProxyUrl(clientHttpRequest * http, char *url);
static void prepareAcceleratedUrl(clientHttpRequest * http, char *url, char *req_hdr);
static void connMakeSpaceAvailable(ConnStateData * conn);
static void connAddContextToQueue(ConnStateData * conn, clientSocketContext * context);
static int connGetConcurrentRequestCount(ConnStateData * conn);
-static int connReadWasError(ConnStateData * conn, int size);
+static int connReadWasError(ConnStateData * conn, comm_err_t, int size);
static int connFinishedWithConn(ConnStateData * conn, int size);
static void connNoteUseOfBuffer(ConnStateData * conn, size_t byteCount);
static int connKeepReadingIncompleteRequest(ConnStateData * conn);
static void connCancelIncompleteRequests(ConnStateData * conn);
-static ConnStateData *connStateCreate(struct sockaddr_in peer, struct sockaddr_in me, int fd);
+static ConnStateData *connStateCreate(struct sockaddr_in *peer, struct sockaddr_in *me, int fd);
static clientStreamNode *getClientReplyContext(clientSocketContext * context);
static int connAreAllContextsForThisConnection(ConnStateData * connState);
static void connFreeAllContexts(ConnStateData * connState);
return (clientStreamNode *)context->http->client_stream.tail->prev->data;
}
+/*
+ * This routine should be called to grow the inbuf and then
+ * call comm_read().
+ */
+void
+clientReadSomeData(ConnStateData *conn)
+{
+ size_t len;
+
+ debug(33, 4) ("clientReadSomeData: FD %d: reading request...\n", conn->fd);
+
+ connMakeSpaceAvailable(conn);
+ len = connGetAvailableBufferLength(conn) - 1;
+
+ comm_read(conn->fd, conn->in.buf + conn->in.notYetUsed, len, clientReadRequest, conn);
+}
+
+
void
clientSocketRemoveThisFromConnectionList(clientSocketContext * context,
ConnStateData * conn)
}
}
-void
-connEmptyOSReadBuffers(int fd)
-{
-#ifdef _SQUID_LINUX_
- /* prevent those nasty RST packets */
- char buf[SQUID_TCP_SO_RCVBUF];
- while (FD_READ_METHOD(fd, buf, SQUID_TCP_SO_RCVBUF) > 0);
-#endif
-}
-
/* This is a handler normally called by comm_close() */
static void
connStateFree(int fd, void *data)
memFreeBuf(connState->in.allocatedSize, connState->in.buf);
pconnHistCount(0, connState->nrequests);
cbdataFree(connState);
- connEmptyOSReadBuffers(fd);
}
/*
*/
commSetTimeout(conn->fd, Config.Timeout.persistent_request,
requestTimeout, conn);
- /*
- * CYGWIN has a problem and is blocking on read() requests when there
- * is no data present.
- * This hack may hit performance a little, but it's better than
- * blocking!.
- */
conn->defer.until = 0; /* Kick it to read a new request */
-#ifdef _SQUID_CYGWIN_
- commSetSelect(conn->fd, COMM_SELECT_READ, clientReadRequest, conn, 0);
-#else
- clientReadRequest(conn->fd, conn); /* Read next request */
-#endif
- /*
- * Note, the FD may be closed at this point.
- */
+ /* Make sure we're still reading from the client side! */
+ /* XXX this could take a bit of CPU time! aiee! -- adrian */
+ assert(comm_has_pending_read(conn->fd));
+ /* clientReadSomeData(conn); */
+ /* Please don't do anything with the FD past here! */
}
void
http->start = current_time;
http->req_sz = conn->in.notYetUsed;
http->uri = xstrdup(uri);
- http->log_uri = xstrndup(uri, MAX_URL);
+ setLogUri (http, uri);
context = clientSocketContextNew(http);
tempBuffer.data = context->reqbuf;
tempBuffer.length = HTTP_REQBUF_SZ;
}
void
-setLogUri(clientHttpRequest * http, char *uri)
+setLogUri(clientHttpRequest * http, char const *uri)
{
if (!stringHasCntl(uri))
http->log_uri = xstrndup(uri, MAX_URL);
* parseHttpRequest()
*
* Returns
- * NULL on error or incomplete request
- * a clientHttpRequest structure on success
+ * NULL on incomplete requests
+ * a clientSocketContext structure on success or failure.
+ * Sets result->flags.parsed_ok to 0 if failed to parse the request.
+ * Sets result->flags.parsed_ok to 1 if we have a good request.
*/
static clientSocketContext *
-parseHttpRequest(ConnStateData * conn, method_t * method_p, int *status,
+parseHttpRequest(ConnStateData * conn, method_t * method_p,
char **prefix_p, size_t * req_line_sz_p)
{
char *inbuf = NULL;
/* pre-set these values to make aborting simpler */
*prefix_p = NULL;
*method_p = METHOD_NONE;
- *status = -1;
if ((req_sz = headersEnd(conn->in.buf, conn->in.notYetUsed)) == 0) {
debug(33, 5) ("Incomplete request, waiting for end of headers\n");
- *status = 0;
return NULL;
}
assert(req_sz <= conn->in.notYetUsed);
header_sz = req_sz - (req_hdr - inbuf);
if (0 == header_sz) {
debug(33, 3) ("parseHttpRequest: header_sz == 0\n");
- *status = 0;
xfree(inbuf);
return NULL;
}
setLogUri(http, http->uri);
debug(33, 5) ("parseHttpRequest: Complete request received\n");
xfree(inbuf);
- *status = 1;
+ result->flags.parsed_ok = 1;
return result;
}
}
int
-connReadWasError(ConnStateData * conn, int size)
+connReadWasError(ConnStateData * conn, comm_err_t flag, int size)
{
+ if (flag != COMM_OK) {
+ debug(50, 2) ("connReadWasError: FD %d: got flag %d\n", conn->fd, flag);
+ return 1;
+ }
if (size < 0) {
if (!ignoreErrno(errno)) {
debug(50, 2) ("connReadWasError: FD %d: %s\n", conn->fd, xstrerror());
}
static void
-clientReadRequest(int fd, void *data)
+clientMaybeReadData(ConnStateData *conn, int do_next_read)
{
- ConnStateData *conn = (ConnStateData *)data;
- int parser_return_code = 0;
+ if (do_next_read) {
+ conn->flags.readMoreRequests = 1;
+ clientReadSomeData(conn);
+ }
+}
+
+static void
+clientAfterReadingRequests(int fd, ConnStateData *conn, int do_next_read)
+{
+ fde *F = &fd_table[fd];
+
+ /* Check if a half-closed connection was aborted in the middle */
+ if (F->flags.socket_eof) {
+ if (conn->in.notYetUsed != conn->body.size_left) { /* != 0 when no
+request body */
+ /* Partial request received. Abort client connection! */
+ debug(33, 3) ("clientReadRequest: FD %d aborted, partial request\n",+ fd);
+ comm_close(fd);
+ return;
+ }
+ }
+
+ clientMaybeReadData (conn, do_next_read);
+}
+
+
+static void
+clientProcessRequest(ConnStateData *conn, clientSocketContext *context, method_t method, char *prefix, size_t req_line_sz)
+{
+ clientHttpRequest *http = context->http;
request_t *request = NULL;
- int size;
+ /* We have an initial client stream in place should it be needed */
+ /* setup our private context */
+ connNoteUseOfBuffer(conn, http->req_sz);
+
+ connAddContextToQueue(conn, context);
+
+ if (context->flags.parsed_ok == 0) {
+ clientStreamNode *node = getClientReplyContext(context);
+ debug(33, 1) ("clientReadRequest: Invalid Request\n");
+ clientSetReplyToError(node->data,
+ ERR_INVALID_REQ, HTTP_BAD_REQUEST, method, NULL,
+ &conn->peer.sin_addr, NULL, conn->in.buf, NULL);
+ assert(context->http->out.offset == 0);
+ clientPullData(context);
+ conn->flags.readMoreRequests = 0;
+ return;
+ }
+
+ if ((request = urlParse(method, http->uri)) == NULL) {
+ clientStreamNode *node = getClientReplyContext(context);
+ debug(33, 5) ("Invalid URL: %s\n", http->uri);
+ clientSetReplyToError(node->data,
+ ERR_INVALID_URL, HTTP_BAD_REQUEST, method, http->uri,
+ &conn->peer.sin_addr, NULL, NULL, NULL);
+ assert(context->http->out.offset == 0);
+ clientPullData(context);
+ conn->flags.readMoreRequests = 0;
+ return;
+ } else {
+
+ /* compile headers */
+ /* we should skip request line! */
+ if (!httpRequestParseHeader(request, prefix + req_line_sz))
+ debug(33, 1) ("Failed to parse request headers: %s\n%s\n",
+ http->uri, prefix);
+ /* continue anyway? */
+ }
+
+ request->flags.accelerated = http->flags.accel;
+ if (!http->flags.internal) {
+ if (internalCheck(strBuf(request->urlpath))) {
+ if (internalHostnameIs(request->host) &&
+ request->port == getMyPort()) {
+ http->flags.internal = 1;
+ } else if (internalStaticCheck(strBuf(request->urlpath))) {
+ xstrncpy(request->host, internalHostname(),
+ SQUIDHOSTNAMELEN);
+ request->port = getMyPort();
+ http->flags.internal = 1;
+ }
+ }
+ }
+
+ /*
+ * cache the Content-length value in request_t.
+ */
+ request->content_length = httpHeaderGetInt(&request->header,
+ HDR_CONTENT_LENGTH);
+ request->flags.internal = http->flags.internal;
+ setLogUri (http, urlCanonicalClean(request));
+ request->client_addr = conn->peer.sin_addr;
+ request->my_addr = conn->me.sin_addr;
+ request->my_port = ntohs(conn->me.sin_port);
+ request->http_ver = http->http_ver;
+ if (!urlCheckRequest(request) ||
+ httpHeaderHas(&request->header, HDR_TRANSFER_ENCODING)) {
+ clientStreamNode *node = getClientReplyContext(context);
+ clientSetReplyToError(node->data, ERR_UNSUP_REQ,
+ HTTP_NOT_IMPLEMENTED, request->method, NULL,
+ &conn->peer.sin_addr, request, NULL, NULL);
+ assert(context->http->out.offset == 0);
+ clientPullData(context);
+ conn->flags.readMoreRequests = 0;
+ return;
+ }
+
+
+ if (!clientIsContentLengthValid(request)) {
+ clientStreamNode *node = getClientReplyContext(context);
+ clientSetReplyToError(node->data, ERR_INVALID_REQ,
+ HTTP_LENGTH_REQUIRED, request->method, NULL,
+ &conn->peer.sin_addr, request, NULL, NULL);
+ assert(context->http->out.offset == 0);
+ clientPullData(context);
+ conn->flags.readMoreRequests = 0;
+ return;
+ }
+
+ http->request = requestLink(request);
+ clientSetKeepaliveFlag(http);
+ /* Do we expect a request-body? */
+ if (request->content_length > 0) {
+ conn->body.size_left = request->content_length;
+ request->body_connection = conn;
+ /* Is it too large? */
+ if (!clientIsRequestBodyValid(request->content_length) ||
+ clientIsRequestBodyTooLargeForPolicy(request->content_length)) {
+ clientStreamNode *node = getClientReplyContext(context);
+ clientSetReplyToError(node->data, ERR_TOO_BIG,
+ HTTP_REQUEST_ENTITY_TOO_LARGE, METHOD_NONE, NULL,
+ &conn->peer.sin_addr, http->request, NULL, NULL);
+ assert(context->http->out.offset == 0);
+ clientPullData(context);
+ conn->flags.readMoreRequests = 0;
+ return;
+ }
+ }
+
+ /* If this is a CONNECT, don't schedule a read - ssl.c will handle it */
+ if (http->request->method == METHOD_CONNECT)
+ context->flags.mayUseConnection = 1;
+ clientAccessCheck(http);
+}
+
+static void
+connStripBufferWhitespace (ConnStateData *conn)
+{
+ while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) {
+ xmemmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1);
+ --conn->in.notYetUsed;
+ }
+}
+
+static int
+connOkToAddRequest(ConnStateData *conn)
+{
+ int result = connGetConcurrentRequestCount(conn) < (Config.onoff.pipeline_prefetch ? 2 : 1);
+ if (!result) {
+ debug(33, 3) ("clientReadRequest: FD %d max concurrent requests reached\n",
+ conn->fd);
+ debug(33, 5) ("clientReadRequest: FD %d defering new request until one is done\n",
+ conn->fd);
+ }
+ return result;
+}
+
+static void
+connSetDefer (ConnStateData *conn, size_t milliSeconds)
+{
+ conn->defer.until = squid_curtime + milliSeconds;
+ conn->defer.n++;
+}
+
+static void
+clientReadRequest(int fd, char *buf, size_t size, comm_err_t flag, int xerrno,
+void *data)
+{
+ ConnStateData *conn = (ConnStateData *)data;
method_t method;
char *prefix = NULL;
- fde *F = &fd_table[fd];
- int len;
clientSocketContext *context;
- debug(33, 4) ("clientReadRequest: FD %d: reading request...\n", fd);
- connMakeSpaceAvailable(conn);
- len = connGetAvailableBufferLength(conn) - 1;
- commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0);
- statCounter.syscalls.sock.reads++;
- /* TODO: read should callback */
- size = FD_READ_METHOD(fd, conn->in.buf + conn->in.notYetUsed, len);
+ int do_next_read = 1; /* the default _is_ to read data! - adrian */
+
+ assert (fd == conn->fd);
+
+ /* Bail out quickly on COMM_ERR_CLOSING - close handlers will tidy up */
+ if (flag == COMM_ERR_CLOSING) {
+ return;
+ }
/*
* Don't reset the timeout value here. The timeout value will be
* set to Config.Timeout.request by httpAccept() and
* whole, not individual read() calls. Plus, it breaks our
* lame half-close detection
*/
- if (size > 0) {
- fd_bytes(fd, size, FD_READ);
- kb_incr(&statCounter.client_http.kbytes_in, size);
- conn->in.notYetUsed += size;
- conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */
- } else if (size == 0) {
- if (connFinishedWithConn(conn, size)) {
- comm_close(fd);
- return;
- }
- /* It might be half-closed, we can't tell */
- debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd);
- F->flags.socket_eof = 1;
- conn->defer.until = squid_curtime + 1;
- conn->defer.n++;
- fd_note(fd, "half-closed");
- /* There is one more close check at the end, to detect aborted
- * (partial) requests. At this point we can't tell if the request
- * is partial.
- */
- /* Continue to process previously read data */
- } else if (connReadWasError(conn, size)) {
+ if (connReadWasError(conn, flag, size)) {
comm_close(fd);
return;
}
+
+ if (flag == COMM_OK) {
+ if (size > 0) {
+ kb_incr(&statCounter.client_http.kbytes_in, size);
+ conn->in.notYetUsed += size;
+ conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string
+*/
+ } else if (size == 0) {
+ debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd);
+ if (connFinishedWithConn(conn, size)) {
+ comm_close(fd);
+ return;
+ }
+ /* It might be half-closed, we can't tell */
+ fd_table[fd].flags.socket_eof = 1;
+ connSetDefer (conn, 1);
+ fd_note(fd, "half-closed");
+ /* There is one more close check at the end, to detect aborted
+ * (partial) requests. At this point we can't tell if the request
+ * is partial.
+ */
+ /* Continue to process previously read data */
+ }
+ }
+
+
/* Process request body if any */
if (conn->in.notYetUsed > 0 && conn->body.callback != NULL)
clientProcessBody(conn);
while (conn->in.notYetUsed > 0 && conn->body.size_left == 0) {
size_t req_line_sz;
- /* Skip leading ( or trail from previous request) whitespace */
- while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) {
- xmemmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1);
- --conn->in.notYetUsed;
+ connStripBufferWhitespace (conn);
+ if (conn->in.notYetUsed == 0) {
+ clientAfterReadingRequests(fd, conn, do_next_read);
+ return;
}
- conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */
- if (conn->in.notYetUsed == 0)
- break;
/* Limit the number of concurrent requests to 2 */
- if (connGetConcurrentRequestCount(conn) >= (Config.onoff.pipeline_prefetch ? 2 : 1)) {
- debug(33, 3) ("clientReadRequest: FD %d max concurrent requests reached\n",
- fd);
- debug(33, 5) ("clientReadRequest: FD %d defering new request until one is done\n",
- fd);
- conn->defer.until = squid_curtime + 100; /* Reset when a request is complete */
- conn->defer.n++;
+ if (!connOkToAddRequest(conn)) {
+ /* Reset when a request is complete */
+ connSetDefer (conn, 100);
+ clientMaybeReadData (conn, do_next_read);
return;
}
- conn->in.buf[conn->in.notYetUsed] = '\0'; /* Terminate the string */
+ /* Should not be needed anymore */
+ /* Terminate the string */
+ conn->in.buf[conn->in.notYetUsed] = '\0';
/* Process request */
context = parseHttpRequest(conn,
- &method, &parser_return_code, &prefix, &req_line_sz);
- if (!context)
+ &method, &prefix, &req_line_sz);
+ /* partial or incomplete request */
+ if (!context) {
safe_free(prefix);
- if (context) {
- clientHttpRequest *http = context->http;
- /* We have an initial client stream in place should it be needed */
- /* setup our private context */
- assert (http->req_sz >= 0);
- connNoteUseOfBuffer(conn, http->req_sz);
+ if (!connKeepReadingIncompleteRequest(conn))
+ connCancelIncompleteRequests(conn);
+ break; /* conn->in.notYetUsed > 0 && conn->body.size_left == 0 */
+ }
- connAddContextToQueue(conn, context);
+ /* status -1 or 1 */
+ if (context) {
commSetTimeout(fd, Config.Timeout.lifetime, clientLifetimeTimeout,
- http);
- if (parser_return_code < 0) {
- clientStreamNode *node = getClientReplyContext(context);
- debug(33, 1) ("clientReadRequest: FD %d Invalid Request\n", fd);
- clientSetReplyToError(node->data,
- ERR_INVALID_REQ, HTTP_BAD_REQUEST, method, NULL,
- &conn->peer.sin_addr, NULL, conn->in.buf, NULL);
- assert(context->http->out.offset == 0);
- clientPullData(context);
- safe_free(prefix);
- break;
- }
- if ((request = urlParse(method, http->uri)) == NULL) {
- clientStreamNode *node = getClientReplyContext(context);
- debug(33, 5) ("Invalid URL: %s\n", http->uri);
- clientSetReplyToError(node->data,
- ERR_INVALID_URL, HTTP_BAD_REQUEST, method, http->uri,
- &conn->peer.sin_addr, NULL, NULL, NULL);
- assert(context->http->out.offset == 0);
- clientPullData(context);
- safe_free(prefix);
- break;
- } else {
- /* compile headers */
- /* we should skip request line! */
- if (!httpRequestParseHeader(request, prefix + req_line_sz))
- debug(33, 1) ("Failed to parse request headers: %s\n%s\n",
- http->uri, prefix);
- /* continue anyway? */
- }
- request->flags.accelerated = http->flags.accel;
- if (!http->flags.internal) {
- if (internalCheck(strBuf(request->urlpath))) {
- if (internalHostnameIs(request->host) &&
- request->port == getMyPort()) {
- http->flags.internal = 1;
- } else if (internalStaticCheck(strBuf(request->urlpath))) {
- xstrncpy(request->host, internalHostname(),
- SQUIDHOSTNAMELEN);
- request->port = getMyPort();
- http->flags.internal = 1;
- }
- }
- }
- /*
- * cache the Content-length value in request_t.
- */
- request->content_length = httpHeaderGetInt(&request->header,
- HDR_CONTENT_LENGTH);
- request->flags.internal = http->flags.internal;
+ context->http);
+
+ clientProcessRequest(conn, context, method, prefix, req_line_sz);
+
safe_free(prefix);
- safe_free(http->log_uri);
- http->log_uri = xstrdup(urlCanonicalClean(request));
- request->client_addr = conn->peer.sin_addr;
- request->my_addr = conn->me.sin_addr;
- request->my_port = ntohs(conn->me.sin_port);
- request->http_ver = http->http_ver;
- if (!urlCheckRequest(request) ||
- httpHeaderHas(&request->header, HDR_TRANSFER_ENCODING)) {
- clientStreamNode *node = getClientReplyContext(context);
- clientSetReplyToError(node->data, ERR_UNSUP_REQ,
- HTTP_NOT_IMPLEMENTED, request->method, NULL,
- &conn->peer.sin_addr, request, NULL, NULL);
- assert(context->http->out.offset == 0);
- clientPullData(context);
- break;
- }
- if (!clientIsContentLengthValid(request)) {
- clientStreamNode *node = getClientReplyContext(context);
- clientSetReplyToError(node->data, ERR_INVALID_REQ,
- HTTP_LENGTH_REQUIRED, request->method, NULL,
- &conn->peer.sin_addr, request, NULL, NULL);
- assert(context->http->out.offset == 0);
- clientPullData(context);
+ if (!conn->flags.readMoreRequests) {
+ conn->flags.readMoreRequests = 1;
break;
}
- http->request = requestLink(request);
- clientSetKeepaliveFlag(http);
- /* Do we expect a request-body? */
- if (request->content_length > 0) {
- conn->body.size_left = request->content_length;
- request->body_connection = conn;
- /* Is it too large? */
- if (!clientIsRequestBodyValid(request->content_length) ||
- clientIsRequestBodyTooLargeForPolicy(request->content_length)) {
- clientStreamNode *node = getClientReplyContext(context);
- clientSetReplyToError(node->data, ERR_TOO_BIG,
- HTTP_REQUEST_ENTITY_TOO_LARGE, METHOD_NONE, NULL,
- &conn->peer.sin_addr, http->request, NULL, NULL);
- assert(context->http->out.offset == 0);
- clientPullData(context);
- break;
- }
- }
- clientAccessCheck(http);
+ if (context->flags.mayUseConnection)
+ do_next_read = 0;
continue; /* while offset > 0 && body.size_left == 0 */
- } else if (parser_return_code == 0) {
- if (!connKeepReadingIncompleteRequest(conn))
- connCancelIncompleteRequests(conn);
- break;
}
} /* while offset > 0 && conn->body.size_left == 0 */
- /* Check if a half-closed connection was aborted in the middle */
- if (F->flags.socket_eof) {
- if (conn->in.notYetUsed != conn->body.size_left) { /* != 0 when no request body */
- /* Partial request received. Abort client connection! */
- debug(33, 3) ("clientReadRequest: FD %d aborted, partial request\n",
- fd);
- comm_close(fd);
- return;
- }
- }
+ clientAfterReadingRequests(fd, conn, do_next_read);
}
/* file_read like function, for reading body content */
}
ConnStateData *
-connStateCreate(struct sockaddr_in peer, struct sockaddr_in me, int fd)
+connStateCreate(struct sockaddr_in *peer, struct sockaddr_in *me, int fd)
{
ConnStateData *result = cbdataAlloc(ConnStateData);
- result->peer = peer;
- result->log_addr = peer.sin_addr;
+ result->peer = *peer;
+ result->log_addr = peer->sin_addr;
result->log_addr.s_addr &= Config.Addrs.client_netmask.s_addr;
- result->me = me;
+ result->me = *me;
result->fd = fd;
result->in.buf = (char *)memAllocBuf(CLIENT_REQ_BUF_SZ, &result->in.allocatedSize);
+ result->flags.readMoreRequests = 1;
return result;
}
/* Handle a new connection on HTTP socket. */
void
-httpAccept(int sock, void *data)
+httpAccept(int sock, int newfd, struct sockaddr_in *me, struct sockaddr_in *peer,
+ comm_err_t flag, int xerrno, void *data)
{
int *N = &incoming_sockets_accepted;
- int fd = -1;
ConnStateData *connState = NULL;
- struct sockaddr_in peer;
- struct sockaddr_in me;
- int max = INCOMING_HTTP_MAX;
#if USE_IDENT
static aclCheck_t identChecklist;
#endif
- commSetSelect(sock, COMM_SELECT_READ, httpAccept, NULL, 0);
- while (max-- && !httpAcceptDefer(sock, NULL)) {
- memset(&peer, '\0', sizeof(struct sockaddr_in));
- memset(&me, '\0', sizeof(struct sockaddr_in));
- if ((fd = comm_accept(sock, &peer, &me)) < 0) {
- if (!ignoreErrno(errno))
- debug(50, 1) ("httpAccept: FD %d: accept failure: %s\n",
+ /* kick off another one for later */
+ comm_accept(sock, httpAccept, NULL);
+
+ /* XXX we're not considering httpAcceptDefer yet! */
+
+ if (flag != COMM_OK) {
+ errno = xerrno;
+ debug(50, 1) ("httpAccept: FD %d: accept failure: %s\n",
sock, xstrerror());
- break;
+ return;
}
- debug(33, 4) ("httpAccept: FD %d: accepted\n", fd);
- connState = connStateCreate(peer, me, fd);
- comm_add_close_handler(fd, connStateFree, connState);
+
+ debug(33, 4) ("httpAccept: FD %d: accepted\n", newfd);
+ connState = connStateCreate(peer, me, newfd);
+ comm_add_close_handler(newfd, connStateFree, connState);
if (Config.onoff.log_fqdn)
- fqdncache_gethostbyaddr(peer.sin_addr, FQDN_LOOKUP_IF_MISS);
- commSetTimeout(fd, Config.Timeout.request, requestTimeout, connState);
+ fqdncache_gethostbyaddr(peer->sin_addr, FQDN_LOOKUP_IF_MISS);
+ commSetTimeout(newfd, Config.Timeout.request, requestTimeout, connState);
#if USE_IDENT
- identChecklist.src_addr = peer.sin_addr;
- identChecklist.my_addr = me.sin_addr;
- identChecklist.my_port = ntohs(me.sin_port);
+ identChecklist.src_addr = peer->sin_addr;
+ identChecklist.my_addr = me->sin_addr;
+ identChecklist.my_port = ntohs(me->sin_port);
if (aclCheckFast(Config.accessList.identLookup, &identChecklist))
- identStart(&me, &peer, clientIdentDone, connState);
+ identStart(me, peer, clientIdentDone, connState);
#endif
- commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, connState, 0);
- commSetDefer(fd, clientReadDefer, connState);
- clientdbEstablished(peer.sin_addr, 1);
+ clientReadSomeData(connState);
+ commSetDefer(newfd, clientReadDefer, connState);
+ clientdbEstablished(peer->sin_addr, 1);
assert(N);
(*N)++;
- }
}
#if USE_SSL
debug(83, 5) ("clientNegotiateSSL: FD %d has no certificate.\n", fd);
}
- commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0);
+ clientReadSomeData(conn);
}
struct _https_port_data {
/* handle a new HTTPS connection */
static void
-httpsAccept(int sock, void *data)
+httpsAccept(int sock, int newfd, struct sockaddr_in *me, struct sockaddr_in *peer,
+ comm_err_t flag, int xerrno, void *data)
{
int *N = &incoming_sockets_accepted;
https_port_data *https_port = (https_port_data *)data;
SSL_CTX *sslContext = https_port->sslContext;
- int fd = -1;
ConnStateData *connState = NULL;
- struct sockaddr_in peer;
- struct sockaddr_in me;
- int max = INCOMING_HTTP_MAX;
SSL *ssl;
int ssl_error;
#if USE_IDENT
static aclCheck_t identChecklist;
#endif
- commSetSelect(sock, COMM_SELECT_READ, httpsAccept, https_port, 0);
- while (max-- && !httpAcceptDefer(sock, NULL)) {
- memset(&peer, '\0', sizeof(struct sockaddr_in));
- memset(&me, '\0', sizeof(struct sockaddr_in));
- if ((fd = comm_accept(sock, &peer, &me)) < 0) {
- if (!ignoreErrno(errno))
- debug(50, 1) ("httpsAccept: FD %d: accept failure: %s\n",
- sock, xstrerror());
- break;
- }
- if ((ssl = SSL_new(sslContext)) == NULL) {
- ssl_error = ERR_get_error();
- debug(83, 1) ("httpsAccept: Error allocating handle: %s\n",
- ERR_error_string(ssl_error, NULL));
- break;
- }
- SSL_set_fd(ssl, fd);
- fd_table[fd].ssl = ssl;
- fd_table[fd].read_method = &ssl_read_method;
- fd_table[fd].write_method = &ssl_write_method;
- debug(50, 5) ("httpsAccept: FD %d accepted, starting SSL negotiation.\n", fd);
-
- connState = connStateCreate(peer, me, fd);
- /* XXX account connState->in.buf */
- comm_add_close_handler(fd, connStateFree, connState);
- if (Config.onoff.log_fqdn)
- fqdncache_gethostbyaddr(peer.sin_addr, FQDN_LOOKUP_IF_MISS);
- commSetTimeout(fd, Config.Timeout.request, requestTimeout, connState);
+
+ if (flag != COMM_OK) {
+ errno = xerrno;
+ debug(50, 1) ("httpsAccept: FD %d: accept failure: %s\n",
+ sock, xstrerror());
+ return;
+ }
+
+ if ((ssl = SSL_new(sslContext)) == NULL) {
+ ssl_error = ERR_get_error();
+ debug(83, 1) ("httpsAccept: Error allocating handle: %s\n",
+ ERR_error_string(ssl_error, NULL));
+ return;
+ }
+
+ SSL_set_fd(ssl, newfd);
+ fd_table[newfd].ssl = ssl;
+ fd_table[newfd].read_method = &ssl_read_method;
+ fd_table[newfd].write_method = &ssl_write_method;
+ debug(50, 5) ("httpsAccept: FD %d accepted, starting SSL negotiation.\n", newfd);
+
+ connState = connStateCreate(peer, me, newfd);
+ /* XXX account connState->in.buf */
+ comm_add_close_handler(newfd, connStateFree, connState);
+ if (Config.onoff.log_fqdn)
+ fqdncache_gethostbyaddr(peer->sin_addr, FQDN_LOOKUP_IF_MISS);
+ commSetTimeout(newfd, Config.Timeout.request, requestTimeout, connState);
#if USE_IDENT
- identChecklist.src_addr = peer.sin_addr;
- identChecklist.my_addr = me.sin_addr;
- identChecklist.my_port = ntohs(me.sin_port);
- if (aclCheckFast(Config.accessList.identLookup, &identChecklist))
- identStart(&me, &peer, clientIdentDone, connState);
+ identChecklist.src_addr = peer->sin_addr;
+ identChecklist.my_addr = me->sin_addr;
+ identChecklist.my_port = ntohs(me->sin_port);
+ if (aclCheckFast(Config.accessList.identLookup, &identChecklist))
+ identStart(me, peer, clientIdentDone, connState);
#endif
- commSetSelect(fd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0);
- commSetDefer(fd, clientReadDefer, connState);
- clientdbEstablished(peer.sin_addr, 1);
- (*N)++;
- }
+ commSetSelect(newfd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0);
+ commSetDefer(newfd, clientReadDefer, connState);
+ clientdbEstablished(peer->sin_addr, 1);
+ (*N)++;
}
#endif /* USE_SSL */
if (fd < 0)
continue;
comm_listen(fd);
- commSetSelect(fd, COMM_SELECT_READ, httpAccept, NULL, 0);
+ comm_accept(fd, httpAccept, NULL);
/*
* We need to set a defer handler here so that we don't
* peg the CPU with select() when we hit the FD limit.
sslCreateContext(s->cert, s->key, s->version, s->cipher,
s->options);
comm_listen(fd);
- commSetSelect(fd, COMM_SELECT_READ, httpsAccept, https_port, 0);
+ comm_accept(fd, httpsAccept, NULL);
commSetDefer(fd, httpAcceptDefer, NULL);
debug(1, 1) ("Accepting HTTPS connections at %s, port %d, FD %d.\n",
inet_ntoa(s->s.sin_addr), (int) ntohs(s->s.sin_port), fd);
/*
- * $Id: client_side_reply.cc,v 1.16 2002/10/13 20:35:00 robertc Exp $
+ * $Id: client_side_reply.cc,v 1.17 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 88 Client-side Reply Routines
* AUTHOR: Robert Collins (Originally Duane Wessels in client_side.c)
StoreEntry *e = http->entry;
MemObject *mem;
request_t *r = http->request;
- debug(88, 3) ("clientCacheHit: %s, %ud bytes\n", http->uri, result.length);
+ debug(88, 3) ("clientCacheHit: %s, %ud bytes\n", http->uri, (unsigned int)result.length);
if (http->entry == NULL) {
debug(88, 3) ("clientCacheHit: request aborted\n");
return;
context->flags.storelogiccomplete = 1;
debug(88, 5) ("clientSendMoreData: %s, %d bytes (%u new bytes)\n",
- http->uri, (int) size, result.length);
+ http->uri, (int) size, (unsigned int)result.length);
assert(size <= HTTP_REQBUF_SZ || context->flags.headersSent);
assert(http->request != NULL);
/* ESI TODO: remove this assert once everything is stable */
/*
- * $Id: comm.cc,v 1.336 2002/10/13 20:35:00 robertc Exp $
+ * $Id: comm.cc,v 1.337 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 5 Socket Functions
* AUTHOR: Harvest Derived
*/
#include "squid.h"
+#include "StoreIOBuffer.h"
+#include "comm.h"
#if defined(_SQUID_CYGWIN_)
#include <sys/ioctl.h>
static int commRetryConnect(ConnectStateData * cs);
CBDATA_TYPE(ConnectStateData);
+
+struct _fdc_t {
+ int active;
+ dlink_list CommCallbackList;
+ struct {
+ char *buf;
+ int size;
+ IOCB *handler;
+ void *handler_data;
+ } read;
+ struct {
+ struct sockaddr_in me;
+ struct sockaddr_in pn;
+ IOACB *handler;
+ void *handler_data;
+ } accept;
+ struct CommFiller {
+ StoreIOBuffer requestedData;
+ size_t amountDone;
+ IOFCB *handler;
+ void *handler_data;
+ } fill;
+
+};
+typedef struct _fdc_t fdc_t;
+
+typedef enum {
+ COMM_CB_READ = 1,
+ COMM_CB_WRITE,
+ COMM_CB_ACCEPT,
+ COMM_CB_FILL
+} comm_callback_t;
+
+struct _CommCallbackData {
+ comm_callback_t type;
+ dlink_node fd_node;
+ dlink_node h_node;
+ int fd;
+ int newfd; /* for accept() */
+ char *buf;
+ int retval;
+ union {
+ IOCB *r_callback;
+ IOACB *a_callback;
+ IOFCB *f_callback;
+ } c;
+ void *callback_data;
+ comm_err_t errcode;
+ int xerrno;
+ int seqnum;
+ struct sockaddr_in me;
+ struct sockaddr_in pn;
+ StoreIOBuffer sb;
+};
+typedef struct _CommCallbackData CommCallbackData;
+
+struct _fd_debug_t {
+ char *close_file;
+ int close_line;
+};
+typedef struct _fd_debug_t fd_debug_t;
+
static MemPool *comm_write_pool = NULL;
static MemPool *conn_close_pool = NULL;
+static MemPool *comm_callback_pool = NULL;
+fdc_t *fdc_table = NULL;
+fd_debug_t *fdd_table = NULL;
+dlink_list CommCallbackList;
+static int CommCallbackSeqnum = 1;
+
+
+/* New and improved stuff */
+
+/*
+ * return whether there are entries in the callback queue
+ */
+int
+comm_existsiocallback(void)
+{
+ return CommCallbackList.head == NULL;
+}
+
+/*
+ * add an IO callback
+ *
+ * IO callbacks are added when we want to notify someone that some IO
+ * has finished but we don't want to risk re-entering a non-reentrant
+ * code block.
+ */
+static void
+comm_addreadcallback(int fd, IOCB *callback, char *buf, size_t retval, comm_err_t errcode,
+ int xerrno, void *callback_data)
+{
+ CommCallbackData *cio;
+
+ assert(fdc_table[fd].active == 1);
+
+ /* Allocate a new struct */
+ cio = (CommCallbackData *)memPoolAlloc(comm_callback_pool);
+
+ /* Throw our data into it */
+ cio->fd = fd;
+ cio->retval = retval;
+ cio->xerrno = xerrno;
+ cio->errcode = errcode;
+ cio->c.r_callback = callback;
+ cio->callback_data = callback_data;
+ cio->seqnum = CommCallbackSeqnum;
+ cio->buf = buf;
+ cio->type = COMM_CB_READ;
+
+ /* Add it to the end of the list */
+ dlinkAddTail(cio, &(cio->h_node), &CommCallbackList);
+
+ /* and add it to the end of the fd list */
+ dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList));
+
+}
+
+
+static void
+comm_addacceptcallback(int fd, int newfd, IOACB *callback, struct sockaddr_in *pn,
+ struct sockaddr_in *me, comm_err_t errcode, int xerrno, void *callback_data)
+{
+ CommCallbackData *cio;
+
+ assert(fdc_table[fd].active == 1);
+
+ /* Allocate a new struct */
+ cio = (CommCallbackData *)memPoolAlloc(comm_callback_pool);
+
+ /* Throw our data into it */
+ cio->fd = fd;
+ cio->xerrno = xerrno;
+ cio->errcode = errcode;
+ cio->c.a_callback = callback;
+ cio->callback_data = callback_data;
+ cio->seqnum = CommCallbackSeqnum;
+ cio->type = COMM_CB_ACCEPT;
+ cio->newfd = newfd;
+ cio->pn = *pn;
+ cio->me = *me;
+
+ /* Add it to the end of the list */
+ dlinkAddTail(cio, &(cio->h_node), &CommCallbackList);
+
+ /* and add it to the end of the fd list */
+ dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList));
+
+}
+
+static void
+comm_add_fill_callback(int fd, size_t retval, comm_err_t errcode, int xerrno)
+{
+ CommCallbackData *cio;
+
+ assert(fdc_table[fd].active == 1);
+
+ /* Allocate a new struct */
+ cio = (CommCallbackData *)memPoolAlloc(comm_callback_pool);
+
+ /* Throw our data into it */
+ cio->fd = fd;
+ cio->xerrno = xerrno;
+ cio->errcode = errcode;
+ cio->c.f_callback = fdc_table[fd].fill.handler;
+ cio->callback_data = fdc_table[fd].fill.handler_data;
+ cio->seqnum = CommCallbackSeqnum;
+ cio->type = COMM_CB_FILL;
+ /* retval not used */
+ cio->retval = -1;
+ cio->sb = fdc_table[fd].fill.requestedData;
+ cio->sb.length = retval;
+ /* Clear out fd state */
+ fdc_table[fd].fill.handler = NULL;
+ fdc_table[fd].fill.handler_data = NULL;
+
+ /* Add it to the end of the list */
+ dlinkAddTail(cio, &(cio->h_node), &CommCallbackList);
+
+ /* and add it to the end of the fd list */
+ dlinkAddTail(cio, &(cio->fd_node), &(fdc_table[fd].CommCallbackList));
+}
+
+
+
+
+static void
+comm_call_io_callback(CommCallbackData *cio)
+{
+ switch(cio->type) {
+ case COMM_CB_READ:
+ cio->c.r_callback(cio->fd, cio->buf, cio->retval, cio->errcode, cio->xerrno,
+ cio->callback_data);
+ break;
+ case COMM_CB_WRITE:
+ fatal("write comm hasn't been implemented yet!");
+ break;
+ case COMM_CB_ACCEPT:
+ cio->c.a_callback(cio->fd, cio->newfd, &cio->me, &cio->pn, cio->errcode,
+ cio->xerrno, cio->callback_data);
+ break;
+ case COMM_CB_FILL:
+ cio->c.f_callback(cio->fd, cio->sb, cio->errcode,
+ cio->xerrno, cio->callback_data);
+ break;
+ default:
+ fatal("unknown comm io callback type!");
+ break;
+ };
+}
+
+
+/*
+ * call the IO callbacks
+ *
+ * This should be called before comm_select() so code can attempt to
+ * initiate some IO.
+ *
+ * When io callbacks are added, they are added with the current
+ * sequence number. The sequence number is incremented in this routine -
+ * since callbacks are added to the _tail_ of the list, when we hit a
+ * callback with a seqnum _not_ what it was when we entered this routine,
+ * we can stop.
+ */
+void
+comm_calliocallback(void)
+{
+ CommCallbackData *cio;
+ dlink_node *node;
+ int oldseqnum = CommCallbackSeqnum;
+
+ /* Call our callbacks until we hit NULL or the seqnum changes */
+ while (CommCallbackList.head != NULL) {
+ node = (dlink_node *)CommCallbackList.head;
+ cio = (CommCallbackData *)node->data;
+
+ /* If seqnum isn't the same, its time to die */
+ if (cio->seqnum != oldseqnum)
+ break; /* we've hit newly-added events */
+
+ assert(fdc_table[cio->fd].active == 1);
+
+ dlinkDelete(&cio->h_node, &CommCallbackList);
+ dlinkDelete(&cio->fd_node, &(fdc_table[cio->fd].CommCallbackList));
+ comm_call_io_callback(cio);
+ memPoolFree(comm_callback_pool, cio);
+ }
+}
+
+
+/*
+ * Queue a callback
+ */
+static void
+comm_read_callback(int fd, int retval, comm_err_t errcode, int xerrno)
+{
+ fdc_t *Fc = &fdc_table[fd];
+
+ assert(Fc->read.handler != NULL);
+
+ comm_addreadcallback(fd, Fc->read.handler, Fc->read.buf, retval, errcode, xerrno,
+ Fc->read.handler_data);
+ Fc->read.handler = NULL;
+ Fc->read.handler_data = NULL;
+}
+
+/*
+ * Attempt a read
+ *
+ * If the read attempt succeeds or fails, call the callback.
+ * Else, wait for another IO notification.
+ */
+static void
+comm_read_try(int fd, void *data)
+{
+ fdc_t *Fc = &fdc_table[fd];
+ int retval;
+
+ /* make sure we actually have a callback */
+ assert(Fc->read.handler != NULL);
+
+ /* Attempt a read */
+ statCounter.syscalls.sock.reads++;
+ retval = FD_READ_METHOD(fd, Fc->read.buf, Fc->read.size);
+ if (retval < 0 && !ignoreErrno(errno)) {
+ comm_read_callback(fd, -1, COMM_ERROR, errno);
+ return;
+ };
+
+ /* See if we read anything */
+ /* Note - read 0 == socket EOF, which is a valid read */
+ if (retval >= 0) {
+ fd_bytes(fd, retval, FD_READ);
+ comm_read_callback(fd, retval, COMM_OK, 0);
+ return;
+ }
+
+ /* Nope, register for some more IO */
+ commSetSelect(fd, COMM_SELECT_READ, comm_read_try, NULL, 0);
+}
+
+/*
+ * Queue a read. handler/handler_data are called when the read
+ * completes, on error, or on file descriptor close.
+ */
+void
+comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data)
+{
+ /* Make sure we're not reading anything and we're not closing */
+ assert(fdc_table[fd].active == 1);
+ assert(fdc_table[fd].read.handler == NULL);
+ assert(!fd_table[fd].flags.closing);
+
+ /* Queue a read */
+ fdc_table[fd].read.buf = buf;
+ fdc_table[fd].read.size = size;
+ fdc_table[fd].read.handler = handler;
+ fdc_table[fd].read.handler_data = handler_data;
+
+#if OPTIMISTIC_IO
+ comm_read_try(fd, NULL);
+#else
+ /* Register intrest in a FD read */
+ commSetSelect(fd, COMM_SELECT_READ, comm_read_try, NULL, 0);
+#endif
+}
+
+static void
+comm_fill_read(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+{
+ /* TODO use a reference to the table entry, or use C++ :] */
+ StoreIOBuffer *sb;
+ _fdc_t::CommFiller *fill;
+ assert(fdc_table[fd].active == 1);
+
+ if (flag != COMM_OK) {
+ /* Error! */
+ /* XXX This was -1 below, but -1 can't be used for size_t parameters.
+ * The callback should set -1 to the client if needed based on the flags
+ */
+ comm_add_fill_callback(fd, 0, flag, xerrno);
+ return;
+ }
+ /* flag is COMM_OK */
+ /* We handle EOFs as read lengths of 0! Its eww, but its consistent */
+ fill = &fdc_table[fd].fill;
+ fill->amountDone += len;
+ sb = &fdc_table[fd].fill.requestedData;
+ assert(fill->amountDone <= sb->length);
+ comm_add_fill_callback(fd, fill->amountDone, COMM_OK, 0);
+}
+
+/*
+ * Try filling a StoreIOBuffer with some data, and call a callback when successful
+ */
+void
+comm_fill_immediate(int fd, StoreIOBuffer sb, IOFCB *callback, void *data)
+{
+ assert(fdc_table[fd].fill.handler == NULL);
+ /* prevent confusion */
+ assert (sb.offset == 0);
+
+ /* If we don't have any data, record details and schedule a read */
+ fdc_table[fd].fill.handler = callback;
+ fdc_table[fd].fill.handler_data = data;
+ fdc_table[fd].fill.requestedData = sb;
+ fdc_table[fd].fill.amountDone = 0;
+
+ comm_read(fd, sb.data, sb.length, comm_fill_read, NULL);
+}
+
+
+/*
+ * Empty the read buffers
+ *
+ * This is a magical routine that empties the read buffers.
+ * Under some platforms (Linux) if a buffer has data in it before
+ * you call close(), the socket will hang and take quite a while
+ * to timeout.
+ */
+static void
+comm_empty_os_read_buffers(int fd)
+{
+#ifdef _SQUID_LINUX_
+ /* prevent those nasty RST packets */
+ char buf[SQUID_TCP_SO_RCVBUF];
+ if (fd_table[fd].flags.nonblocking == 1)
+ while (FD_READ_METHOD(fd, buf, SQUID_TCP_SO_RCVBUF) > 0);
+#endif
+}
+
+
+/*
+ * Return whether a file descriptor has any pending read request callbacks
+ *
+ * Assumptions: the fd is open (ie, its not closing)
+ */
+int
+comm_has_pending_read_callback(int fd)
+{
+ dlink_node *node;
+ CommCallbackData *cd;
+
+ assert(fd_table[fd].flags.open == 1);
+ assert(fdc_table[fd].active == 1);
+
+ /*
+ * XXX I don't like having to walk the list!
+ * Instead, if this routine is called often enough, we should
+ * also maintain a linked list of _read_ events - we can just
+ * check if the list head a HEAD..
+ * - adrian
+ */
+ node = fdc_table[fd].CommCallbackList.head;
+ while (node != NULL) {
+ cd = (CommCallbackData *)node->data;
+ if (cd->type == COMM_CB_READ)
+ return 1;
+ node = node->next;
+ }
+
+ /* Not found */
+ return 0;
+}
+
+/*
+ * return whether a file descriptor has a read handler
+ *
+ * Assumptions: the fd is open
+ */
+int
+comm_has_pending_read(int fd)
+{
+ assert(fd_table[fd].flags.open == 1);
+ assert(fdc_table[fd].active == 1);
+
+ return (fdc_table[fd].read.handler != NULL);
+}
+
+/*
+ * Cancel a pending read. Assert that we have the right parameters,
+ * and that there are no pending read events!
+ */
+void
+comm_read_cancel(int fd, IOCB *callback, void *data)
+{
+ assert(fd_table[fd].flags.open == 1);
+ assert(fdc_table[fd].active == 1);
+
+ assert(fdc_table[fd].read.handler == callback);
+ assert(fdc_table[fd].read.handler_data == data);
+
+ assert(!comm_has_pending_read_callback(fd));
+
+ /* Ok, we can be reasonably sure we won't lose any data here! */
+
+ /* Delete the callback */
+ fdc_table[fd].read.handler = NULL;
+ fdc_table[fd].read.handler_data = NULL;
+}
+
+
+void
+fdc_open(int fd, unsigned int type, char *desc)
+{
+ assert(fdc_table[fd].active == 0);
+
+ fdc_table[fd].active = 1;
+ fd_open(fd, type, desc);
+}
+
+
+/* Older stuff */
static void
CommWriteStateCallbackAndFree(int fd, comm_err_t code)
/* update fdstat */
debug(5, 5) ("comm_open: FD %d is a new socket\n", new_socket);
fd_open(new_socket, FD_SOCKET, note);
+ fdd_table[new_socket].close_file = NULL;
+ fdd_table[new_socket].close_line = 0;
+ assert(fdc_table[new_socket].active == 0);
+ fdc_table[new_socket].active = 1;
F = &fd_table[new_socket];
F->local_addr = addr;
F->tos = tos;
return new_socket;
}
-/*
- * NOTE: set the listen queue to Squid_MaxFD/4 and rely on the kernel to
- * impose an upper limit. Solaris' listen(3n) page says it has
- * no limit on this parameter, but sys/socket.h sets SOMAXCONN
- * to 5. HP-UX currently has a limit of 20. SunOS is 5 and
- * OSF 3.0 is 8.
- */
-int
-comm_listen(int sock)
-{
- int x;
- if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) {
- debug(50, 0) ("comm_listen: listen(%d, %d): %s\n",
- Squid_MaxFD >> 2,
- sock, xstrerror());
- return x;
- }
- return sock;
-}
-
void
commConnectStart(int fd, const char *host, u_short port, CNCB * callback, void *data)
{
/* Wait for an incoming connection on FD. FD should be a socket returned
* from comm_listen. */
int
-comm_accept(int fd, struct sockaddr_in *pn, struct sockaddr_in *me)
+comm_old_accept(int fd, struct sockaddr_in *pn, struct sockaddr_in *me)
{
int sock;
struct sockaddr_in P;
if ((sock = accept(fd, (struct sockaddr *) &P, &Slen)) < 0) {
PROF_stop(comm_accept);
if (ignoreErrno(errno)) {
- debug(50, 5) ("comm_accept: FD %d: %s\n", fd, xstrerror());
+ debug(50, 5) ("comm_old_accept: FD %d: %s\n", fd, xstrerror());
return COMM_NOMESSAGE;
} else if (ENFILE == errno || EMFILE == errno) {
- debug(50, 3) ("comm_accept: FD %d: %s\n", fd, xstrerror());
+ debug(50, 3) ("comm_old_accept: FD %d: %s\n", fd, xstrerror());
return COMM_ERROR;
} else {
- debug(50, 1) ("comm_accept: FD %d: %s\n", fd, xstrerror());
+ debug(50, 1) ("comm_old_accept: FD %d: %s\n", fd, xstrerror());
return COMM_ERROR;
}
}
commSetCloseOnExec(sock);
/* fdstat update */
fd_open(sock, FD_SOCKET, "HTTP Request");
+ fdd_table[sock].close_file = NULL;
+ fdd_table[sock].close_line = 0;
+ fdc_table[sock].active = 1;
F = &fd_table[sock];
xstrncpy(F->ipaddr, inet_ntoa(P.sin_addr), 16);
F->remote_port = htons(P.sin_port);
comm_close(fd);
}
+
+/*
+ * Close the socket fd.
+ *
+ * + call write handlers with ERR_CLOSING
+ * + call read handlers with ERR_CLOSING
+ * + call closing handlers
+ */
void
-comm_close(int fd)
+_comm_close(int fd, char *file, int line)
{
fde *F = NULL;
+ dlink_node *node;
+ CommCallbackData *cio;
debug(5, 5) ("comm_close: FD %d\n", fd);
assert(fd >= 0);
assert(fd < Squid_MaxFD);
F = &fd_table[fd];
+ fdd_table[fd].close_file = file;
+ fdd_table[fd].close_line = line;
if (F->flags.closing)
return;
if (shutting_down && (!F->flags.open || F->type == FD_FILE))
return;
assert(F->flags.open);
+ /* The following fails because ipc.c is doing calls to pipe() to create sockets! */
+ /* assert(fdc_table[fd].active == 1); */
assert(F->type != FD_FILE);
PROF_start(comm_close);
F->flags.closing = 1;
#endif
commSetTimeout(fd, -1, NULL, NULL);
CommWriteStateCallbackAndFree(fd, COMM_ERR_CLOSING);
+
+ /* Delete any pending io callbacks */
+ while (fdc_table[fd].CommCallbackList.head != NULL) {
+ node = fdc_table[fd].CommCallbackList.head;
+ cio = (CommCallbackData *)node->data;
+ assert(fd == cio->fd); /* just paranoid */
+ dlinkDelete(&cio->h_node, &CommCallbackList);
+ dlinkDelete(&cio->fd_node, &(fdc_table[cio->fd].CommCallbackList));
+
+ comm_call_io_callback(cio);
+ memPoolFree(comm_callback_pool, cio);
+ }
+
commCallCloseHandlers(fd);
if (F->uses) /* assume persistent connect count */
pconnHistCount(1, F->uses);
F->ssl = NULL;
}
#endif
+ comm_empty_os_read_buffers(fd);
fd_close(fd); /* update fdstat */
close(fd);
+ fdc_table[fd].active = 0;
+ bzero(&fdc_table[fd], sizeof(fdc_t));
statCounter.syscalls.sock.closes++;
PROF_stop(comm_close);
}
int nonblocking = TRUE;
if (fd_table[fd].type != FD_PIPE) {
if (ioctl(fd, FIONBIO, &nonblocking) < 0) {
- debug(50, 0) ("commSetNonBlocking: FD %d: %s %u\n", fd, xstrerror(), fd_table[fd].type);
+ debug(50, 0) ("commSetNonBlocking: FD %d: %s %D\n", fd, xstrerror(), fd_table[fd].type);
return COMM_ERROR;
}
} else {
void
comm_init(void)
{
- fd_table = (fde *)xcalloc(Squid_MaxFD, sizeof(fde));
+ fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde));
+ fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t));
+ fdc_table = (fdc_t *)xcalloc(Squid_MaxFD, sizeof(fdc_t));
/* XXX account fd_table */
/* Keep a few file descriptors free so that we don't run out of FD's
* after accepting a client but before it opens a socket or a file.
* Since Squid_MaxFD can be as high as several thousand, don't waste them */
RESERVED_FD = XMIN(100, Squid_MaxFD / 4);
CBDATA_INIT_TYPE(ConnectStateData);
+
+ comm_callback_pool = memPoolCreate("comm callbacks", sizeof(CommCallbackData));
comm_write_pool = memPoolCreate("CommWriteStateData", sizeof(CommWriteStateData));
conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler));
}
comm_write(int fd, const char *buf, int size, CWCB * handler, void *handler_data, FREE * free_func)
{
CommWriteStateData *state = fd_table[fd].rwstate;
+
+ assert(!fd_table[fd].flags.closing);
+
debug(5, 5) ("comm_write: FD %d: sz %d: hndl %p: data %p.\n",
fd, size, handler, handler_data);
if (NULL != state) {
comm_write(fd, mb.buf, mb.size, handler, handler_data, memBufFreeFunc(&mb));
}
+
/*
* hm, this might be too general-purpose for all the places we'd
* like to use it.
return 0;
return F->defer_check(fd, F->defer_data);
}
+
+
+/*
+ * New-style listen and accept routines
+ *
+ * Listen simply registers our interest in an FD for listening,
+ * and accept takes a callback to call when an FD has been
+ * accept()ed.
+ */
+int
+comm_listen(int sock)
+{
+ int x;
+ if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) {
+ debug(50, 0) ("comm_listen: listen(%d, %d): %s\n",
+ Squid_MaxFD >> 2,
+ sock, xstrerror());
+ return x;
+ }
+ return sock;
+}
+
+
+/*
+ * This callback is called whenever a filedescriptor is ready
+ * to dupe itself and fob off an accept()ed connection
+ */
+static void
+comm_accept_try(int fd, void *data)
+{
+ int newfd;
+ fdc_t *Fc;
+
+ assert(fdc_table[fd].active == 1);
+
+ Fc = &(fdc_table[fd]);
+
+ /* Accept a new connection */
+ newfd = comm_old_accept(fd, &Fc->accept.pn, &Fc->accept.me);
+
+ if (newfd < 0) {
+ /* Issues - check them */
+ if (newfd == COMM_NOMESSAGE) {
+ /* register interest again */
+ commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
+ return;
+ }
+ /* Problem! */
+ comm_addacceptcallback(fd, -1, Fc->accept.handler, &Fc->accept.pn, &Fc->accept.me, COMM_ERROR, errno, Fc->accept.handler_data);
+ Fc->accept.handler = NULL;
+ Fc->accept.handler_data = NULL;
+ return;
+ }
+
+ /* setup our new filedescriptor in fd_table */
+ /* and set it up in fdc_table */
+
+ /* queue a completed callback with the new FD */
+ comm_addacceptcallback(fd, newfd, Fc->accept.handler, &Fc->accept.pn, &Fc->accept.me, COMM_OK, 0, Fc->accept.handler_data);
+ Fc->accept.handler = NULL;
+ Fc->accept.handler_data = NULL;
+
+}
+
+
+/*
+ * Notes:
+ * + the current interface will queue _one_ accept per io loop.
+ * this isn't very optimal and should be revisited at a later date.
+ */
+void
+comm_accept(int fd, IOACB *handler, void *handler_data)
+{
+ fdc_t *Fc;
+
+ assert(fd_table[fd].flags.open == 1);
+ assert(fdc_table[fd].active == 1);
+
+ /* make sure we're not pending! */
+ assert(fdc_table[fd].accept.handler == NULL);
+
+ /* Record our details */
+ Fc = &fdc_table[fd];
+ Fc->accept.handler = handler;
+ Fc->accept.handler_data = handler_data;
+
+ /* Kick off the accept */
+#if OPTIMISTIC_IO
+ comm_accept_try(fd, NULL);
+#else
+ commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
+#endif
+}
--- /dev/null
+#ifndef __COMM_H__
+#define __COMM_H__
+
+#include "StoreIOBuffer.h"
+
+typedef void IOFCB(int fd, StoreIOBuffer recievedData, comm_err_t flag, int xerrno, void *data);
+/* fill sb with up to length data from fd */
+extern void comm_fill_immediate(int fd, StoreIOBuffer sb, IOFCB *callback, void *data);
+
+extern int comm_has_pending_read_callback(int fd);
+extern int comm_has_pending_read(int fd);
+extern void comm_read_cancel(int fd, IOCB *callback, void *data);
+extern void fdc_open(int fd, unsigned int type, char *desc);
+
+#endif
/*
- * $Id: ftp.cc,v 1.330 2002/10/14 07:35:45 hno Exp $
+ * $Id: ftp.cc,v 1.331 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 9 File Transfer Protocol (FTP)
* AUTHOR: Harvest Derived
/* Local functions */
static CNCB ftpPasvCallback;
-static PF ftpDataRead;
+static IOCB ftpDataRead;
static PF ftpDataWrite;
static CWCB ftpDataWriteCallback;
static PF ftpStateFree;
static PF ftpTimeout;
-static PF ftpReadControlReply;
+static IOCB ftpReadControlReply;
static CWCB ftpWriteCommandCallback;
static void ftpLoginParser(const char *, FtpStateData *, int escaped);
static wordlist *ftpParseControlReply(char *, size_t, int *, int *);
}
static void
-ftpDataRead(int fd, void *data)
+ftpDataRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
FtpStateData *ftpState = (FtpStateData *)data;
- int len;
int j;
int bin;
StoreEntry *entry = ftpState->entry;
delay_id delayId = delayMostBytesAllowed(mem);
#endif
assert(fd == ftpState->data.fd);
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us
+ */
+ if (flag == COMM_ERR_CLOSING) {
+ return;
+ }
+
if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
comm_close(ftpState->ctrl.fd);
return;
}
- errno = 0;
- read_sz = ftpState->data.size - ftpState->data.offset;
-#if DELAY_POOLS
- read_sz = delayBytesWanted(delayId, 1, read_sz);
-#endif
- memset(ftpState->data.buf + ftpState->data.offset, '\0', read_sz);
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd, ftpState->data.buf + ftpState->data.offset, read_sz);
- if (len > 0) {
- fd_bytes(fd, len, FD_READ);
+
+ if (flag == COMM_OK && len > 0) {
#if DELAY_POOLS
delayBytesIn(delayId, len);
#endif
kb_incr(&statCounter.server.ftp.kbytes_in, len);
ftpState->data.offset += len;
}
- debug(9, 5) ("ftpDataRead: FD %d, Read %d bytes\n", fd, len);
- if (len > 0) {
+ debug(9, 5) ("ftpDataRead: FD %d, Read %d bytes\n", fd, (unsigned int)len);
+ if (flag == COMM_OK && len > 0) {
IOStats.Ftp.reads++;
for (j = len - 1, bin = 0; j; bin++)
j >>= 1;
if (ftpState->flags.isdir && !ftpState->flags.html_header_sent && len >= 0) {
ftpListingStart(ftpState);
}
- if (len < 0) {
+ if (flag != COMM_OK || len < 0) {
debug(50, ignoreErrno(errno) ? 3 : 1) ("ftpDataRead: read error: %s\n", xstrerror());
if (ignoreErrno(errno)) {
- commSetSelect(fd,
- COMM_SELECT_READ,
- ftpDataRead,
- ftpState,
- Config.Timeout.read);
+ /* XXX what about Config.Timeout.read? */
+ read_sz = ftpState->data.size - ftpState->data.offset;
+#if DELAY_POOLS
+ read_sz = delayBytesWanted(delay_id, 1, read_sz);
+#endif
+ comm_read(fd, ftpState->data.buf + ftpState->data.offset, read_sz, ftpDataRead, data);
} else {
ftpFailed(ftpState, ERR_READ_ERROR);
/* ftpFailed closes ctrl.fd and frees ftpState */
storeAppend(entry, ftpState->data.buf, len);
ftpState->data.offset = 0;
}
- commSetSelect(fd,
- COMM_SELECT_READ,
- ftpDataRead,
- ftpState,
- Config.Timeout.read);
- }
+ /* XXX what about Config.Timeout.read? */
+ read_sz = ftpState->data.size - ftpState->data.offset;
+#if DELAY_POOLS
+ read_sz = delayBytesWanted(delay_id, 1, read_sz);
+#endif
+ comm_read(fd, ftpState->data.buf + ftpState->data.offset, read_sz, ftpDataRead, data);}
}
/*
/* We've already read some reply data */
ftpHandleControlReply(ftpState);
} else {
- commSetSelect(ftpState->ctrl.fd,
- COMM_SELECT_READ,
- ftpReadControlReply,
- ftpState,
- Config.Timeout.read);
+ /* XXX What about Config.Timeout.read? */
+ comm_read(ftpState->ctrl.fd, ftpState->ctrl.buf + ftpState->ctrl.offset, ftpState->ctrl.size - ftpState->ctrl.offset, ftpReadControlReply, ftpState);
/*
* Cancel the timeout on the Data socket (if any) and
* establish one on the control socket.
}
static void
-ftpReadControlReply(int fd, void *data)
+ftpReadControlReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
FtpStateData *ftpState = (FtpStateData *)data;
StoreEntry *entry = ftpState->entry;
- size_t len;
debug(9, 5) ("ftpReadControlReply\n");
+
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us
+*/
+ if (flag == COMM_ERR_CLOSING) {
+ return;
+ }
+
if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
comm_close(ftpState->ctrl.fd);
return;
}
assert(ftpState->ctrl.offset < (off_t)ftpState->ctrl.size);
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd,
- ftpState->ctrl.buf + ftpState->ctrl.offset,
- ftpState->ctrl.size - ftpState->ctrl.offset);
- if (len > 0) {
+ if (flag == COMM_OK && len > 0) {
fd_bytes(fd, len, FD_READ);
kb_incr(&statCounter.server.all.kbytes_in, len);
kb_incr(&statCounter.server.ftp.kbytes_in, len);
}
- debug(9, 5) ("ftpReadControlReply: FD %d, Read %d bytes\n", fd, len);
- if (len < 0) {
+ debug(9, 5) ("ftpReadControlReply: FD %d, Read %d bytes\n", fd, (int)len);
+ if (flag != COMM_OK || len < 0) {
debug(50, ignoreErrno(errno) ? 3 : 1) ("ftpReadControlReply: read error: %s\n", xstrerror());
if (ignoreErrno(errno)) {
ftpScheduleReadControlReply(ftpState, 0);
/* "read" handler to accept data connection */
static void
-ftpAcceptDataConnection(int fd, void *data)
+ftpAcceptDataConnection(int fd, int newfd, struct sockaddr_in *me, struct sockaddr_in *my_peer,
+ comm_err_t flag, int xerrno, void *data)
{
FtpStateData *ftpState = (FtpStateData *)data;
- struct sockaddr_in my_peer, me;
debug(9, 3) ("ftpAcceptDataConnection\n");
if (EBIT_TEST(ftpState->entry->flags, ENTRY_ABORTED)) {
comm_close(ftpState->ctrl.fd);
return;
}
- fd = comm_accept(fd, &my_peer, &me);
+
if (Config.Ftp.sanitycheck) {
- char *ipaddr = inet_ntoa(my_peer.sin_addr);
+ char *ipaddr = inet_ntoa(my_peer->sin_addr);
if (strcmp(fd_table[ftpState->ctrl.fd].ipaddr, ipaddr) != 0) {
- debug(9, 1) ("FTP data connection from unexpected server (%s:%d), expecting %s\n", ipaddr, (int) ntohs(my_peer.sin_port), fd_table[ftpState->ctrl.fd].ipaddr);
- comm_close(fd);
- commSetSelect(ftpState->data.fd,
- COMM_SELECT_READ,
- ftpAcceptDataConnection,
- ftpState,
- 0);
+ debug(9, 1) ("FTP data connection from unexpected server (%s:%d), expecting %s\n", ipaddr, (int) ntohs(my_peer->sin_port), fd_table[ftpState->ctrl.fd].ipaddr);
+ comm_close(newfd);
+ comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState);
return;
}
}
- if (fd < 0) {
- debug(9, 1) ("ftpHandleDataAccept: comm_accept(%d): %s", fd, xstrerror());
+ if (flag != COMM_OK) {
+ errno = xerrno;
+ debug(9, 1) ("ftpHandleDataAccept: comm_accept(%d): %s", newfd, xstrerror());
/* XXX Need to set error message */
ftpFail(ftpState);
return;
}
/* Replace the Listen socket with the accepted data socket */
comm_close(ftpState->data.fd);
- debug(9, 3) ("ftpAcceptDataConnection: Connected data socket on FD %d\n", fd);
- ftpState->data.fd = fd;
- ftpState->data.port = ntohs(my_peer.sin_port);
- ftpState->data.host = xstrdup(inet_ntoa(my_peer.sin_addr));
+ debug(9, 3) ("ftpAcceptDataConnection: Connected data socket on FD %d\n", newfd);
+ ftpState->data.fd = newfd;
+ ftpState->data.port = ntohs(my_peer->sin_port);
+ ftpState->data.host = xstrdup(inet_ntoa(my_peer->sin_addr));
commSetTimeout(ftpState->ctrl.fd, -1, NULL, NULL);
commSetTimeout(ftpState->data.fd, Config.Timeout.read, ftpTimeout,
ftpState);
} else if (code == 150) {
/* Accept data channel */
debug(9, 3) ("ftpReadStor: accepting data channel\n");
- commSetSelect(ftpState->data.fd,
- COMM_SELECT_READ,
- ftpAcceptDataConnection,
- ftpState,
- 0);
+ comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState);
} else {
debug(9, 3) ("ftpReadStor: Unexpected reply code %03d\n", code);
ftpFail(ftpState);
if (code == 125 || (code == 150 && ftpState->data.host)) {
/* Begin data transfer */
ftpAppendSuccessHeader(ftpState);
- commSetSelect(ftpState->data.fd,
- COMM_SELECT_READ,
- ftpDataRead,
- ftpState,
- Config.Timeout.read);
+ /* XXX what about Config.Timeout.read? */
+ assert(ftpState->data.offset == 0);
+ comm_read(ftpState->data.fd, ftpState->data.buf, ftpState->data.size, ftpDataRead, ftpState);
commSetDefer(ftpState->data.fd, fwdCheckDeferRead, ftpState->entry);
ftpState->state = READING_DATA;
/*
return;
} else if (code == 150) {
/* Accept data channel */
- commSetSelect(ftpState->data.fd,
- COMM_SELECT_READ,
- ftpAcceptDataConnection,
- ftpState,
- 0);
+ comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState);
/*
* Cancel the timeout on the Control socket and establish one
* on the data socket
/* Begin data transfer */
debug(9, 3) ("ftpReadRetr: reading data channel\n");
ftpAppendSuccessHeader(ftpState);
- commSetSelect(ftpState->data.fd,
- COMM_SELECT_READ,
- ftpDataRead,
- ftpState,
- Config.Timeout.read);
+ /* XXX what about Config.Timeout.read? */
+ size_t read_sz = ftpState->data.size - ftpState->data.offset;
+#if DELAY_POOLS
+ read_sz = delayBytesWanted(delay_id, 1, read_sz);
+#endif
+ comm_read(ftpState->data.fd, ftpState->data.buf + ftpState->data.offset,
+ read_sz, ftpDataRead, ftpState);
commSetDefer(ftpState->data.fd, fwdCheckDeferRead, ftpState->entry);
ftpState->state = READING_DATA;
/*
ftpState);
} else if (code == 150) {
/* Accept data channel */
- commSetSelect(ftpState->data.fd,
- COMM_SELECT_READ,
- ftpAcceptDataConnection,
- ftpState,
- 0);
+ comm_accept(ftpState->data.fd, ftpAcceptDataConnection, ftpState);
/*
* Cancel the timeout on the Control socket and establish one
* on the data socket
/*
- * $Id: gopher.cc,v 1.174 2002/10/14 07:35:45 hno Exp $
+ * $Id: gopher.cc,v 1.175 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 10 Gopher
* AUTHOR: Harvest Derived
int fd;
request_t *req;
FwdState *fwdState;
+ char replybuf[BUFSIZ];
} GopherStateData;
static PF gopherStateFree;
static void gopherEndHTML(GopherStateData *);
static void gopherToHTML(GopherStateData *, char *inbuf, int len);
static PF gopherTimeout;
-static PF gopherReadReply;
+static IOCB gopherReadReply;
static CWCB gopherSendComplete;
static PF gopherSendRequest;
/* This will be called when data is ready to be read from fd. Read until
* error or connection closed. */
static void
-gopherReadReply(int fd, void *data)
+gopherReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
GopherStateData *gopherState = (GopherStateData *)data;
StoreEntry *entry = gopherState->entry;
- char *buf = NULL;
- int len;
int clen;
int bin;
- size_t read_sz;
+ size_t read_sz = BUFSIZ;
+ int do_next_read = 0;
#if DELAY_POOLS
delay_id delayId = delayMostBytesAllowed(entry->mem_obj);
#endif
+
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ if (flag == COMM_ERR_CLOSING) {
+ return;
+ }
+
+ assert(buf == gopherState->replybuf);
if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
comm_close(fd);
return;
}
+
errno = 0;
- buf = (char *)memAllocate(MEM_4K_BUF);
- read_sz = 4096 - 1; /* leave room for termination */
#if DELAY_POOLS
read_sz = delayBytesWanted(delayId, 1, read_sz);
#endif
+
/* leave one space for \0 in gopherToHTML */
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd, buf, read_sz);
- if (len > 0) {
- fd_bytes(fd, len, FD_READ);
+ if (flag == COMM_OK && len > 0) {
#if DELAY_POOLS
delayBytesIn(delayId, len);
#endif
kb_incr(&statCounter.server.all.kbytes_in, len);
kb_incr(&statCounter.server.other.kbytes_in, len);
}
- debug(10, 5) ("gopherReadReply: FD %d read len=%d\n", fd, len);
- if (len > 0) {
+ debug(10, 5) ("gopherReadReply: FD %d read len=%d\n", fd, (int)len);
+ if (flag == COMM_OK && len > 0) {
commSetTimeout(fd, Config.Timeout.read, NULL, NULL);
IOStats.Gopher.reads++;
for (clen = len - 1, bin = 0; clen; bin++)
clen >>= 1;
IOStats.Gopher.read_hist[bin]++;
}
- if (len < 0) {
+ if (flag != COMM_OK || len < 0) {
debug(50, 1) ("gopherReadReply: error reading: %s\n", xstrerror());
if (ignoreErrno(errno)) {
- commSetSelect(fd, COMM_SELECT_READ, gopherReadReply, gopherState, 0);
+ do_next_read = 1;
} else if (entry->mem_obj->inmem_hi == 0) {
ErrorState *err;
err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR);
err->url = xstrdup(storeUrl(entry));
errorAppendEntry(entry, err);
comm_close(fd);
+ do_next_read = 0;
} else {
comm_close(fd);
+ do_next_read = 0;
}
} else if (len == 0 && entry->mem_obj->inmem_hi == 0) {
ErrorState *err;
err->url = xstrdup(gopherState->request);
errorAppendEntry(entry, err);
comm_close(fd);
+ do_next_read = 0;
} else if (len == 0) {
/* Connection closed; retrieval done. */
/* flush the rest of data in temp buf if there is one. */
storeBufferFlush(entry);
fwdComplete(gopherState->fwdState);
comm_close(fd);
+ do_next_read = 0;
} else {
if (gopherState->conversion != gopher_ds::NORMAL) {
gopherToHTML(gopherState, buf, len);
} else {
storeAppend(entry, buf, len);
}
- commSetSelect(fd,
- COMM_SELECT_READ,
- gopherReadReply,
- gopherState, 0);
+ do_next_read = 1;
}
- memFree(buf, MEM_4K_BUF);
+ if (do_next_read)
+ comm_read(fd, buf, read_sz, gopherReadReply, gopherState);
return;
}
gopherState->conversion = gopher_ds::NORMAL;
}
/* Schedule read reply. */
- commSetSelect(fd, COMM_SELECT_READ, gopherReadReply, gopherState, 0);
+ /* XXX this read isn't being bound by delay pools! */
+ comm_read(fd, gopherState->replybuf, BUFSIZ, gopherReadReply, gopherState);
commSetDefer(fd, fwdCheckDeferRead, entry);
if (buf)
memFree(buf, MEM_4K_BUF); /* Allocated by gopherSendRequest. */
}
gopherState->fd = fd;
gopherState->fwdState = fwdState;
- commSetSelect(fd, COMM_SELECT_WRITE, gopherSendRequest, gopherState, 0);
+ gopherSendRequest(fd, gopherState);
commSetTimeout(fd, Config.Timeout.read, gopherTimeout, gopherState);
}
/*
- * $Id: helper.cc,v 1.48 2002/10/13 20:35:01 robertc Exp $
+ * $Id: helper.cc,v 1.49 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 84 Helper process maintenance
* AUTHOR: Harvest Derived?
#define HELPER_MAX_ARGS 64
-static PF helperHandleRead;
-static PF helperStatefulHandleRead;
+static IOCB helperHandleRead;
+static IOCB helperStatefulHandleRead;
static PF helperServerFree;
static PF helperStatefulServerFree;
static void Enqueue(helper * hlp, helper_request *);
static void
-helperHandleRead(int fd, void *data)
+helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
- int len;
char *t = NULL;
helper_server *srv = (helper_server *)data;
helper_request *r;
helper *hlp = srv->parent;
assert(fd == srv->rfd);
assert(cbdataReferenceValid(data));
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
- fd_bytes(fd, len, FD_READ);
+
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ if (flag == COMM_ERR_CLOSING) {
+ return;
+ }
+
debug(84, 5) ("helperHandleRead: %d bytes from %s #%d.\n",
- len, hlp->id_name, srv->index + 1);
- if (len <= 0) {
+ (int)len, hlp->id_name, srv->index + 1);
+ if (flag != COMM_OK || len <= 0) {
if (len < 0)
debug(50, 1) ("helperHandleRead: FD %d read: %s\n", fd, xstrerror());
comm_close(fd);
if (r == NULL) {
/* someone spoke without being spoken to */
debug(84, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytes\n",
- hlp->id_name, srv->index + 1, len);
+ hlp->id_name, srv->index + 1, (int)len);
srv->offset = 0;
} else if ((t = strchr(srv->buf, '\n'))) {
/* end of reply found */
} else
helperKickQueue(hlp);
} else {
- commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
+ comm_read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset, helperHandleRead, data);
}
}
static void
-helperStatefulHandleRead(int fd, void *data)
+helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
- int len;
char *t = NULL;
helper_stateful_server *srv = (helper_stateful_server *)data;
helper_stateful_request *r;
statefulhelper *hlp = srv->parent;
assert(fd == srv->rfd);
assert(cbdataReferenceValid(data));
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
- fd_bytes(fd, len, FD_READ);
+
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ if (flag == COMM_ERR_CLOSING) {
+ return;
+ }
+
debug(84, 5) ("helperStatefulHandleRead: %d bytes from %s #%d.\n",
- len, hlp->id_name, srv->index + 1);
- if (len <= 0) {
+ (int)len, hlp->id_name, srv->index + 1);
+ if (flag != COMM_OK || len <= 0) {
if (len < 0)
debug(50, 1) ("helperStatefulHandleRead: FD %d read: %s\n", fd, xstrerror());
comm_close(fd);
if (r == NULL) {
/* someone spoke without being spoken to */
debug(84, 1) ("helperStatefulHandleRead: unexpected read from %s #%d, %d bytes\n",
- hlp->id_name, srv->index + 1, len);
+ hlp->id_name, srv->index + 1, (int)len);
srv->offset = 0;
} else if ((t = strchr(srv->buf, '\n'))) {
/* end of reply found */
helperStatefulKickQueue(hlp);
}
} else {
- commSetSelect(srv->rfd, COMM_SELECT_READ, helperStatefulHandleRead, srv, 0);
+ comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset,
+ helperStatefulHandleRead, srv);
}
}
NULL, /* Handler */
NULL, /* Handler-data */
NULL); /* free */
- commSetSelect(srv->rfd,
- COMM_SELECT_READ,
- helperHandleRead,
- srv, 0);
+ comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset, helperHandleRead, srv);
debug(84, 5) ("helperDispatch: Request sent to %s #%d, %d bytes\n",
hlp->id_name, srv->index + 1, (int) strlen(r->buf));
srv->stats.uses++;
NULL, /* Handler */
NULL, /* Handler-data */
NULL); /* free */
- commSetSelect(srv->rfd,
- COMM_SELECT_READ,
- helperStatefulHandleRead,
- srv, 0);
+ comm_read(srv->rfd, srv->buf + srv->offset, srv->buf_sz - srv->offset,
+ helperStatefulHandleRead, srv);
debug(84, 5) ("helperStatefulDispatch: Request sent to %s #%d, %d bytes\n",
hlp->id_name, srv->index + 1, (int) strlen(r->buf));
srv->stats.uses++;
/*
- * $Id: http.cc,v 1.396 2002/10/14 07:35:46 hno Exp $
+ * $Id: http.cc,v 1.397 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 11 Hypertext Transfer Protocol (HTTP)
* AUTHOR: Harvest Derived
static CWCB httpSendComplete;
static CWCB httpSendRequestEntity;
-static PF httpReadReply;
+static IOCB httpReadReply;
static void httpSendRequest(HttpStateData *);
static PF httpStateFree;
static PF httpTimeout;
* error or connection closed. */
/* XXX this function is too long! */
static void
-httpReadReply(int fd, void *data)
+httpReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno,void *data)
+{
+ HttpStateData *httpState = static_cast<HttpStateData *>(data);
+ httpState->readReply (fd, buf, len, flag, xerrno, data);
+}
+
+void
+HttpStateData::readReply (int fd, char *buf, size_t len, comm_err_t flag, int xerrno,void *data)
{
HttpStateData *httpState = static_cast<HttpStateData *>(data);
- LOCAL_ARRAY(char, buf, SQUID_TCP_SO_RCVBUF);
StoreEntry *entry = httpState->entry;
const request_t *request = httpState->request;
- int len;
int bin;
int clen;
- size_t read_sz;
+ read_sz = SQUID_TCP_SO_RCVBUF;
+ do_next_read = 0;
#if DELAY_POOLS
delay_id delayId;
else
delayId = delayMostBytesAllowed(entry->mem_obj);
#endif
+
+
+ assert(buf == httpState->buf);
+
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us
+*/
+ if (flag == COMM_ERR_CLOSING) {
+ return;
+ }
+
+
+
if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
- comm_close(fd);
+ maybeReadData();
return;
}
- /* check if we want to defer reading */
+
errno = 0;
- read_sz = SQUID_TCP_SO_RCVBUF;
+ /* prepare the read size for the next read (if any) */
#if DELAY_POOLS
read_sz = delayBytesWanted(delayId, 1, read_sz);
#endif
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd, buf, read_sz);
- debug(11, 5) ("httpReadReply: FD %d: len %d.\n", fd, len);
- if (len > 0) {
- fd_bytes(fd, len, FD_READ);
+ debug(11, 5) ("httpReadReply: FD %d: len %d.\n", fd, (int)len);
+ if (flag == COMM_OK && len > 0) {
#if DELAY_POOLS
delayBytesIn(delayId, len);
#endif
clen >>= 1;
IOStats.Http.read_hist[bin]++;
}
- if (!httpState->reply_hdr && len > 0) {
+ if (!httpState->reply_hdr && flag == COMM_OK && len > 0) {
/* Skip whitespace */
while (len > 0 && xisspace(*buf))
xmemmove(buf, buf + 1, len--);
if (len == 0) {
/* Continue to read... */
- commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0);
+ do_next_read = 1;
+ maybeReadData();
return;
}
}
- if (len < 0) {
+ if (flag != COMM_OK || len < 0) {
debug(50, 2) ("httpReadReply: FD %d: read failure: %s.\n",
fd, xstrerror());
if (ignoreErrno(errno)) {
- commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0);
+ do_next_read = 1;
} else if (entry->mem_obj->inmem_hi == 0) {
ErrorState *err;
err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR);
err->request = requestLink((request_t *) request);
err->xerrno = errno;
fwdFail(httpState->fwd, err);
+ do_next_read = 0;
comm_close(fd);
} else {
+ do_next_read = 0;
comm_close(fd);
}
- } else if (len == 0 && entry->mem_obj->inmem_hi == 0) {
+ } else if (flag == COMM_OK && len == 0 && entry->mem_obj->inmem_hi == 0) {
ErrorState *err;
err = errorCon(ERR_ZERO_SIZE_OBJECT, HTTP_SERVICE_UNAVAILABLE);
err->xerrno = errno;
err->request = requestLink((request_t *) request);
fwdFail(httpState->fwd, err);
httpState->eof = 1;
+ do_next_read = 0;
comm_close(fd);
- } else if (len == 0) {
+ } else if (flag == COMM_OK && len == 0) {
/* Connection closed; retrieval done. */
httpState->eof = 1;
if (httpState->reply_hdr_state < 2)
httpState->processReplyHeader(buf, len);
else {
fwdComplete(httpState->fwd);
+ do_next_read = 0;
comm_close(fd);
}
} else {
/* yes we have to clear all these! */
commSetDefer(fd, NULL, NULL);
commSetTimeout(fd, -1, NULL, NULL);
- commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
+ do_next_read = 0;
#if DELAY_POOLS
delayClearNoDelay(fd);
#endif
httpStateFree(fd, this);
} else {
/* Wait for EOF condition */
- commSetSelect(fd, COMM_SELECT_READ, httpReadReply, this, 0);
+ do_next_read = 1;
}
+ maybeReadData();
+}
+
+void
+HttpStateData::maybeReadData()
+{
+ if (do_next_read)
+ comm_read(fd, buf, read_sz, httpReadReply, this);
}
/* This will be called when request write is complete. Schedule read of
return;
} else {
/* Schedule read reply. */
- commSetSelect(fd, COMM_SELECT_READ, httpReadReply, httpState, 0);
+ /* XXX we're not taking into account delay pools on this read! */
+ comm_read(fd, httpState->buf, SQUID_TCP_SO_RCVBUF, httpReadReply, httpState);
/*
* Set the read timeout here because it hasn't been set yet.
* We only set the read timeout after the request has been
/*
- * $Id: http.h,v 1.1 2002/10/13 20:35:01 robertc Exp $
+ * $Id: http.h,v 1.2 2002/10/14 08:16:58 robertc Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
#define SQUID_HTTP_H
#include "StoreIOBuffer.h"
+#include "comm.h"
class HttpStateData {
public:
void processReplyHeader(const char *, int);
void processReplyData(const char *, int);
+ IOCB readReply;
+ void maybeReadData();
StoreEntry *entry;
request_t *request;
int fd;
http_state_flags flags;
FwdState *fwd;
+ int do_next_read;
+ size_t read_sz;
+ char buf[SQUID_TCP_SO_RCVBUF];
};
#endif /* SQUID_HTTP_H */
/*
- * $Id: ident.cc,v 1.60 2002/09/15 06:23:29 adrian Exp $
+ * $Id: ident.cc,v 1.61 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 30 Ident (RFC 931)
* AUTHOR: Duane Wessels
struct sockaddr_in me;
struct sockaddr_in my_peer;
IdentClient *clients;
+ char buf[4096];
} IdentStateData;
-static PF identReadReply;
+static IOCB identReadReply;
static PF identClose;
static PF identTimeout;
static CNCB identConnectDone;
ntohs(state->my_peer.sin_port),
ntohs(state->me.sin_port));
comm_write_mbuf(fd, mb, NULL, state);
- commSetSelect(fd, COMM_SELECT_READ, identReadReply, state, 0);
+ comm_read(fd, state->buf, BUFSIZ, identReadReply, state);
commSetTimeout(fd, Config.Timeout.ident, identTimeout, state);
}
static void
-identReadReply(int fd, void *data)
+identReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
IdentStateData *state = data;
- LOCAL_ARRAY(char, buf, BUFSIZ);
char *ident = NULL;
char *t = NULL;
- int len = -1;
- buf[0] = '\0';
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd, buf, BUFSIZ - 1);
- fd_bytes(fd, len, FD_READ);
- if (len <= 0) {
+
+ assert (buf == state->buf);
+
+ if (if (flag != COMM_OK || len <= 0) {
comm_close(fd);
return;
}
/*
- * $Id: ipc.cc,v 1.29 2002/10/13 20:35:02 robertc Exp $
+ * $Id: ipc.cc,v 1.30 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 54 Interprocess Communication
* AUTHOR: Duane Wessels
*/
#include "squid.h"
+#include "comm.h"
static const char *hello_string = "hi there\n";
#define HELLO_BUF_SZ 32
debug(50, 0) ("ipcCreate: pipe: %s\n", xstrerror());
return -1;
}
- fd_open(prfd = p2c[0], FD_PIPE, "IPC FIFO Parent Read");
- fd_open(cwfd = p2c[1], FD_PIPE, "IPC FIFO Child Write");
- fd_open(crfd = c2p[0], FD_PIPE, "IPC FIFO Child Read");
- fd_open(pwfd = c2p[1], FD_PIPE, "IPC FIFO Parent Write");
+ fdc_open(prfd = p2c[0], FD_PIPE, "IPC FIFO Parent Read");
+ fdc_open(cwfd = p2c[1], FD_PIPE, "IPC FIFO Child Write");
+ fdc_open(crfd = c2p[0], FD_PIPE, "IPC FIFO Child Read");
+ fdc_open(pwfd = c2p[1], FD_PIPE, "IPC FIFO Parent Write");
#if HAVE_SOCKETPAIR && defined(AF_UNIX)
} else if (type == IPC_UNIX_STREAM) {
int fds[2];
setsockopt(fds[0], SOL_SOCKET, SO_RCVBUF, (void *) &buflen, sizeof(buflen));
setsockopt(fds[1], SOL_SOCKET, SO_SNDBUF, (void *) &buflen, sizeof(buflen));
setsockopt(fds[1], SOL_SOCKET, SO_RCVBUF, (void *) &buflen, sizeof(buflen));
- fd_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX STREAM Parent");
- fd_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX STREAM Parent");
+ fdc_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX STREAM Parent");
+ fdc_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX STREAM Parent");
} else if (type == IPC_UNIX_DGRAM) {
int fds[2];
if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) < 0) {
debug(50, 0) ("ipcCreate: socketpair: %s\n", xstrerror());
return -1;
}
- fd_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX DGRAM Parent");
- fd_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX DGRAM Parent");
+ fdc_open(prfd = pwfd = fds[0], FD_PIPE, "IPC UNIX DGRAM Parent");
+ fdc_open(crfd = cwfd = fds[1], FD_PIPE, "IPC UNIX DGRAM Parent");
#endif
} else {
assert(IPC_NONE);
/*
- * $Id: main.cc,v 1.355 2002/10/13 20:35:02 robertc Exp $
+ * $Id: main.cc,v 1.356 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 1 Startup and Main Loop
* AUTHOR: Harvest Derived
eventRun();
if ((loop_delay = eventNextTime()) < 0)
loop_delay = 0;
+ comm_calliocallback();
switch (comm_select(loop_delay)) {
case COMM_OK:
errcount = 0; /* reset if successful */
/*
- * $Id: pconn.cc,v 1.33 2002/10/13 20:35:02 robertc Exp $
+ * $Id: pconn.cc,v 1.34 2002/10/14 08:16:58 robertc Exp $
*
* DEBUG: section 48 Persistent Connections
* AUTHOR: Duane Wessels
#include "squid.h"
#include "Store.h"
+#include "comm.h"
struct _pconn {
hash_link hash; /* must be first */
int *fds;
int nfds_alloc;
int nfds;
+ char buf[4096];
};
typedef struct _pconn pconn;
int client_pconn_hist[PCONN_HIST_SZ];
int server_pconn_hist[PCONN_HIST_SZ];
-static PF pconnRead;
+static IOCB pconnRead;
static PF pconnTimeout;
static const char *pconnKey(const char *host, u_short port);
static hash_table *table = NULL;
cbdataFree(p);
}
-static void
-pconnRemoveFD(struct _pconn *p, int fd)
+static int
+pconnFindFDIndex (struct _pconn *p, int fd)
{
- int i;
- for (i = 0; i < p->nfds; i++) {
- if (p->fds[i] == fd)
- break;
+ int result;
+ for (result = 0; result < p->nfds; ++result) {
+ if (p->fds[result] == fd)
+ return result;
}
+ return p->nfds;
+}
+
+static void
+pconnRemoveFDByIndex (struct _pconn *p, int index)
+{
+ for (; index < p->nfds - 1; index++)
+ p->fds[index] = p->fds[index + 1];
+}
+
+static void
+pconnPreventHandingOutFD(struct _pconn *p, int fd)
+{
+ int i = pconnFindFDIndex (p, fd);
assert(i < p->nfds);
debug(48, 3) ("pconnRemoveFD: found FD %d at index %d\n", fd, i);
- for (; i < p->nfds - 1; i++)
- p->fds[i] = p->fds[i + 1];
+ pconnRemoveFDByIndex(p, i);
+}
+
+static void
+pconnRemoveFD(struct _pconn *p, int fd)
+{
+ pconnPreventHandingOutFD(p, fd);
if (--p->nfds == 0)
pconnDelete(p);
}
}
static void
-pconnRead(int fd, void *data)
+pconnRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
- LOCAL_ARRAY(char, buf, 256);
struct _pconn *p = (_pconn *)data;
- int n;
assert(table != NULL);
- statCounter.syscalls.sock.reads++;
- n = FD_READ_METHOD(fd, buf, 256);
- debug(48, 3) ("pconnRead: %d bytes from FD %d, %s\n", n, fd,
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ if (flag == COMM_ERR_CLOSING) {
+ return;
+ }
+
+ debug(48, 3) ("pconnRead: %d bytes from FD %d, %s\n", (int) len, fd,
hashKeyStr(&p->hash));
pconnRemoveFD(p, fd);
comm_close(fd);
xfree(old);
}
p->fds[p->nfds++] = fd;
- commSetSelect(fd, COMM_SELECT_READ, pconnRead, p, 0);
+ comm_read(fd, p->buf, BUFSIZ, pconnRead, p);
commSetTimeout(fd, Config.Timeout.pconn, pconnTimeout, p);
snprintf(desc, FD_DESC_SZ, "%s idle connection", host);
fd_note(fd, desc);
debug(48, 3) ("pconnPush: pushed FD %d for %s\n", fd, key);
}
+/*
+ * return a pconn fd for host:port, or -1 if none are available
+ *
+ * XXX this routine isn't terribly efficient - if there's a pending
+ * read event (which signifies the fd will close in the next IO loop!)
+ * we ignore the FD and move onto the next one. This means, as an example,
+ * if we have a lot of FDs open to a very popular server and we get a bunch
+ * of requests JUST as they timeout (say, it shuts down) we'll be wasting
+ * quite a bit of CPU. Just keep it in mind.
+ */
int
pconnPop(const char *host, u_short port)
{
if (hptr != NULL) {
p = (struct _pconn *) hptr;
assert(p->nfds > 0);
- fd = p->fds[0];
- pconnRemoveFD(p, fd);
- commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
- commSetTimeout(fd, -1, NULL, NULL);
+ for (int i = 0; i < p->nfds; i++) {
+ fd = p->fds[0];
+ /* If there are pending read callbacks - we're about to close it, so don't issue it! */
+ if (!comm_has_pending_read_callback(fd)) {
+ pconnRemoveFD(p, fd);
+ comm_read_cancel(fd, pconnRead, p);
+ commSetTimeout(fd, -1, NULL, NULL);
+ return fd;
+ }
+ }
}
+ /* Nothing (valid!) found */
return fd;
}
/*
- * $Id: protos.h,v 1.453 2002/10/13 23:48:24 hno Exp $
+ * $Id: protos.h,v 1.454 2002/10/14 08:16:58 robertc Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
SQUIDCEXTERN int clientHttpRequestStatus(int fd, clientHttpRequest const *http);
SQUIDCEXTERN void clientSetReplyToError(void *, err_type, http_status, method_t, char const *, struct in_addr *, request_t *, char *, auth_user_request_t * auth_user_request);
+/* comm.c */
+extern int comm_existsiocallback(void);
+extern void comm_calliocallback(void);
+extern void comm_read(int fd, char *buf, int len, IOCB *handler, void *data);
+
+extern void comm_accept(int fd, IOACB *handler, void *handler_data);
+extern int comm_listen(int fd);
SQUIDCEXTERN int commSetNonBlocking(int fd);
SQUIDCEXTERN int commUnsetNonBlocking(int fd);
SQUIDCEXTERN void commSetCloseOnExec(int fd);
-SQUIDCEXTERN int comm_accept(int fd, struct sockaddr_in *, struct sockaddr_in *);
-SQUIDCEXTERN void comm_close(int fd);
+extern int comm_old_accept(int fd, struct sockaddr_in *, struct sockaddr_in *);
+extern void _comm_close(int fd, char *file, int line);
+#define comm_close(fd) (_comm_close((fd), __FILE__, __LINE__))
SQUIDCEXTERN void comm_reset_close(int fd);
#if LINGERING_CLOSE
SQUIDCEXTERN void comm_lingering_close(int fd);
SQUIDCEXTERN void commConnectStart(int fd, const char *, u_short, CNCB *, void *);
SQUIDCEXTERN int comm_connect_addr(int sock, const struct sockaddr_in *);
SQUIDCEXTERN void comm_init(void);
-SQUIDCEXTERN int comm_listen(int sock);
SQUIDCEXTERN int comm_open(int, int, struct in_addr, u_short port, int, const char *note);
SQUIDCEXTERN int comm_openex(int, int, struct in_addr, u_short, int, unsigned char TOS, const char *);
SQUIDCEXTERN u_short comm_local_port(int fd);
SQUIDCEXTERN void icmpSourcePing(struct in_addr to, const icp_common_t *, const char *url);
SQUIDCEXTERN void icmpDomainPing(struct in_addr to, const char *domain);
-SQUIDCEXTERN PF httpAccept;
-
#ifdef SQUID_SNMP
SQUIDCEXTERN PF snmpHandleUdp;
SQUIDCEXTERN void snmpInit(void);
/*
- * $Id: ssl.cc,v 1.125 2002/10/14 07:35:46 hno Exp $
+ * $Id: ssl.cc,v 1.126 2002/10/14 08:16:59 robertc Exp $
*
* DEBUG: section 26 Secure Sockets Layer Proxy
* AUTHOR: Duane Wessels
static ERCB sslErrorComplete;
static PF sslServerClosed;
static PF sslClientClosed;
-static PF sslReadClient;
-static PF sslReadServer;
+static IOCB sslReadClient;
+static IOCB sslReadServer;
static PF sslTimeout;
-static PF sslWriteClient;
-static PF sslWriteServer;
+static CWCB sslWriteClientDone;
+static CWCB sslWriteServerDone;
static PSC sslPeerSelectComplete;
static void sslStateFree(SslStateData * sslState);
static void sslConnected(int fd, void *);
static void sslProxyConnected(int fd, void *);
-static void sslSetSelect(SslStateData * sslState);
#if DELAY_POOLS
static DEFER sslDeferServerRead;
#endif
}
#endif
-static void
-sslSetSelect(SslStateData * sslState)
-{
- size_t read_sz = SQUID_TCP_SO_RCVBUF;
- assert(sslState->server.fd > -1 || sslState->client.fd > -1);
- if (sslState->client.fd > -1) {
- if (sslState->server.len > 0) {
- commSetSelect(sslState->client.fd,
- COMM_SELECT_WRITE,
- sslWriteClient,
- sslState,
- 0);
- }
- if ((size_t)sslState->client.len < read_sz) {
- commSetSelect(sslState->client.fd,
- COMM_SELECT_READ,
- sslReadClient,
- sslState,
- Config.Timeout.read);
- }
- } else if (sslState->client.len == 0) {
- comm_close(sslState->server.fd);
- }
- if (sslState->server.fd > -1) {
- if (sslState->client.len > 0) {
- commSetSelect(sslState->server.fd,
- COMM_SELECT_WRITE,
- sslWriteServer,
- sslState,
- 0);
- }
-#if DELAY_POOLS
- /*
- * If this was allowed to return 0, there would be a possibility
- * of the socket becoming "hung" with data accumulating but no
- * write handler (server.len==0) and no read handler (!(0<0)) and
- * no data flowing in the other direction. Hence the argument of
- * 1 as min.
- */
- read_sz = delayBytesWanted(sslState->delayId, 1, read_sz);
-#endif
- if ((size_t)sslState->server.len < read_sz) {
- /* Have room to read more */
- commSetSelect(sslState->server.fd,
- COMM_SELECT_READ,
- sslReadServer,
- sslState,
- Config.Timeout.read);
- }
- } else if (sslState->client.fd == -1) {
- /* client already closed, nothing more to do */
- } else if (sslState->server.len == 0) {
- comm_close(sslState->client.fd);
- }
-}
/* Read from server side and queue it for writing to the client */
static void
-sslReadServer(int fd, void *data)
+sslReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
{
SslStateData *sslState = (SslStateData *)data;
- int len;
- size_t read_sz = SQUID_TCP_SO_RCVBUF - sslState->server.len;
+
assert(fd == sslState->server.fd);
- debug(26, 3) ("sslReadServer: FD %d, reading %d bytes at offset %d\n",
- fd, (int) read_sz, sslState->server.len);
errno = 0;
#if DELAY_POOLS
read_sz = delayBytesWanted(sslState->delayId, 1, read_sz);
#endif
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd, sslState->server.buf + sslState->server.len, read_sz);
- debug(26, 3) ("sslReadServer: FD %d, read %d bytes\n", fd, len);
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ if (errcode == COMM_ERR_CLOSING) {
+ return;
+ }
+ debug(26, 3) ("sslReadServer: FD %d, read %d bytes\n", fd, (int)len);
if (len > 0) {
- fd_bytes(fd, len, FD_READ);
#if DELAY_POOLS
delayBytesIn(sslState->delayId, len);
#endif
comm_close(fd);
} else if (len == 0) {
comm_close(sslState->server.fd);
- }
- if (cbdataReferenceValid(sslState))
- sslSetSelect(sslState);
+ /* Only close the remote end if we've finished queueing data to it */
+ if (sslState->server.len == 0 && sslState->client.fd != -1) {
+ comm_close(sslState->client.fd);
+ }
+ } else if (cbdataReferenceValid(sslState))
+ comm_write(sslState->client.fd, sslState->server.buf, len, sslWriteClientDone, sslState, NULL);
cbdataInternalUnlock(sslState); /* ??? */
}
/* Read from client side and queue it for writing to the server */
static void
-sslReadClient(int fd, void *data)
+sslReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
{
SslStateData *sslState = (SslStateData *)data;
- int len;
assert(fd == sslState->client.fd);
- debug(26, 3) ("sslReadClient: FD %d, reading %d bytes at offset %d\n",
- fd, SQUID_TCP_SO_RCVBUF - sslState->client.len,
- sslState->client.len);
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd,
- sslState->client.buf + sslState->client.len,
- SQUID_TCP_SO_RCVBUF - sslState->client.len);
- debug(26, 3) ("sslReadClient: FD %d, read %d bytes\n", fd, len);
+
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ if (errcode == COMM_ERR_CLOSING) {
+ return;
+ }
+
+ debug(26, 3) ("sslReadClient: FD %d, read %d bytes\n", fd, (int) len);
if (len > 0) {
- fd_bytes(fd, len, FD_READ);
kb_incr(&statCounter.client_http.kbytes_in, len);
sslState->client.len += len;
}
if (len < 0) {
int level = 1;
#ifdef ECONNRESET
- if (errno == ECONNRESET)
+ if (xerrno == ECONNRESET)
level = 2;
#endif
- if (ignoreErrno(errno))
+ if (ignoreErrno(xerrno))
level = 3;
+ /* XXX xstrerror() should be changed to take errno as an arg! */
+ errno = xerrno;
debug(50, level) ("sslReadClient: FD %d: read failure: %s\n",
fd, xstrerror());
- if (!ignoreErrno(errno))
+ if (!ignoreErrno(xerrno))
comm_close(fd);
} else if (len == 0) {
- comm_close(fd);
- }
- if (cbdataReferenceValid(sslState))
- sslSetSelect(sslState);
+ comm_close(sslState->client.fd);
+ /* Only close the remote end if we've finished queueing data to it */
+ if (sslState->client.len == 0 && sslState->server.fd != -1) {
+ comm_close(sslState->server.fd);
+ }
+ } else if (cbdataReferenceValid(sslState))
+ comm_write(sslState->server.fd, sslState->client.buf, len, sslWriteServerDone, sslState, NULL);
cbdataInternalUnlock(sslState); /* ??? */
}
/* Writes data from the client buffer to the server side */
static void
-sslWriteServer(int fd, void *data)
+sslWriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, void *data)
{
SslStateData *sslState = (SslStateData *)data;
- int len;
assert(fd == sslState->server.fd);
- debug(26, 3) ("sslWriteServer: FD %d, %d bytes to write\n",
- fd, sslState->client.len);
- statCounter.syscalls.sock.writes++;
- len = FD_WRITE_METHOD(fd,
- sslState->client.buf,
- sslState->client.len);
- debug(26, 3) ("sslWriteServer: FD %d, %d bytes written\n", fd, len);
+ debug(26, 3) ("sslWriteServer: FD %d, %d bytes written\n", fd, (int)len);
+ /* Valid data */
if (len > 0) {
- fd_bytes(fd, len, FD_WRITE);
kb_incr(&statCounter.server.all.kbytes_out, len);
kb_incr(&statCounter.server.other.kbytes_out, len);
- assert(len <= sslState->client.len);
- sslState->client.len -= len;
- if (sslState->client.len > 0) {
- /* we didn't write the whole thing */
- xmemmove(sslState->client.buf,
- sslState->client.buf + len,
- sslState->client.len);
- }
+ assert(len == (size_t)sslState->client.len);
+ sslState->client.len = 0;
+ }
+ /* EOF */
+ if (len == 0) {
+ comm_close(sslState->server.fd);
+ return;
+ }
+ /* If the other end has closed, so should we */
+ if (sslState->client.fd == -1) {
+ comm_close(sslState->server.fd);
+ return;
}
cbdataInternalLock(sslState); /* ??? should be locked by the caller... */
+ /* Error? */
if (len < 0) {
debug(50, ignoreErrno(errno) ? 3 : 1)
("sslWriteServer: FD %d: write failure: %s.\n", fd, xstrerror());
if (!ignoreErrno(errno))
comm_close(fd);
}
- if (cbdataReferenceValid(sslState))
- sslSetSelect(sslState);
+ if (cbdataReferenceValid(sslState)) {
+ assert(sslState->client.len == 0);
+ comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadClient, sslState);
+ }
cbdataInternalUnlock(sslState); /* ??? */
}
/* Writes data from the server buffer to the client side */
static void
-sslWriteClient(int fd, void *data)
+sslWriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, void *data)
{
SslStateData *sslState = (SslStateData *)data;
- int len;
assert(fd == sslState->client.fd);
- debug(26, 3) ("sslWriteClient: FD %d, %d bytes to write\n",
- fd, sslState->server.len);
- statCounter.syscalls.sock.writes++;
- len = FD_WRITE_METHOD(fd,
- sslState->server.buf,
- sslState->server.len);
- debug(26, 3) ("sslWriteClient: FD %d, %d bytes written\n", fd, len);
+ debug(26, 3) ("sslWriteClient: FD %d, %d bytes written\n", fd, (int)len);
if (len > 0) {
- fd_bytes(fd, len, FD_WRITE);
kb_incr(&statCounter.client_http.kbytes_out, len);
- assert(len <= sslState->server.len);
- sslState->server.len -= len;
+ assert(len == (size_t)sslState->server.len);
+ sslState->server.len =0;
/* increment total object size */
if (sslState->size_ptr)
*sslState->size_ptr += len;
- if (sslState->server.len > 0) {
- /* we didn't write the whole thing */
- xmemmove(sslState->server.buf,
- sslState->server.buf + len,
- sslState->server.len);
- }
+ }
+ /* EOF */
+ if (len == 0) {
+ comm_close(sslState->client.fd);
+ return;
+ }
+ /* If the other end has closed, so should we */
+ if (sslState->server.fd == -1) {
+ comm_close(sslState->client.fd);
+ return;
}
cbdataInternalLock(sslState); /* ??? should be locked by the caller... */
+ /* Error? */
if (len < 0) {
debug(50, ignoreErrno(errno) ? 3 : 1)
("sslWriteClient: FD %d: write failure: %s.\n", fd, xstrerror());
if (!ignoreErrno(errno))
comm_close(fd);
}
- if (cbdataReferenceValid(sslState))
- sslSetSelect(sslState);
+ if (cbdataReferenceValid(sslState)) {
+ assert(sslState->server.len == 0);
+ comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadServer, sslState);
+ }
cbdataInternalUnlock(sslState); /* ??? */
}
comm_close(sslState->server.fd);
}
+static void
+sslConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, void *data)
+{
+ SslStateData *sslState = (SslStateData *)data;
+ if (flag != COMM_OK) {
+ sslErrorComplete(fd, data, 0);
+ return;
+ }
+ if (cbdataReferenceValid(sslState)) {
+ assert(sslState->server.len == 0);
+ comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadServer, sslState);
+ comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadClient, sslState);
+ }
+}
+
+
+/*
+ * handle the write completion from a proxy request to an upstream proxy
+ */
+static void
+sslProxyConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, void *data)
+{
+ SslStateData *sslState = (SslStateData *)data;
+ if (flag != COMM_OK) {
+ sslErrorComplete(fd, data, 0);
+ return;
+ }
+ if (cbdataReferenceValid(sslState)) {
+ assert(sslState->server.len == 0);
+ comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadServer, sslState);
+ comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadClient, sslState);
+ }
+}
+
static void
sslConnected(int fd, void *data)
{
SslStateData *sslState = (SslStateData *)data;
debug(26, 3) ("sslConnected: FD %d sslState=%p\n", fd, sslState);
*sslState->status_ptr = HTTP_OK;
- xstrncpy(sslState->server.buf, conn_established, SQUID_TCP_SO_RCVBUF);
- sslState->server.len = strlen(conn_established);
- sslSetSelect(sslState);
+ comm_write(sslState->client.fd, conn_established, strlen(conn_established),
+ sslConnectedWriteDone, sslState, NULL);
}
static void
} else {
if (sslState->servers->_peer)
sslProxyConnected(sslState->server.fd, sslState);
- else
+ else {
sslConnected(sslState->server.fd, sslState);
+ }
commSetTimeout(sslState->server.fd,
Config.Timeout.read,
sslTimeout,
httpHeaderClean(&hdr_out);
packerClean(&p);
memBufAppend(&mb, "\r\n", 2);
- xstrncpy(sslState->client.buf, mb.buf, SQUID_TCP_SO_RCVBUF);
- debug(26, 3) ("sslProxyConnected: Sending {%s}\n", sslState->client.buf);
- sslState->client.len = mb.size;
- memBufClean(&mb);
+
+ comm_write_mbuf(sslState->server.fd, mb, sslProxyConnectedWriteDone, sslState);
+
commSetTimeout(sslState->server.fd,
Config.Timeout.read,
sslTimeout,
sslState);
- sslSetSelect(sslState);
}
static void
/*
- * $Id: stmem.cc,v 1.72 2002/10/13 20:35:03 robertc Exp $
+ * $Id: stmem.cc,v 1.73 2002/10/14 08:16:59 robertc Exp $
*
* DEBUG: section 19 Store Memory Primitives
* AUTHOR: Harvest Derived
mem_node *p = mem->head;
off_t t_off = mem->origin_offset;
size_t bytes_to_go = size;
- debug(19, 6) ("memCopy: offset %ld: size %u\n", (long int) offset, size);
+ debug(19, 6) ("memCopy: offset %ld: size %u\n", (long int) offset, (int)size);
if (p == NULL)
return 0;
/* RC: the next assert is useless */
/*
- * $Id: store_client.cc,v 1.116 2002/10/14 07:35:46 hno Exp $
+ * $Id: store_client.cc,v 1.117 2002/10/14 08:16:59 robertc Exp $
*
* DEBUG: section 20 Storage Manager Client-Side Interface
* AUTHOR: Duane Wessels
debug(20, 3) ("storeClientCopy: %s, from %lu, for length %d, cb %p, cbdata %p\n",
storeKeyText((const cache_key *)e->hash.key),
(unsigned long) copyInto.offset,
- copyInto.length,
+ (int) copyInto.length,
callback,
data);
assert(sc != NULL);
/*
- * $Id: structs.h,v 1.434 2002/10/13 23:48:24 hno Exp $
+ * $Id: structs.h,v 1.435 2002/10/14 08:16:59 robertc Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
int n;
time_t until;
} defer;
+ struct {
+ int readMoreRequests:1;
+ } flags;
};
struct _ipcache_addrs {
/*
- * $Id: tunnel.cc,v 1.125 2002/10/14 07:35:46 hno Exp $
+ * $Id: tunnel.cc,v 1.126 2002/10/14 08:16:59 robertc Exp $
*
* DEBUG: section 26 Secure Sockets Layer Proxy
* AUTHOR: Duane Wessels
static ERCB sslErrorComplete;
static PF sslServerClosed;
static PF sslClientClosed;
-static PF sslReadClient;
-static PF sslReadServer;
+static IOCB sslReadClient;
+static IOCB sslReadServer;
static PF sslTimeout;
-static PF sslWriteClient;
-static PF sslWriteServer;
+static CWCB sslWriteClientDone;
+static CWCB sslWriteServerDone;
static PSC sslPeerSelectComplete;
static void sslStateFree(SslStateData * sslState);
static void sslConnected(int fd, void *);
static void sslProxyConnected(int fd, void *);
-static void sslSetSelect(SslStateData * sslState);
#if DELAY_POOLS
static DEFER sslDeferServerRead;
#endif
}
#endif
-static void
-sslSetSelect(SslStateData * sslState)
-{
- size_t read_sz = SQUID_TCP_SO_RCVBUF;
- assert(sslState->server.fd > -1 || sslState->client.fd > -1);
- if (sslState->client.fd > -1) {
- if (sslState->server.len > 0) {
- commSetSelect(sslState->client.fd,
- COMM_SELECT_WRITE,
- sslWriteClient,
- sslState,
- 0);
- }
- if ((size_t)sslState->client.len < read_sz) {
- commSetSelect(sslState->client.fd,
- COMM_SELECT_READ,
- sslReadClient,
- sslState,
- Config.Timeout.read);
- }
- } else if (sslState->client.len == 0) {
- comm_close(sslState->server.fd);
- }
- if (sslState->server.fd > -1) {
- if (sslState->client.len > 0) {
- commSetSelect(sslState->server.fd,
- COMM_SELECT_WRITE,
- sslWriteServer,
- sslState,
- 0);
- }
-#if DELAY_POOLS
- /*
- * If this was allowed to return 0, there would be a possibility
- * of the socket becoming "hung" with data accumulating but no
- * write handler (server.len==0) and no read handler (!(0<0)) and
- * no data flowing in the other direction. Hence the argument of
- * 1 as min.
- */
- read_sz = delayBytesWanted(sslState->delayId, 1, read_sz);
-#endif
- if ((size_t)sslState->server.len < read_sz) {
- /* Have room to read more */
- commSetSelect(sslState->server.fd,
- COMM_SELECT_READ,
- sslReadServer,
- sslState,
- Config.Timeout.read);
- }
- } else if (sslState->client.fd == -1) {
- /* client already closed, nothing more to do */
- } else if (sslState->server.len == 0) {
- comm_close(sslState->client.fd);
- }
-}
/* Read from server side and queue it for writing to the client */
static void
-sslReadServer(int fd, void *data)
+sslReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
{
SslStateData *sslState = (SslStateData *)data;
- int len;
- size_t read_sz = SQUID_TCP_SO_RCVBUF - sslState->server.len;
+
assert(fd == sslState->server.fd);
- debug(26, 3) ("sslReadServer: FD %d, reading %d bytes at offset %d\n",
- fd, (int) read_sz, sslState->server.len);
errno = 0;
#if DELAY_POOLS
read_sz = delayBytesWanted(sslState->delayId, 1, read_sz);
#endif
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd, sslState->server.buf + sslState->server.len, read_sz);
- debug(26, 3) ("sslReadServer: FD %d, read %d bytes\n", fd, len);
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ if (errcode == COMM_ERR_CLOSING) {
+ return;
+ }
+ debug(26, 3) ("sslReadServer: FD %d, read %d bytes\n", fd, (int)len);
if (len > 0) {
- fd_bytes(fd, len, FD_READ);
#if DELAY_POOLS
delayBytesIn(sslState->delayId, len);
#endif
comm_close(fd);
} else if (len == 0) {
comm_close(sslState->server.fd);
- }
- if (cbdataReferenceValid(sslState))
- sslSetSelect(sslState);
+ /* Only close the remote end if we've finished queueing data to it */
+ if (sslState->server.len == 0 && sslState->client.fd != -1) {
+ comm_close(sslState->client.fd);
+ }
+ } else if (cbdataReferenceValid(sslState))
+ comm_write(sslState->client.fd, sslState->server.buf, len, sslWriteClientDone, sslState, NULL);
cbdataInternalUnlock(sslState); /* ??? */
}
/* Read from client side and queue it for writing to the server */
static void
-sslReadClient(int fd, void *data)
+sslReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
{
SslStateData *sslState = (SslStateData *)data;
- int len;
assert(fd == sslState->client.fd);
- debug(26, 3) ("sslReadClient: FD %d, reading %d bytes at offset %d\n",
- fd, SQUID_TCP_SO_RCVBUF - sslState->client.len,
- sslState->client.len);
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd,
- sslState->client.buf + sslState->client.len,
- SQUID_TCP_SO_RCVBUF - sslState->client.len);
- debug(26, 3) ("sslReadClient: FD %d, read %d bytes\n", fd, len);
+
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ if (errcode == COMM_ERR_CLOSING) {
+ return;
+ }
+
+ debug(26, 3) ("sslReadClient: FD %d, read %d bytes\n", fd, (int) len);
if (len > 0) {
- fd_bytes(fd, len, FD_READ);
kb_incr(&statCounter.client_http.kbytes_in, len);
sslState->client.len += len;
}
if (len < 0) {
int level = 1;
#ifdef ECONNRESET
- if (errno == ECONNRESET)
+ if (xerrno == ECONNRESET)
level = 2;
#endif
- if (ignoreErrno(errno))
+ if (ignoreErrno(xerrno))
level = 3;
+ /* XXX xstrerror() should be changed to take errno as an arg! */
+ errno = xerrno;
debug(50, level) ("sslReadClient: FD %d: read failure: %s\n",
fd, xstrerror());
- if (!ignoreErrno(errno))
+ if (!ignoreErrno(xerrno))
comm_close(fd);
} else if (len == 0) {
- comm_close(fd);
- }
- if (cbdataReferenceValid(sslState))
- sslSetSelect(sslState);
+ comm_close(sslState->client.fd);
+ /* Only close the remote end if we've finished queueing data to it */
+ if (sslState->client.len == 0 && sslState->server.fd != -1) {
+ comm_close(sslState->server.fd);
+ }
+ } else if (cbdataReferenceValid(sslState))
+ comm_write(sslState->server.fd, sslState->client.buf, len, sslWriteServerDone, sslState, NULL);
cbdataInternalUnlock(sslState); /* ??? */
}
/* Writes data from the client buffer to the server side */
static void
-sslWriteServer(int fd, void *data)
+sslWriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, void *data)
{
SslStateData *sslState = (SslStateData *)data;
- int len;
assert(fd == sslState->server.fd);
- debug(26, 3) ("sslWriteServer: FD %d, %d bytes to write\n",
- fd, sslState->client.len);
- statCounter.syscalls.sock.writes++;
- len = FD_WRITE_METHOD(fd,
- sslState->client.buf,
- sslState->client.len);
- debug(26, 3) ("sslWriteServer: FD %d, %d bytes written\n", fd, len);
+ debug(26, 3) ("sslWriteServer: FD %d, %d bytes written\n", fd, (int)len);
+ /* Valid data */
if (len > 0) {
- fd_bytes(fd, len, FD_WRITE);
kb_incr(&statCounter.server.all.kbytes_out, len);
kb_incr(&statCounter.server.other.kbytes_out, len);
- assert(len <= sslState->client.len);
- sslState->client.len -= len;
- if (sslState->client.len > 0) {
- /* we didn't write the whole thing */
- xmemmove(sslState->client.buf,
- sslState->client.buf + len,
- sslState->client.len);
- }
+ assert(len == (size_t)sslState->client.len);
+ sslState->client.len = 0;
+ }
+ /* EOF */
+ if (len == 0) {
+ comm_close(sslState->server.fd);
+ return;
+ }
+ /* If the other end has closed, so should we */
+ if (sslState->client.fd == -1) {
+ comm_close(sslState->server.fd);
+ return;
}
cbdataInternalLock(sslState); /* ??? should be locked by the caller... */
+ /* Error? */
if (len < 0) {
debug(50, ignoreErrno(errno) ? 3 : 1)
("sslWriteServer: FD %d: write failure: %s.\n", fd, xstrerror());
if (!ignoreErrno(errno))
comm_close(fd);
}
- if (cbdataReferenceValid(sslState))
- sslSetSelect(sslState);
+ if (cbdataReferenceValid(sslState)) {
+ assert(sslState->client.len == 0);
+ comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadClient, sslState);
+ }
cbdataInternalUnlock(sslState); /* ??? */
}
/* Writes data from the server buffer to the client side */
static void
-sslWriteClient(int fd, void *data)
+sslWriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, void *data)
{
SslStateData *sslState = (SslStateData *)data;
- int len;
assert(fd == sslState->client.fd);
- debug(26, 3) ("sslWriteClient: FD %d, %d bytes to write\n",
- fd, sslState->server.len);
- statCounter.syscalls.sock.writes++;
- len = FD_WRITE_METHOD(fd,
- sslState->server.buf,
- sslState->server.len);
- debug(26, 3) ("sslWriteClient: FD %d, %d bytes written\n", fd, len);
+ debug(26, 3) ("sslWriteClient: FD %d, %d bytes written\n", fd, (int)len);
if (len > 0) {
- fd_bytes(fd, len, FD_WRITE);
kb_incr(&statCounter.client_http.kbytes_out, len);
- assert(len <= sslState->server.len);
- sslState->server.len -= len;
+ assert(len == (size_t)sslState->server.len);
+ sslState->server.len =0;
/* increment total object size */
if (sslState->size_ptr)
*sslState->size_ptr += len;
- if (sslState->server.len > 0) {
- /* we didn't write the whole thing */
- xmemmove(sslState->server.buf,
- sslState->server.buf + len,
- sslState->server.len);
- }
+ }
+ /* EOF */
+ if (len == 0) {
+ comm_close(sslState->client.fd);
+ return;
+ }
+ /* If the other end has closed, so should we */
+ if (sslState->server.fd == -1) {
+ comm_close(sslState->client.fd);
+ return;
}
cbdataInternalLock(sslState); /* ??? should be locked by the caller... */
+ /* Error? */
if (len < 0) {
debug(50, ignoreErrno(errno) ? 3 : 1)
("sslWriteClient: FD %d: write failure: %s.\n", fd, xstrerror());
if (!ignoreErrno(errno))
comm_close(fd);
}
- if (cbdataReferenceValid(sslState))
- sslSetSelect(sslState);
+ if (cbdataReferenceValid(sslState)) {
+ assert(sslState->server.len == 0);
+ comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadServer, sslState);
+ }
cbdataInternalUnlock(sslState); /* ??? */
}
comm_close(sslState->server.fd);
}
+static void
+sslConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, void *data)
+{
+ SslStateData *sslState = (SslStateData *)data;
+ if (flag != COMM_OK) {
+ sslErrorComplete(fd, data, 0);
+ return;
+ }
+ if (cbdataReferenceValid(sslState)) {
+ assert(sslState->server.len == 0);
+ comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadServer, sslState);
+ comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadClient, sslState);
+ }
+}
+
+
+/*
+ * handle the write completion from a proxy request to an upstream proxy
+ */
+static void
+sslProxyConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, void *data)
+{
+ SslStateData *sslState = (SslStateData *)data;
+ if (flag != COMM_OK) {
+ sslErrorComplete(fd, data, 0);
+ return;
+ }
+ if (cbdataReferenceValid(sslState)) {
+ assert(sslState->server.len == 0);
+ comm_read(sslState->server.fd, sslState->server.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadServer, sslState);
+ comm_read(sslState->client.fd, sslState->client.buf, SQUID_TCP_SO_RCVBUF,
+ sslReadClient, sslState);
+ }
+}
+
static void
sslConnected(int fd, void *data)
{
SslStateData *sslState = (SslStateData *)data;
debug(26, 3) ("sslConnected: FD %d sslState=%p\n", fd, sslState);
*sslState->status_ptr = HTTP_OK;
- xstrncpy(sslState->server.buf, conn_established, SQUID_TCP_SO_RCVBUF);
- sslState->server.len = strlen(conn_established);
- sslSetSelect(sslState);
+ comm_write(sslState->client.fd, conn_established, strlen(conn_established),
+ sslConnectedWriteDone, sslState, NULL);
}
static void
} else {
if (sslState->servers->_peer)
sslProxyConnected(sslState->server.fd, sslState);
- else
+ else {
sslConnected(sslState->server.fd, sslState);
+ }
commSetTimeout(sslState->server.fd,
Config.Timeout.read,
sslTimeout,
httpHeaderClean(&hdr_out);
packerClean(&p);
memBufAppend(&mb, "\r\n", 2);
- xstrncpy(sslState->client.buf, mb.buf, SQUID_TCP_SO_RCVBUF);
- debug(26, 3) ("sslProxyConnected: Sending {%s}\n", sslState->client.buf);
- sslState->client.len = mb.size;
- memBufClean(&mb);
+
+ comm_write_mbuf(sslState->server.fd, mb, sslProxyConnectedWriteDone, sslState);
+
commSetTimeout(sslState->server.fd,
Config.Timeout.read,
sslTimeout,
sslState);
- sslSetSelect(sslState);
}
static void
/*
- * $Id: typedefs.h,v 1.139 2002/10/13 20:35:06 robertc Exp $
+ * $Id: typedefs.h,v 1.140 2002/10/14 08:16:59 robertc Exp $
*
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
typedef void CWCB(int fd, char *, size_t size, comm_err_t flag, void *data);
typedef void CNCB(int fd, comm_err_t status, void *);
+typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void *data);
+typedef void IOACB(int fd, int nfd, struct sockaddr_in *me, struct sockaddr_in
+ *pn, comm_err_t flag, int xerrno, void *data);
typedef void FREE(void *);
typedef void CBDUNL(void *);
/*
- * $Id: urn.cc,v 1.76 2002/10/13 20:35:06 robertc Exp $
+ * $Id: urn.cc,v 1.77 2002/10/14 08:16:59 robertc Exp $
*
* DEBUG: section 52 URN Parsing
* AUTHOR: Kostas Anagnostakis
char *buf = urnState->reqbuf;
StoreIOBuffer tempBuffer;
- debug(52, 3) ("urnHandleReply: Called with size=%u.\n", result.length);
+ debug(52, 3) ("urnHandleReply: Called with size=%u.\n", (unsigned int)result.length);
if (EBIT_TEST(urlres_e->flags, ENTRY_ABORTED)) {
goto error;
}
/*
- * $Id: wais.cc,v 1.141 2002/10/14 07:35:46 hno Exp $
+ * $Id: wais.cc,v 1.142 2002/10/14 08:16:59 robertc Exp $
*
* DEBUG: section 24 WAIS Relay
* AUTHOR: Harvest Derived
char url[MAX_URL];
request_t *request;
FwdState *fwd;
+ char buf[BUFSIZ];
} WaisStateData;
static PF waisStateFree;
static PF waisTimeout;
-static PF waisReadReply;
+static IOCB waisReadReply;
static CWCB waisSendComplete;
static PF waisSendRequest;
/* This will be called when data is ready to be read from fd. Read until
* error or connection closed. */
static void
-waisReadReply(int fd, void *data)
+waisReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
WaisStateData *waisState = (WaisStateData *)data;
- LOCAL_ARRAY(char, buf, 4096);
StoreEntry *entry = waisState->entry;
- int len;
int clen;
int bin;
size_t read_sz;
#if DELAY_POOLS
delay_id delayId = delayMostBytesAllowed(entry->mem_obj);
#endif
+
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ if (flag == COMM_ERR_CLOSING) {
+ return;
+ }
+
if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
comm_close(fd);
return;
}
errno = 0;
- read_sz = 4096;
-#if DELAY_POOLS
- read_sz = delayBytesWanted(delayId, 1, read_sz);
-#endif
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd, buf, read_sz);
- if (len > 0) {
- fd_bytes(fd, len, FD_READ);
+ read_sz = BUFSIZ;
+ if (flag == COMM_OK && len > 0) {
#if DELAY_POOLS
delayBytesIn(delayId, len);
#endif
kb_incr(&statCounter.server.all.kbytes_in, len);
kb_incr(&statCounter.server.other.kbytes_in, len);
}
- debug(24, 5) ("waisReadReply: FD %d read len:%d\n", fd, len);
- if (len > 0) {
+#if DELAY_POOLS
+ read_sz = delayBytesWanted(delay_id, 1, read_sz);
+#endif
+ debug(24, 5) ("waisReadReply: FD %d read len:%d\n", fd, (int)len);
+ if (flag == COMM_OK && len > 0) {
commSetTimeout(fd, Config.Timeout.read, NULL, NULL);
IOStats.Wais.reads++;
for (clen = len - 1, bin = 0; clen; bin++)
clen >>= 1;
IOStats.Wais.read_hist[bin]++;
}
- if (len < 0) {
+ if (flag != COMM_OK || len < 0) {
debug(50, 1) ("waisReadReply: FD %d: read failure: %s.\n",
fd, xstrerror());
- if (ignoreErrno(errno)) {
+ if (ignoreErrno(xerrno)) {
/* reinstall handlers */
/* XXX This may loop forever */
- commSetSelect(fd, COMM_SELECT_READ,
- waisReadReply, waisState, 0);
+ comm_read(fd, waisState->buf, read_sz, waisReadReply, waisState);
} else {
ErrorState *err;
EBIT_CLR(entry->flags, ENTRY_CACHABLE);
errorAppendEntry(entry, err);
comm_close(fd);
}
- } else if (len == 0 && entry->mem_obj->inmem_hi == 0) {
+ } else if (flag == COMM_OK && len == 0 && entry->mem_obj->inmem_hi == 0) {
ErrorState *err;
err = errorCon(ERR_ZERO_SIZE_OBJECT, HTTP_SERVICE_UNAVAILABLE);
err->xerrno = errno;
err->request = requestLink(waisState->request);
errorAppendEntry(entry, err);
comm_close(fd);
- } else if (len == 0) {
+ } else if (flag == COMM_OK && len == 0) {
/* Connection closed; retrieval done. */
entry->expires = squid_curtime;
fwdComplete(waisState->fwd);
comm_close(fd);
} else {
storeAppend(entry, buf, len);
- commSetSelect(fd,
- COMM_SELECT_READ,
- waisReadReply,
- waisState, 0);
+ comm_read(fd, waisState->buf, read_sz, waisReadReply, waisState);
}
}
comm_close(fd);
} else {
/* Schedule read reply. */
- commSetSelect(fd,
- COMM_SELECT_READ,
- waisReadReply,
- waisState, 0);
+ comm_read(fd, waisState->buf, BUFSIZ, waisReadReply, waisState);
commSetDefer(fd, fwdCheckDeferRead, entry);
}
}
waisState->fwd = fwd;
comm_add_close_handler(waisState->fd, waisStateFree, waisState);
storeLockObject(entry);
- commSetSelect(fd, COMM_SELECT_WRITE, waisSendRequest, waisState, 0);
commSetTimeout(fd, Config.Timeout.read, waisTimeout, waisState);
+ waisSendRequest(fd, waisState);
}
/*
- * $Id: whois.cc,v 1.19 2002/10/13 20:35:06 robertc Exp $
+ * $Id: whois.cc,v 1.20 2002/10/14 08:16:59 robertc Exp $
*
* DEBUG: section 75 WHOIS protocol
* AUTHOR: Duane Wessels, Kostas Anagnostakis
StoreEntry *entry;
request_t *request;
FwdState *fwd;
+ char buf[BUFSIZ];
} WhoisState;
static PF whoisClose;
static PF whoisTimeout;
-static PF whoisReadReply;
+static IOCB whoisReadReply;
/* PUBLIC */
buf = (char *)xmalloc(l);
snprintf(buf, l, "%s\r\n", strBuf(p->request->urlpath) + 1);
comm_write(fd, buf, strlen(buf), NULL, p, xfree);
- commSetSelect(fd, COMM_SELECT_READ, whoisReadReply, p, 0);
+ comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p);
commSetTimeout(fd, Config.Timeout.read, whoisTimeout, p);
}
}
static void
-whoisReadReply(int fd, void *data)
+whoisReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
WhoisState *p = (WhoisState *)data;
StoreEntry *entry = p->entry;
- char *buf = (char *)memAllocate(MEM_4K_BUF);
MemObject *mem = entry->mem_obj;
- int len;
- statCounter.syscalls.sock.reads++;
- len = FD_READ_METHOD(fd, buf, 4095);
+ int do_next_read = 0;
+
+ /* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
+ if (flag == COMM_ERR_CLOSING) {
+ return;
+ }
+
buf[len] = '\0';
- debug(75, 3) ("whoisReadReply: FD %d read %d bytes\n", fd, len);
+ debug(75, 3) ("whoisReadReply: FD %d read %d bytes\n", fd, (int)len);
debug(75, 5) ("{%s}\n", buf);
- if (len > 0) {
+ if (flag == COMM_OK && len > 0) {
if (0 == mem->inmem_hi)
mem->reply->sline.status = HTTP_OK;
- fd_bytes(fd, len, FD_READ);
kb_incr(&statCounter.server.all.kbytes_in, len);
kb_incr(&statCounter.server.http.kbytes_in, len);
storeAppend(entry, buf, len);
- commSetSelect(fd, COMM_SELECT_READ, whoisReadReply, p, Config.Timeout.read);
- } else if (len < 0) {
+ do_next_read = 1;
+ } else if (flag != COMM_OK || len < 0) {
debug(50, 2) ("whoisReadReply: FD %d: read failure: %s.\n",
fd, xstrerror());
if (ignoreErrno(errno)) {
- commSetSelect(fd, COMM_SELECT_READ, whoisReadReply, p, Config.Timeout.read);
+ do_next_read = 1;
} else if (mem->inmem_hi == 0) {
ErrorState *err;
err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR);
err->xerrno = errno;
fwdFail(p->fwd, err);
comm_close(fd);
+ do_next_read = 0;
} else {
comm_close(fd);
+ do_next_read = 0;
}
} else {
storeTimestampsSet(entry);
fwdComplete(p->fwd);
debug(75, 3) ("whoisReadReply: Done: %s\n", storeUrl(entry));
comm_close(fd);
+ do_next_read = 0;
}
- memFree(buf, MEM_4K_BUF);
+ if (do_next_read)
+ comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p);
}
static void