From: Amos Jeffries Date: Sat, 27 Nov 2010 14:02:42 +0000 (+1300) Subject: Merge from trunk. and Save Comm::Connection in IoCallback X-Git-Tag: take08~55^2~124^2~32 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=b038892;p=thirdparty%2Fsquid.git Merge from trunk. and Save Comm::Connection in IoCallback --- b0388924befe5c7c937f4e2d7644665834db7c01 diff --cc doc/release-notes/release-3.2.sgml index eaeaca3405,91d0c23809..0ec2c74f96 --- a/doc/release-notes/release-3.2.sgml +++ b/doc/release-notes/release-3.2.sgml @@@ -514,10 -538,9 +542,13 @@@ This section gives a thorough account o ignore_expect_100

Obsolete. + log_fqdn +

Obsolete. Replaced by automatic detection of the %>A logformat tag. + + maximum_single_addr_tries +

The behaviour controlled by this directive is no longer possible. + It has been replaced by connect_retries option which operates a little differently. + url_rewrite_concurrency

Replaced by url_rewrite_children ... concurrency=N option. diff --cc src/ProtoPort.cc index f0b943800a,98d4142a62..91064d155e --- a/src/ProtoPort.cc +++ b/src/ProtoPort.cc @@@ -1,5 -1,12 +1,8 @@@ -/* - * $Id$ - */ - #include "squid.h" #include "ProtoPort.h" + #if HAVE_LIMITS + #include + #endif http_port_list::http_port_list(const char *aProtocol) #if USE_SSL diff --cc src/ProtoPort.h index 2d349a6ed0,6bee84f578..98235048b3 --- a/src/ProtoPort.h +++ b/src/ProtoPort.h @@@ -1,9 -1,17 +1,13 @@@ #ifndef SQUID_PROTO_PORT_H #define SQUID_PROTO_PORT_H -//#include "typedefs.h" #include "cbdata.h" -#include "comm/ListenStateData.h" +#include "comm/Connection.h" + #if USE_SSL + #include "ssl/gadgets.h" + #endif + struct http_port_list { http_port_list(const char *aProtocol); ~http_port_list(); diff --cc src/Server.cc index 0df54be262,8256a1f77a..dbee8aca6d --- a/src/Server.cc +++ b/src/Server.cc @@@ -34,11 -34,10 +34,12 @@@ #include "squid.h" #include "base/TextException.h" +#include "comm/Connection.h" +#include "comm/forward.h" + #include "comm/Write.h" #include "Server.h" #include "Store.h" -#include "fde.h" /* for fd_table[fd].closing */ +//#include "fde.h" /* for fd_table[fd].closing */ #include "HttpRequest.h" #include "HttpReply.h" #include "errorpage.h" @@@ -419,8 -424,9 +420,8 @@@ ServerStateData::sendMoreRequestBody( if (getMoreRequestBody(buf) && buf.contentSize() > 0) { debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes"); typedef CommCbMemFunT Dialer; - requestSender = JobCallback(93,3, - Dialer, this, ServerStateData::sentRequestBody); - Comm::Write(fd, &buf, requestSender); + requestSender = JobCallback(93,3, Dialer, this, ServerStateData::sentRequestBody); - comm_write_mbuf(conn, &buf, requestSender); ++ Comm::Write(conn, &buf, requestSender); } else { debugs(9,3, HERE << "will wait for more request body bytes or eof"); requestSender = NULL; diff --cc src/adaptation/icap/Xaction.cc index ec30dd82f9,3e1f3b5d41..04bda5a1d9 --- a/src/adaptation/icap/Xaction.cc +++ b/src/adaptation/icap/Xaction.cc @@@ -4,8 -4,7 +4,9 @@@ #include "squid.h" #include "comm.h" +#include "comm/Connection.h" +#include "comm/ConnOpener.h" + #include "comm/Write.h" #include "CommCalls.h" #include "HttpMsg.h" #include "adaptation/icap/Xaction.h" @@@ -217,12 -233,12 +218,12 @@@ void Adaptation::Icap::Xaction::dieOnCo void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf) { + Must(haveConnection()); + // comm module will free the buffer typedef CommCbMemFunT Dialer; - writer = JobCallback(93,3, - Dialer, this, Adaptation::Icap::Xaction::noteCommWrote); - + writer = JobCallback(93, 3, Dialer, this, Adaptation::Icap::Xaction::noteCommWrote); - comm_write_mbuf(connection, &buf, writer); + Comm::Write(connection, &buf, writer); updateTimeout(); } diff --cc src/client_side.cc index 7e4cd6241d,6abeb3e5fc..3fafbef0b9 --- a/src/client_side.cc +++ b/src/client_side.cc @@@ -94,8 -92,10 +94,9 @@@ #include "ClientRequestContext.h" #include "clientStream.h" #include "comm.h" +#include "comm/Connection.h" +#include "comm/ConnAcceptor.h" + #include "comm/Write.h" -#include "comm/ListenStateData.h" -#include "base/TextException.h" -#include "ConnectionDetail.h" #include "eui/Config.h" #include "fde.h" #include "HttpHdrContRange.h" @@@ -362,7 -377,7 +377,7 @@@ ClientSocketContext::writeControlMsg(Ht AsyncCall::Pointer call = commCbCall(33, 5, "ClientSocketContext::wroteControlMsg", CommIoCbPtrFun(&WroteControlMsg, this)); - comm_write_mbuf(clientConn(), mb, call); - Comm::Write(fd(), mb, call); ++ Comm::Write(clientConn(), mb, call); delete mb; } @@@ -947,7 -963,7 +962,7 @@@ ClientSocketContext::sendBody(HttpRepl noteSentBodyBytes (length); AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete", CommIoCbPtrFun(clientWriteBodyComplete, this)); - comm_write(clientConn(), bodyData.data, length, call ); - Comm::Write(fd(), bodyData.data, length, call, NULL); ++ Comm::Write(clientConn(), bodyData.data, length, call, NULL); return; } @@@ -962,9 -978,9 +977,9 @@@ /* write */ AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", CommIoCbPtrFun(clientWriteComplete, this)); - comm_write_mbuf(clientConn(), &mb, call); - Comm::Write(fd(), &mb, call); ++ Comm::Write(clientConn(), &mb, call); } else - writeComplete(fd(), NULL, 0, COMM_OK); + writeComplete(clientConn(), NULL, 0, COMM_OK); } /** @@@ -1365,8 -1381,8 +1380,7 @@@ ClientSocketContext::sendStartOfMessage debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete"); AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", CommIoCbPtrFun(clientWriteComplete, this)); - comm_write_mbuf(clientConn(), mb, call); - Comm::Write(fd(), mb, call); -- ++ Comm::Write(clientConn(), mb, call); delete mb; } @@@ -3338,26 -3355,33 +3350,26 @@@ clientNegotiateSSL(int fd, void *data /** handle a new HTTPS connection */ static void -httpsAccept(int sock, int newfd, ConnectionDetail *details, - comm_err_t flag, int xerrno, void *data) +httpsAccept(int sock, const Comm::ConnectionPointer& details, comm_err_t flag, int xerrno, void *data) { https_port_list *s = (https_port_list *)data; - SSL_CTX *sslContext = s->sslContext; + SSL_CTX *sslContext = s->staticSslContext.get(); - if (flag != COMM_OK) { - errno = xerrno; - debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno)); - return; - } + assert(flag != COMM_OK); // Acceptor does not call un unless successful. SSL *ssl = NULL; - if (!(ssl = httpsCreate(newfd, details, sslContext))) + if (!(ssl = httpsCreate(details, sslContext))) return; - debugs(33, 5, "httpsAccept: FD " << newfd << " accepted, starting SSL negotiation."); - fd_note(newfd, "client https connect"); - ConnStateData *connState = connStateCreate(details->peer, details->me, - newfd, &s->http); + debugs(33, 5, HERE << details << " accepted, starting SSL negotiation."); + fd_note(details->fd, "client https connect"); + ConnStateData *connState = connStateCreate(details, &s->http); typedef CommCbMemFunT Dialer; - AsyncCall::Pointer call = JobCallback(33, 5, - Dialer, connState, ConnStateData::connStateClosed); - comm_add_close_handler(newfd, call); + AsyncCall::Pointer call = JobCallback(33, 5, Dialer, connState, ConnStateData::connStateClosed); + comm_add_close_handler(details->fd, call); if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS); + fqdncache_gethostbyaddr(details->remote, FQDN_LOOKUP_IF_MISS); typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(33, 5, @@@ -3385,28 -3409,111 +3397,106 @@@ incoming_sockets_accepted++; } - bool - ConnStateData::switchToHttps() + void + ConnStateData::sslCrtdHandleReplyWrapper(void *data, char *reply) { - assert(!switchedToHttps_); + ConnStateData * state_data = (ConnStateData *)(data); + state_data->sslCrtdHandleReply(reply); + } - //HTTPMSGLOCK(currentobject->http->request); - assert(areAllContextsForThisConnection()); - freeAllContexts(); - //currentobject->connIsFinished(); + void + ConnStateData::sslCrtdHandleReply(const char * reply) + { + if (!reply) { + debugs(1, 1, HERE << "\"ssl_crtd\" helper return reply"); + } else { + Ssl::CrtdMessage reply_message; + if (reply_message.parse(reply, strlen(reply)) != Ssl::CrtdMessage::OK) { + debugs(33, 5, HERE << "Reply from ssl_crtd for " << sslHostName << " is incorrect"); + } else { + if (reply_message.getCode() != "ok") { + debugs(33, 5, HERE << "Certificate for " << sslHostName << " cannot be generated. ssl_crtd response: " << reply_message.getBody()); + } else { + debugs(33, 5, HERE << "Certificate for " << sslHostName << " was successfully recieved from ssl_crtd"); + getSslContextDone(Ssl::generateSslContextUsingPkeyAndCertFromMemory(reply_message.getBody().c_str()), true); + return; + } + } + } + getSslContextDone(NULL); + } - debugs(33, 5, HERE << "converting " << clientConn << " to SSL"); + bool + ConnStateData::getSslContextStart() + { + char const * host = sslHostName.termedBuf(); + if (port->generateHostCertificates && host && strcmp(host, "") != 0) { + debugs(33, 5, HERE << "Finding SSL certificate for " << host << " in cache"); + Ssl::LocalContextStorage & ssl_ctx_cache(Ssl::TheGlobalContextStorage.getLocalStorage(port->s)); + SSL_CTX * dynCtx = ssl_ctx_cache.find(host); + if (dynCtx) { + debugs(33, 5, HERE << "SSL certificate for " << host << " have found in cache"); + if (Ssl::verifySslCertificateDate(dynCtx)) { + debugs(33, 5, HERE << "Cached SSL certificate for " << host << " is valid"); + return getSslContextDone(dynCtx); + } else { + debugs(33, 5, HERE << "Cached SSL certificate for " << host << " is out of date. Delete this certificate from cache"); + ssl_ctx_cache.remove(host); + } + } else { + debugs(33, 5, HERE << "SSL certificate for " << host << " haven't found in cache"); + } - #if 0 // use the actual clientConn now that we have it. - // fake a Comm::Connection object; XXX: make ConnState a Comm::Connection? - Comm::Connection detail; - detail.local = me; - detail.remote = peer; - #endif + #if USE_SSL_CRTD + debugs(33, 5, HERE << "Generating SSL certificate for " << host << " using ssl_crtd."); + Ssl::CrtdMessage request_message; + request_message.setCode(Ssl::CrtdMessage::code_new_certificate); + Ssl::CrtdMessage::BodyParams map; + map.insert(std::make_pair(Ssl::CrtdMessage::param_host, host)); + std::string bufferToWrite; + Ssl::writeCertAndPrivateKeyToMemory(port->signingCert, port->signPkey, bufferToWrite); + request_message.composeBody(map, bufferToWrite); + Ssl::Helper::GetInstance()->sslSubmit(request_message, sslCrtdHandleReplyWrapper, this); + return true; + #else + debugs(33, 5, HERE << "Generating SSL certificate for " << host); + dynCtx = Ssl::generateSslContext(host, port->signingCert, port->signPkey); + return getSslContextDone(dynCtx, true); + #endif //USE_SSL_CRTD + } + return getSslContextDone(NULL); + } + + bool + ConnStateData::getSslContextDone(SSL_CTX * sslContext, bool isNew) + { + // Try to add generated ssl context to storage. + if (port->generateHostCertificates && isNew) { + Ssl::LocalContextStorage & ssl_ctx_cache(Ssl::TheGlobalContextStorage.getLocalStorage(port->s)); + if (sslContext && sslHostName != "") { + if (!ssl_ctx_cache.add(sslHostName.termedBuf(), sslContext)) { + // If it is not in storage delete after using. Else storage deleted it. - fd_table[fd].dynamicSslContext = sslContext; ++ fd_table[clientConn->fd].dynamicSslContext = sslContext; + } + } else { + debugs(33, 2, HERE << "Failed to generate SSL cert for " << sslHostName); + } + } + + // If generated ssl context = NULL, try to use static ssl context. + if (!sslContext) { + if (!port->staticSslContext) { - debugs(83, 1, "Closing SSL FD " << fd << " as lacking SSL context"); - comm_close(fd); ++ debugs(83, 1, "Closing SSL " << clientConn->remote << " as lacking SSL context"); ++ clientConn->close(); + return false; + } else { + debugs(33, 5, HERE << "Using static ssl context."); + sslContext = port->staticSslContext.get(); + } + } - SSL_CTX *sslContext = port->sslContext; - // fake a ConnectionDetail object; XXX: make ConnState a ConnectionDetail? - ConnectionDetail detail; - detail.me = me; - detail.peer = peer; - SSL *ssl = NULL; - if (!(ssl = httpsCreate(fd, &detail, sslContext))) + if (!(ssl = httpsCreate(clientConn, sslContext))) return false; // commSetTimeout() was called for this request before we switched. @@@ -3420,6 -3527,23 +3510,23 @@@ return true; } + bool + ConnStateData::switchToHttps(const char *host) + { + assert(!switchedToHttps_); + + sslHostName = host; + + //HTTPMSGLOCK(currentobject->http->request); + assert(areAllContextsForThisConnection()); + freeAllContexts(); + //currentobject->connIsFinished(); + - debugs(33, 5, HERE << "converting FD " << fd << " to SSL"); ++ debugs(33, 5, HERE << "converting " << clientConn << " to SSL"); + + return getSslContextStart(); + } + #endif /* USE_SSL */ /// check FD after clientHttp[s]ConnectionOpened, adjust HttpSockets as needed @@@ -3472,30 -3597,28 +3580,36 @@@ clientHttpConnectionsOpen(void s->http.s << " due to SSL initialization failure."); s->sslBump = 0; } - if (s->sslBump) + if (s->sslBump) { ++bumpCount; + // Create ssl_ctx cache for this port. + Ssl::TheGlobalContextStorage.addLocalStorage(s->s, s->dynamicCertMemCacheSize == std::numeric_limits::max() ? 4194304 : s->dynamicCertMemCacheSize); + } #endif + #if USE_SSL_CRTD + Ssl::Helper::GetInstance(); + #endif //USE_SSL_CRTD - /* AYJ: 2009-12-27: bit bumpy. new ListenStateData(...) should be doing all the Comm:: stuff ... */ +// NOTE: would the design here be better if we opened both the ConnAcceptor and IPC informative messages now? +// that way we have at least one worker listening on the socket immediately with others joining in as +// they receive the IPC message. + + // Fill out a Comm::Connection which IPC will open as a listener for us + // then pass back so we can start a ConnAcceptor subscription. + s->listenConn = new Comm::Connection; + s->listenConn->local = s->s; + s->listenConn->flags = COMM_NONBLOCKING | (s->spoof_client_ip ? COMM_TRANSPARENT : 0); - const int openFlags = COMM_NONBLOCKING | - (s->spoof_client_ip ? COMM_TRANSPARENT : 0); + // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP + typedef CommCbFunPtrCallT AcceptCall; + RefCount subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s)); + Subscription::Pointer sub = new CallSubscription(subCall); - AsyncCall::Pointer callback = asyncCall(33,2, - "clientHttpConnectionOpened", - ListeningStartedDialer(&clientHttpConnectionOpened, s)); - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, - Ipc::fdnHttpSocket, callback); + AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened", + ListeningStartedDialer(&clientListenerConnectionOpened, s, false)); + Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->listenConn, Ipc::fdnHttpSocket, listenCall, sub); - HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened + HttpSockets[NHttpSockets++] = -1; // set in clientListenerHttpConnectionOpened } #if USE_SSL @@@ -3698,10 -3835,7 +3812,11 @@@ clientAclChecklistCreate(const acl_acce CBDATA_CLASS_INIT(ConnStateData); -ConnStateData::ConnStateData() :AsyncJob("ConnStateData"), transparent_ (false), closing_ (false), switchedToHttps_(false) +ConnStateData::ConnStateData() : + AsyncJob("ConnStateData"), + transparent_(false), - closing_(false) ++ closing_(false), ++ switchedToHttps_(false) { pinning.fd = -1; pinning.pinned = false; diff --cc src/client_side_request.cc index 4608f180c0,280828ea1c..067f6a8f04 --- a/src/client_side_request.cc +++ b/src/client_side_request.cc @@@ -59,7 -59,7 +59,8 @@@ #include "client_side_reply.h" #include "client_side_request.h" #include "ClientRequestContext.h" +#include "comm/Connection.h" + #include "comm/Write.h" #include "compat/inet_pton.h" #include "fde.h" #include "HttpReply.h" @@@ -1200,13 -1197,18 +1202,15 @@@ ClientHttpRequest::sslBumpEstablish(com void ClientHttpRequest::sslBumpStart() { - debugs(85, 5, HERE << "ClientHttpRequest::sslBumpStart"); - + debugs(85, 5, HERE << "Confirming CONNECT tunnel on FD " << getConn()->clientConn); // send an HTTP 200 response to kick client SSL negotiation - const int fd = getConn()->fd; - debugs(33, 7, HERE << "Confirming CONNECT tunnel on FD " << fd); + debugs(33, 7, HERE << "Confirming CONNECT tunnel on FD " << getConn()->clientConn); // TODO: Unify with tunnel.cc and add a Server(?) header - static const char *const conn_established = "HTTP/1.0 200 Connection established\r\n\r\n"; - comm_write(getConn()->clientConn, conn_established, strlen(conn_established), &SslBumpEstablish, this, NULL); - static const char *const conn_established = - "HTTP/1.1 200 Connection established\r\n\r\n"; ++ static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n"; + AsyncCall::Pointer call = commCbCall(85, 5, "ClientSocketContext::sslBumpEstablish", + CommIoCbPtrFun(&SslBumpEstablish, this)); - Comm::Write(fd, conn_established, strlen(conn_established), call, NULL); ++ Comm::Write(getConn()->clientConn, conn_established, strlen(conn_established), call, NULL); } #endif diff --cc src/comm.cc index 3365de51b5,fc7e131f3f..0738f080d5 --- a/src/comm.cc +++ b/src/comm.cc @@@ -40,9 -39,12 +40,11 @@@ #include "fde.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" +#include "comm/Connection.h" + #include "comm/IoCallback.h" + #include "comm/Write.h" -#include "comm/ListenStateData.h" #include "CommIO.h" #include "CommRead.h" -#include "ConnectionDetail.h" #include "MemBuf.h" #include "pconn.h" #include "SquidTime.h" @@@ -67,15 -69,9 +69,9 @@@ * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything. */ - typedef enum { - IOCB_NONE, - IOCB_READ, - IOCB_WRITE - } iocb_type; - static void commStopHalfClosedMonitor(int fd); static IOCB commHalfClosedReader; -static void comm_init_opened(int new_socket, Ip::Address &addr, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI); +static void comm_init_opened(const Comm::ConnectionPointer &conn, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI); static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI); #if DELAY_POOLS @@@ -84,142 -80,38 +80,6 @@@ CBDATA_CLASS_INIT(CommQuotaQueue) static void commHandleWriteHelper(void * data); #endif - static void commSelectOrQueueWrite(const int fd); - - struct comm_io_callback_t { - iocb_type type; - int fd; - AsyncCall::Pointer callback; - char *buf; - FREE *freefunc; - int size; - int offset; - comm_err_t errcode; - int xerrno; - #if DELAY_POOLS - unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue - #endif - - - bool active() const { return callback != NULL; } - }; - - struct _comm_fd { - int fd; - comm_io_callback_t readcb; - comm_io_callback_t writecb; - }; - typedef struct _comm_fd comm_fd_t; - comm_fd_t *commfd_table; - - // TODO: make this a comm_io_callback_t method? - bool - commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb) - { - assert(ccb->fd == fd); - assert(ccb->type == type); - return ccb->active(); - } - - /* - * Configure comm_io_callback_t for I/O - * - * @param fd filedescriptor - * @param ccb comm io callback - * @param cb callback - * @param cbdata callback data (must be cbdata'ed) - * @param buf buffer, if applicable - * @param freefunc freefunc, if applicable - * @param size buffer size - */ - static void - commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb, - AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size) - { - assert(!ccb->active()); - assert(ccb->type == type); - assert(cb != NULL); - ccb->fd = fd; - ccb->callback = cb; - ccb->buf = buf; - ccb->freefunc = freefunc; - ccb->size = size; - ccb->offset = 0; - } - - - // Schedule the callback call and clear the callback - static void - commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno) -class ConnectStateData --{ - debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" << - code << ", " << xerrno << ")"); - assert(ccb->active()); - assert(ccb->fd == fd); - ccb->errcode = code; - ccb->xerrno = xerrno; - - #if DELAY_POOLS - ccb->quotaQueueReserv = 0; - #endif - - comm_io_callback_t cb = *ccb; - - /* We've got a copy; blow away the real one */ - /* XXX duplicate code from commio_cancel_callback! */ - ccb->xerrno = 0; - ccb->callback = NULL; // cb has it - - /* free data */ - if (cb.freefunc) { - cb.freefunc(cb.buf); - cb.buf = NULL; - } - - if (cb.callback != NULL) { - typedef CommIoCbParams Params; - Params ¶ms = GetCommParams(cb.callback); - params.fd = cb.fd; - params.buf = cb.buf; - params.size = cb.offset; - params.flag = cb.errcode; - params.xerrno = cb.xerrno; - ScheduleCallHere(cb.callback); - } - } -- -public: - void *operator new (size_t); - void operator delete (void *); - static void Connect (int fd, void *me); - void connect(); - void callCallback(comm_err_t status, int xerrno); - void defaults(); -- - /* - * Cancel the given callback - * - * Remember that the data is cbdataRef'ed. - */ - // TODO: make this a comm_io_callback_t method - static void - commio_cancel_callback(int fd, comm_io_callback_t *ccb) - { - debugs(5, 3, "commio_cancel_callback: called for FD " << fd); - assert(ccb->fd == fd); - assert(ccb->active()); -// defaults given by client - char *host; - u_short default_port; - Ip::Address default_addr; - // NP: CANNOT store the default addr:port together as it gets set/reset differently. -- - ccb->xerrno = 0; - ccb->callback = NULL; - DnsLookupDetails dns; ///< host lookup details - Ip::Address S; - AsyncCall::Pointer callback; -- - #if DELAY_POOLS - ccb->quotaQueueReserv = 0; - #endif - } - int fd; - int tries; - int addrcount; - int connstart; -- - /* - * Call the given comm callback; assumes the callback is valid. - * - * @param ccb io completion callback - */ - void - commio_call_callback(comm_io_callback_t *ccb) - { - } -private: - int commResetFD(); - int commRetryConnect(); - CBDATA_CLASS(ConnectStateData); -}; -- /* STATIC */ static DescriptorSet *TheHalfClosed = NULL; /// the set of half-closed FDs @@@ -234,12 -126,13 +94,13 @@@ static void commSetNoLinger(int) static void commSetTcpNoDelay(int); #endif static void commSetTcpRcvbuf(int, int); - static PF commHandleWrite; -static PF commConnectFree; -static IPH commConnectDnsHandle; ++/* typedef enum { COMM_CB_READ = 1, COMM_CB_DERIVED } comm_callback_t; ++*/ static MemAllocator *conn_close_pool = NULL; fd_debug_t *fdd_table = NULL; @@@ -303,27 -196,27 +164,28 @@@ comm_read(const Comm::ConnectionPointe } void -comm_read(int fd, char *buf, int size, AsyncCall::Pointer &callback) +comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback) { - debugs(5, 5, "comm_read, queueing read for FD " << fd << "; asynCall " << callback); + debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback); /* Make sure we are open and not closing */ - assert(isOpen(fd)); - assert(!fd_table[fd].closing()); - Comm::IoCallback *ccb = COMMIO_FD_READCB(fd); + assert(Comm::IsConnOpen(conn)); + assert(!fd_table[conn->fd].closing()); - comm_io_callback_t *ccb = COMMIO_FD_READCB(conn->fd); ++ Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd); // Make sure we are either not reading or just passively monitoring. // Active/passive conflicts are OK and simply cancel passive monitoring. if (ccb->active()) { // if the assertion below fails, we have an active comm_read conflict - assert(fd_table[fd].halfClosedReader != NULL); - commStopHalfClosedMonitor(fd); + assert(fd_table[conn->fd].halfClosedReader != NULL); + commStopHalfClosedMonitor(conn->fd); assert(!ccb->active()); } ++ ccb->conn = conn; /* Queue the read */ - commio_set_callback(conn->fd, IOCB_READ, ccb, callback, (char *)buf, NULL, size); + ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size); - commSetSelect(fd, COMM_SELECT_READ, commHandleRead, ccb, 0); + commSetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0); } /** diff --cc src/comm.h index d2179c3ff4,ed3217c8b8..2c76b5d974 --- a/src/comm.h +++ b/src/comm.h @@@ -4,16 -4,18 +4,11 @@@ #include "squid.h" #include "AsyncEngine.h" #include "base/AsyncCall.h" +#include "CommCalls.h" - #include "comm/comm_err_t.h" - #include "comm/forward.h" - #include "ip/Address.h" + #include "comm_err_t.h" + #include "comm/IoCallback.h" #include "StoreIOBuffer.h" -#include "Array.h" -#include "ip/Address.h" - -class DnsLookupDetails; -typedef void CNCB(int fd, const DnsLookupDetails &dns, comm_err_t status, int xerrno, void *data); - -typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void *data); - - #define COMMIO_FD_READCB(fd) (&commfd_table[(fd)].readcb) - #define COMMIO_FD_WRITECB(fd) (&commfd_table[(fd)].writecb) - - /* comm.c */ extern bool comm_iocallbackpending(void); /* inline candidate */ @@@ -63,20 -65,9 +58,16 @@@ SQUIDCEXTERN void commSetSelect(int, un SQUIDCEXTERN void commResetSelect(int); SQUIDCEXTERN int comm_udp_sendto(int sock, const Ip::Address &to, const void *buf, int buflen); - extern void comm_write(const Comm::ConnectionPointer &conn, const char *buf, int len, IOCB *callback, void *callback_data, FREE *func); - extern void comm_write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func = NULL); - SQUIDCEXTERN void comm_write_mbuf(const Comm::ConnectionPointer &conn, MemBuf *mb, IOCB * handler, void *handler_data); - extern void comm_write_mbuf(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback); SQUIDCEXTERN void commCallCloseHandlers(int fd); SQUIDCEXTERN int commSetTimeout(int fd, int, PF *, void *); -extern int commSetTimeout(int fd, int, AsyncCall::Pointer &calback); +extern int commSetTimeout(int fd, int, AsyncCall::Pointer &callback); + +/** + * Set or clear the timeout for some action on an active connection. + * API to replace commSetTimeout() when a Comm::ConnectionPointer is available. + */ +extern int commSetConnTimeout(const Comm::ConnectionPointer &conn, int seconds, AsyncCall::Pointer &callback); + SQUIDCEXTERN int ignoreErrno(int); SQUIDCEXTERN void commCloseAllSockets(void); SQUIDCEXTERN void checkTimeouts(void); diff --cc src/comm/ConnAcceptor.h index 2564ae78a8,0000000000..4f8652d7fe mode 100644,000000..100644 --- a/src/comm/ConnAcceptor.h +++ b/src/comm/ConnAcceptor.h @@@ -1,90 -1,0 +1,90 @@@ +#ifndef SQUID_COMM_CONNACCEPTOR_H +#define SQUID_COMM_CONNACCEPTOR_H + - #include "config.h" ++#include "base/AsyncCall.h" +#include "base/Subscription.h" +#include "CommCalls.h" - #include "comm/comm_err_t.h" ++#include "comm_err_t.h" +#include "comm/forward.h" + +#if HAVE_MAP +#include +#endif + +namespace Comm +{ + +class AcceptLimiter; + +/** + * Listens on a Comm::Connection for new incoming connections and + * emits an active Comm::Connection descriptor for the new client. + * + * Handles all event limiting required to quash inbound connection + * floods within the global FD limits of available Squid_MaxFD and + * client_ip_max_connections. + * + * Fills the emitted connection with all connection details able to + * be looked up. Currently these are the local/remote IP:port details + * and the listening socket transparent-mode flag. + */ +class ConnAcceptor : public AsyncJob +{ +private: + virtual void start(); + virtual bool doneAll() const; + virtual void swanSong(); + +public: + ConnAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub); + ConnAcceptor(const ConnAcceptor &r); // not implemented. + + /** Subscribe a handler to receive calls back about new connections. + * Replaces any existing subscribed handler. + */ + void subscribe(const Subscription::Pointer &aSub); + + /** Remove the currently waiting callback subscription. + * Pending calls will remain scheduled. + */ + void unsubscribe(const char *reason); + + /** Try and accept another connection (synchronous). + * If one is pending already the subscribed callback handler will be scheduled + * to handle it before this method returns. + */ + void acceptNext(); + + /// Call the subscribed callback handler with details about a new connection. + void notify(comm_err_t flag, const Comm::ConnectionPointer &details); + + /// errno code of the last accept() or listen() action if one occurred. + int errcode; + +private: + friend class AcceptLimiter; + int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue. + Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events. + + /// conn being listened on for new connections + /// Reserved for read-only use. + ConnectionPointer conn; + +private: + /// Method to test if there are enough file descriptors to open a new client connection + /// if not the accept() will be postponed + static bool okToAccept(); + + /// Method callback for whenever an FD is ready to accept a client connection. + static void doAccept(int fd, void *data); + + void acceptOne(); + comm_err_t oldAccept(Comm::ConnectionPointer &details); + void setListen(); + + CBDATA_CLASS2(ConnAcceptor); +}; + - }; // namespace Comm ++} // namespace Comm + +#endif /* SQUID_COMM_CONNACCEPTOR_H */ diff --cc src/comm/ConnOpener.cc index 62218b83a2,0000000000..c67f122dc0 mode 100644,000000..100644 --- a/src/comm/ConnOpener.cc +++ b/src/comm/ConnOpener.cc @@@ -1,308 -1,0 +1,316 @@@ +/* + * DEBUG: section 05 Socket Connection Opener + */ + +#include "config.h" +//#include "base/TextException.h" +#include "comm/ConnOpener.h" +#include "comm/Connection.h" +#include "comm.h" +#include "fde.h" +#include "icmp/net_db.h" +#include "SquidTime.h" + +namespace Comm { + CBDATA_CLASS_INIT(ConnOpener); +}; + +Comm::ConnOpener::ConnOpener(Comm::ConnectionPointer &c, AsyncCall::Pointer &handler, time_t ctimeout) : + AsyncJob("Comm::ConnOpener"), + host_(NULL), + conn_(c), + callback_(handler), + totalTries_(0), + failRetries_(0), + connectTimeout_(ctimeout), + connectStart_(0) +{} + +Comm::ConnOpener::~ConnOpener() +{ + safe_free(host_); +} + +bool +Comm::ConnOpener::doneAll() const +{ + // is the conn_ to be opened still waiting? + if (conn_ != NULL) { + return false; + } + + // is the callback still to be called? + if (callback_ != NULL) { + return false; + } + + return AsyncJob::doneAll(); +} + +void +Comm::ConnOpener::swanSong() +{ + // cancel any event watchers + // done here to get the "swanSong" mention in cancel debugging. + if (calls_.earlyAbort_ != NULL) { + calls_.earlyAbort_->cancel("Comm::ConnOpener::swanSong"); + calls_.earlyAbort_ = NULL; + } + if (calls_.timeout_ != NULL) { + calls_.timeout_->cancel("Comm::ConnOpener::swanSong"); + calls_.timeout_ = NULL; + } + + // rollback what we can from the job state + if (conn_ != NULL && conn_->isOpen()) { + // drop any handlers now to save a lot of cycles later + commSetSelect(conn_->fd, COMM_SELECT_WRITE, NULL, NULL, 0); + commSetTimeout(conn_->fd, -1, NULL, NULL); + // it never reached fully open, so abort the FD + conn_->close(); + } + + if (callback_ != NULL) { + // inform the still-waiting caller we are dying + doneConnecting(COMM_ERR_CONNECT, 0); + } + + AsyncJob::swanSong(); +} + +void +Comm::ConnOpener::setHost(const char * new_host) +{ + // unset and erase if already set. + if (host_ != NULL) + safe_free(host_); + + // set the new one if given. + if (new_host != NULL) + host_ = xstrdup(new_host); +} + +const char * +Comm::ConnOpener::getHost() const +{ + return host_; +} + +/** + * Connection attempt are completed. One way or the other. + * Pass the results back to the external handler. + */ +void +Comm::ConnOpener::doneConnecting(comm_err_t status, int xerrno) +{ + if (callback_ != NULL) { + typedef CommConnectCbParams Params; + Params ¶ms = GetCommParams(callback_); + params.conn = conn_; + params.flag = status; + params.xerrno = xerrno; + ScheduleCallHere(callback_); + callback_ = NULL; + } + + /* ensure cleared local state, we are done. */ + conn_ = NULL; +} + +void +Comm::ConnOpener::start() +{ + Must(conn_ != NULL); + + /* get a socket open ready for connecting with */ + if (!conn_->isOpen()) { +#if USE_IPV6 + /* outbound sockets have no need to be protocol agnostic. */ + if (conn_->remote.IsIPv4()) { + conn_->local.SetIPv4(); + } +#endif + conn_->fd = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_); + if (!conn_->isOpen()) { + doneConnecting(COMM_ERR_CONNECT, 0); + return; + } + } + + typedef CommCbMemFunT abortDialer; + calls_.earlyAbort_ = JobCallback(5, 4, abortDialer, this, Comm::ConnOpener::earlyAbort); + comm_add_close_handler(conn_->fd, calls_.earlyAbort_); + + typedef CommCbMemFunT timeoutDialer; + calls_.timeout_ = JobCallback(5, 4, timeoutDialer, this, Comm::ConnOpener::timeout); + debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_); + commSetTimeout(conn_->fd, connectTimeout_, calls_.timeout_); + + connectStart_ = squid_curtime; + connect(); +} + +void +Comm::ConnOpener::connected() +{ + /* + * stats.conn_open is used to account for the number of + * connections that we have open to the peer, so we can limit + * based on the max-conn option. We need to increment here, + * even if the connection may fail. + */ + if (conn_->getPeer()) + conn_->getPeer()->stats.conn_open++; + + lookupLocalAddress(); + + /* TODO: remove these fd_table accesses. But old code still depends on fd_table flags to + * indicate the state of a raw fd object being passed around. + * Also, legacy code still depends on comm_local_port() with no access to Comm::Connection + * when those are done comm_local_port can become one of our member functions to do the below. + */ + fd_table[conn_->fd].flags.open = 1; + fd_table[conn_->fd].local_addr = conn_->local; +} + +/** Make an FD connection attempt. + * Handles the case(s) when a partially setup connection gets closed early. + */ +void +Comm::ConnOpener::connect() +{ + Must(conn_ != NULL); + + totalTries_++; + + switch (comm_connect_addr(conn_->fd, conn_->remote) ) { + + case COMM_INPROGRESS: + // check for timeout FIRST. + if (squid_curtime - connectStart_ > connectTimeout_) { + debugs(5, 5, HERE << conn_ << ": * - ERR took too long already."); + conn_->close(); + doneConnecting(COMM_TIMEOUT, errno); + return; + } else { + debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS"); + commSetSelect(conn_->fd, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, this, 0); + } + break; + + case COMM_OK: + debugs(5, 5, HERE << conn_ << ": COMM_OK - connected"); + + connected(); + + if (host_ != NULL) + ipcacheMarkGoodAddr(host_, conn_->remote); + doneConnecting(COMM_OK, 0); + break; + + default: + debugs(5, 5, HERE << conn_ << ": * - try again"); + failRetries_++; + if (host_ != NULL) + ipcacheMarkBadAddr(host_, conn_->remote); +#if USE_ICMP + if (Config.onoff.test_reachability) + netdbDeleteAddrNetwork(conn_->remote); +#endif + + // check for timeout FIRST. + if(squid_curtime - connectStart_ > connectTimeout_) { + debugs(5, 5, HERE << conn_ << ": * - ERR took too long already."); ++ if (calls_.earlyAbort_ != NULL) { ++ calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out"); ++ calls_.earlyAbort_ = NULL; ++ } + conn_->close(); + doneConnecting(COMM_TIMEOUT, errno); + } else if (failRetries_ < Config.connect_retries) { + eventAdd("Comm::ConnOpener::DelayedConnectRetry", Comm::ConnOpener::DelayedConnectRetry, this, 0.05, 0); + } else { + // send ERROR back to the upper layer. + debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already."); ++ if (calls_.earlyAbort_ != NULL) { ++ calls_.earlyAbort_->cancel("Comm::ConnOpener::connect failed"); ++ calls_.earlyAbort_ = NULL; ++ } + conn_->close(); + doneConnecting(COMM_ERR_CONNECT, errno); + } + } +} + +/** + * Lookup local-end address and port of the TCP link just opened. + * This ensure the connection local details are set correctly + */ +void +Comm::ConnOpener::lookupLocalAddress() +{ + struct addrinfo *addr = NULL; + conn_->local.InitAddrInfo(addr); + + if (getsockname(conn_->fd, addr->ai_addr, &(addr->ai_addrlen)) != 0) { + debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: " << conn_ << ": " << xstrerror()); + conn_->local.FreeAddrInfo(addr); + return; + } + + conn_->local = *addr; + conn_->local.FreeAddrInfo(addr); + debugs(5, 6, HERE << conn_); +} + +/** Abort connection attempt. + * Handles the case(s) when a partially setup connection gets closed early. + */ +void +Comm::ConnOpener::earlyAbort(const CommConnectCbParams &io) +{ + debugs(5, 3, HERE << io.conn); + doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better? +} + +/** + * Handles the case(s) when a partially setup connection gets timed out. + * NP: When commSetTimeout accepts generic CommCommonCbParams this can die. + */ +void +Comm::ConnOpener::timeout(const CommTimeoutCbParams &) +{ + connect(); +} + +/* Legacy Wrapper for the retry event after COMM_INPROGRESS + * XXX: As soon as comm commSetSelect() accepts Async calls we can use a ConnOpener::connect call + */ +void +Comm::ConnOpener::InProgressConnectRetry(int fd, void *data) +{ + ConnOpener *cs = static_cast(data); + assert(cs); + + // Ew. we are now outside the all AsyncJob protections. + // get back inside by scheduling another call... + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect); + ScheduleCallHere(call); +} + +/* Legacy Wrapper for the retry event with small delay after errors. + * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::connect call + */ +void +Comm::ConnOpener::DelayedConnectRetry(void *data) +{ + ConnOpener *cs = static_cast(data); + assert(cs); + + // Ew. we are now outside the all AsyncJob protections. + // get back inside by scheduling another call... + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect); + ScheduleCallHere(call); +} diff --cc src/comm/IoCallback.cc index 0000000000,b7b0d751f5..7d059b1b4b mode 000000,100644..100644 --- a/src/comm/IoCallback.cc +++ b/src/comm/IoCallback.cc @@@ -1,0 -1,125 +1,133 @@@ + #include "config.h" + #include "ClientInfo.h" ++#include "comm/Connection.h" + #include "comm/IoCallback.h" + #include "comm/Write.h" + #include "CommCalls.h" + #include "fde.h" + + Comm::CbEntry *Comm::iocb_table; + + void + Comm::CallbackTableInit() + { + // XXX: convert this to a std::map<> ? + iocb_table = static_cast(xcalloc(Squid_MaxFD, sizeof(CbEntry))); + for (int pos = 0; pos < Squid_MaxFD; pos++) { + iocb_table[pos].fd = pos; - iocb_table[pos].readcb.fd = pos; ++// iocb_table[pos].readcb.fd = pos; + iocb_table[pos].readcb.type = IOCB_READ; - iocb_table[pos].writecb.fd = pos; ++// iocb_table[pos].writecb.fd = pos; + iocb_table[pos].writecb.type = IOCB_WRITE; + } + } + + void + Comm::CallbackTableDestruct() + { ++ // release any Comm::Connections being held. ++ for (int pos = 0; pos < Squid_MaxFD; pos++) { ++ iocb_table[pos].readcb.conn = NULL; ++ iocb_table[pos].writecb.conn = NULL; ++ } + safe_free(iocb_table); + } + + /** + * Configure Comm::Callback for I/O + * + * @param fd filedescriptor + * @param t IO callback type (read or write) + * @param cb callback + * @param buf buffer, if applicable + * @param func freefunc, if applicable + * @param sz buffer size + */ + void + Comm::IoCallback::setCallback(Comm::iocb_type t, AsyncCall::Pointer &cb, char *b, FREE *f, int sz) + { + assert(!active()); + assert(type == t); + assert(cb != NULL); + + callback = cb; + buf = b; + freefunc = f; + size = sz; + offset = 0; + } + + void + Comm::IoCallback::selectOrQueueWrite() + { + #if DELAY_POOLS + // stand in line if there is one - if (ClientInfo *clientInfo = fd_table[fd].clientInfo) { ++ if (ClientInfo *clientInfo = fd_table[conn->fd].clientInfo) { + if (clientInfo->writeLimitingActive) { - quotaQueueReserv = clientInfo->quotaEnqueue(fd); ++ quotaQueueReserv = clientInfo->quotaEnqueue(conn->fd); + clientInfo->kickQuotaQueue(); + return; + } + } + #endif + - commSetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0); ++ commSetSelect(conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0); + } + + void + Comm::IoCallback::cancel(const char *reason) + { + if (!active()) + return; + + callback->cancel(reason); + callback = NULL; + reset(); + } + + void + Comm::IoCallback::reset() + { ++ conn = NULL; + if (freefunc) { + freefunc(buf); + buf = NULL; + freefunc = NULL; + } + xerrno = 0; + + #if DELAY_POOLS + quotaQueueReserv = 0; + #endif + } + + // Schedule the callback call and clear the callback + void + Comm::IoCallback::finish(comm_err_t code, int xerrn) + { - debugs(5, 3, HERE << "called for FD " << fd << " (" << code << ", " << xerrno << ")"); ++ debugs(5, 3, HERE << "called for " << conn << " (" << code << ", " << xerrno << ")"); + assert(active()); + + /* free data */ + if (freefunc) { + freefunc(buf); + buf = NULL; + freefunc = NULL; + } + + if (callback != NULL) { + typedef CommIoCbParams Params; + Params ¶ms = GetCommParams(callback); - params.fd = fd; ++ if (conn != NULL) params.fd = conn->fd; // for legacy write handlers... ++ params.conn = conn; + params.buf = buf; + params.size = offset; + params.flag = code; + params.xerrno = xerrn; + ScheduleCallHere(callback); + callback = NULL; + } + + /* Reset for next round. */ + reset(); + } diff --cc src/comm/IoCallback.h index 0000000000,fc404460ef..3a450687f4 mode 000000,100644..100644 --- a/src/comm/IoCallback.h +++ b/src/comm/IoCallback.h @@@ -1,0 -1,70 +1,71 @@@ + #ifndef _SQUID_COMM_IOCALLBACK_H + #define _SQUID_COMM_IOCALLBACK_H + + #include "config.h" + #include "base/AsyncCall.h" + #include "comm_err_t.h" ++#include "comm/forward.h" + + namespace Comm { + + /// Type of IO callbacks the Comm layer deals with. + typedef enum { + IOCB_NONE, + IOCB_READ, + IOCB_WRITE + } iocb_type; + + /// Details about a particular Comm IO callback event. + class IoCallback { + public: + iocb_type type; - int fd; ++ Comm::ConnectionPointer conn; + AsyncCall::Pointer callback; + char *buf; + FREE *freefunc; + int size; + int offset; + comm_err_t errcode; + int xerrno; + #if DELAY_POOLS + unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue + #endif + + bool active() const { return callback != NULL; } + void setCallback(iocb_type type, AsyncCall::Pointer &cb, char *buf, FREE *func, int sz); + + /// called when fd needs to write but may need to wait in line for its quota + void selectOrQueueWrite(); + + /// Actively cancel the given callback + void cancel(const char *reason); + + /// finish the IO operation imediately and schedule the callback with the current state. + void finish(comm_err_t code, int xerrn); + + private: + void reset(); + }; + + /// Entry nodes for the IO callback table: iocb_table + /// Keyed off the FD which the event applies to. + class CbEntry { + public: + int fd; + IoCallback readcb; + IoCallback writecb; + }; + + /// Table of scheduled IO events which have yet to be processed ?? + /// Callbacks which might be scheduled in future are stored in fd_table. + extern CbEntry *iocb_table; + + extern void CallbackTableInit(); + extern void CallbackTableDestruct(); + + #define COMMIO_FD_READCB(fd) (&Comm::iocb_table[(fd)].readcb) + #define COMMIO_FD_WRITECB(fd) (&Comm::iocb_table[(fd)].writecb) + + } // namespace Comm + + #endif /* _SQUID_COMM_IOCALLBACK_H */ diff --cc src/comm/Makefile.am index 2ae8858b46,6a2c3f3871..1a3a7e3571 --- a/src/comm/Makefile.am +++ b/src/comm/Makefile.am @@@ -9,14 -7,12 +7,18 @@@ noinst_LTLIBRARIES = libcomm.l libcomm_la_SOURCES= \ AcceptLimiter.cc \ AcceptLimiter.h \ - ListenStateData.cc \ - ListenStateData.h \ + ConnAcceptor.cc \ + ConnAcceptor.h \ \ + ConnOpener.cc \ + ConnOpener.h \ + \ + Connection.cc \ + Connection.h \ - comm_err_t.h \ - comm_internal.h \ - forward.h ++ forward.h \ + IoCallback.cc \ + IoCallback.h \ + Write.cc \ + Write.h \ + \ + comm_internal.h diff --cc src/comm/Write.cc index 0000000000,a348863de4..f1ed25aeed mode 000000,100644..100644 --- a/src/comm/Write.cc +++ b/src/comm/Write.cc @@@ -1,0 -1,146 +1,148 @@@ + #include "config.h" + #if DELAY_POOLS + #include "ClientInfo.h" + #endif ++#include "comm/Connection.h" + #include "comm/IoCallback.h" + #include "comm/Write.h" + #include "fde.h" + #include "SquidTime.h" + #include "MemBuf.h" + + void -Comm::Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback) ++Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback) + { - Comm::Write(fd, mb->buf, mb->size, callback, mb->freeFunc()); ++ Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc()); + } + + void -Comm::Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func) ++Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func) + { - debugs(5, 5, HERE << "FD " << fd << ": sz " << size << ": asynCall " << callback); ++ debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback); + + /* Make sure we are open, not closing, and not writing */ - assert(fd_table[fd].flags.open); - assert(!fd_table[fd].closing()); - Comm::IoCallback *ccb = COMMIO_FD_WRITECB(fd); ++ assert(fd_table[conn->fd].flags.open); ++ assert(!fd_table[conn->fd].closing()); ++ Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd); + assert(!ccb->active()); + - fd_table[fd].writeStart = squid_curtime; ++ fd_table[conn->fd].writeStart = squid_curtime; ++ ccb->conn = conn; + /* Queue the write */ + ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size); + ccb->selectOrQueueWrite(); + } + + /** Write to FD. + * This function is used by the lowest level of IO loop which only has access to FD numbers. - * We have to use the comm iocb_table to map FD numbers to waiting data. ++ * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections. + * Once the write has been concluded we schedule the waiting call with success/fail results. + */ + void + Comm::HandleWrite(int fd, void *data) + { + Comm::IoCallback *state = static_cast(data); + int len = 0; + int nleft; + - assert(state->fd == fd); ++ assert(state->conn != NULL && state->conn->fd == fd); + + PROF_start(commHandleWrite); - debugs(5, 5, HERE << "FD " << state->fd << ": off " << ++ debugs(5, 5, HERE << state->conn << ": off " << + (long int) state->offset << ", sz " << (long int) state->size << "."); + + nleft = state->size - state->offset; + + #if DELAY_POOLS + ClientInfo * clientInfo=fd_table[fd].clientInfo; + + if (clientInfo && !clientInfo->writeLimitingActive) + clientInfo = NULL; // we only care about quota limits here + + if (clientInfo) { + assert(clientInfo->selectWaiting); + clientInfo->selectWaiting = false; + + assert(clientInfo->hasQueue()); + assert(clientInfo->quotaPeekFd() == fd); + clientInfo->quotaDequeue(); // we will write or requeue below + + if (nleft > 0) { + const int quota = clientInfo->quotaForDequed(); + if (!quota) { // if no write quota left, queue this fd + state->quotaQueueReserv = clientInfo->quotaEnqueue(fd); + clientInfo->kickQuotaQueue(); + PROF_stop(commHandleWrite); + return; + } + + const int nleft_corrected = min(nleft, quota); + if (nleft != nleft_corrected) { - debugs(5, 5, HERE << "FD " << fd << " writes only " << ++ debugs(5, 5, HERE << state->conn << " writes only " << + nleft_corrected << " out of " << nleft); + nleft = nleft_corrected; + } + + } + } + #endif + + /* actually WRITE data */ + len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); + debugs(5, 5, HERE << "write() returns " << len); + + #if DELAY_POOLS + if (clientInfo) { + if (len > 0) { + /* we wrote data - drain them from bucket */ + clientInfo->bucketSize -= len; + if (clientInfo->bucketSize < 0.0) { + debugs(5,1, HERE << "drained too much"); // should not happen + clientInfo->bucketSize = 0; + } + } + + // even if we wrote nothing, we were served; give others a chance + clientInfo->kickQuotaQueue(); + } + #endif + + fd_bytes(fd, len, FD_WRITE); + statCounter.syscalls.sock.writes++; + // After each successful partial write, + // reset fde::writeStart to the current time. + fd_table[fd].writeStart = squid_curtime; + + if (len == 0) { + /* Note we even call write if nleft == 0 */ + /* We're done */ + if (nleft != 0) + debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining."); + + state->finish(nleft ? COMM_ERROR : COMM_OK, errno); + } else if (len < 0) { + /* An error */ + if (fd_table[fd].flags.socket_eof) { + debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); + state->finish(nleft ? COMM_ERROR : COMM_OK, errno); + } else if (ignoreErrno(errno)) { + debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); + state->selectOrQueueWrite(); + } else { + debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); + state->finish(nleft ? COMM_ERROR : COMM_OK, errno); + } + } else { + /* A successful write, continue */ + state->offset += len; + + if (state->offset < state->size) { + /* Not done, reinstall the write handler and write some more */ + state->selectOrQueueWrite(); + } else { + state->finish(nleft ? COMM_OK : COMM_ERROR, errno); + } + } + + PROF_stop(commHandleWrite); + } diff --cc src/comm/Write.h index 0000000000,1d5dcdff92..8b5ecc7fc5 mode 000000,100644..100644 --- a/src/comm/Write.h +++ b/src/comm/Write.h @@@ -1,0 -1,30 +1,31 @@@ + #ifndef _SQUID_COMM_IOWRITE_H + #define _SQUID_COMM_IOWRITE_H + + #include "base/AsyncCall.h" ++#include "comm/forward.h" + + namespace Comm { + + /** + * Queue a write. callback is scheduled when the write + * completes, on error, or on file descriptor close. + * + * free_func is used to free the passed buffer when the write has completed. + */ -void Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func); ++void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func); + + /** + * Queue a write. callback is scheduled when the write + * completes, on error, or on file descriptor close. + */ -void Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback); ++void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback); + + /// Cancel the write pending on FD. No action if none pending. -void WriteCancel(int fd, const char *reason); ++void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason); + + // callback handler to process an FD which is available for writing. + extern PF HandleWrite; + + } // namespace Comm + + #endif /* _SQUID_COMM_IOWRITE_H */ diff --cc src/dns_internal.cc index fcb346d766,c6fe4a04f4..903ef656aa --- a/src/dns_internal.cc +++ b/src/dns_internal.cc @@@ -32,12 -33,13 +32,13 @@@ * */ -#include "config.h" #include "squid.h" -#include "event.h" -#include "SquidTime.h" -#include "Store.h" -#include "comm.h" +#include "base/InstanceId.h" +#include "comm/Connection.h" +#include "comm/ConnOpener.h" + #include "comm/Write.h" +#include "comm.h" +#include "event.h" #include "fde.h" #include "ip/tools.h" #include "MemBuf.h" @@@ -770,9 -766,12 +771,11 @@@ idnsDoSendQueryVC(nsvc *vc vc->busy = 1; - commSetTimeout(vc->fd, Config.Timeout.idns_query, NULL, NULL); + commSetTimeout(vc->conn->fd, Config.Timeout.idns_query, NULL, NULL); - comm_write_mbuf(vc->conn, mb, idnsSentQueryVC, vc); + AsyncCall::Pointer call = commCbCall(78, 5, "idnsSentQueryVC", + CommIoCbPtrFun(&idnsSentQueryVC, vc)); - - Comm::Write(vc->fd, mb, call); ++ Comm::Write(vc->conn, mb, call); delete mb; } diff --cc src/errorpage.cc index 80d0ae344d,65ccb49495..843699a7c4 --- a/src/errorpage.cc +++ b/src/errorpage.cc @@@ -32,12 -32,11 +32,12 @@@ * */ #include "config.h" - +#include "auth/UserRequest.h" +#include "comm/Connection.h" + #include "comm/Write.h" +#include "err_detail_type.h" #include "errorpage.h" -#include "auth/UserRequest.h" -#include "SquidTime.h" -#include "Store.h" +#include "fde.h" #include "html_quote.h" #include "HttpReply.h" #include "HttpRequest.h" @@@ -460,9 -459,13 +460,11 @@@ errorSend(const Comm::ConnectionPointe err->flags.flag_cbdata = 1; rep = err->BuildHttpReply(); - MemBuf *mb = rep->pack(); - comm_write_mbuf(conn, mb, errorSendComplete, err); + AsyncCall::Pointer call = commCbCall(78, 5, "errorSendComplete", + CommIoCbPtrFun(&errorSendComplete, err)); - Comm::Write(fd, mb, call); ++ Comm::Write(conn, mb, call); delete mb; - delete rep; } diff --cc src/ftp.cc index 69084870f1,1598a9861e..873da7bb41 --- a/src/ftp.cc +++ b/src/ftp.cc @@@ -34,9 -34,10 +34,10 @@@ #include "squid.h" #include "comm.h" +#include "comm/ConnOpener.h" +#include "comm/ConnAcceptor.h" + #include "comm/Write.h" -#include "comm/ListenStateData.h" #include "compat/strtoll.h" -#include "ConnectionDetail.h" #include "errorpage.h" #include "fde.h" #include "forward.h" @@@ -1555,8 -1530,9 +1556,8 @@@ FtpStateData::writeCommand(const char * } typedef CommCbMemFunT Dialer; - AsyncCall::Pointer call = JobCallback(9, 5, - Dialer, this, FtpStateData::ftpWriteCommandCallback); - Comm::Write(ctrl.fd, ctrl.last_command, strlen(ctrl.last_command), call, NULL); + AsyncCall::Pointer call = JobCallback(9, 5, Dialer, this, FtpStateData::ftpWriteCommandCallback); - comm_write(ctrl.conn, ctrl.last_command, strlen(ctrl.last_command), call); ++ Comm::Write(ctrl.conn, ctrl.last_command, strlen(ctrl.last_command), call, NULL); scheduleReadControlReply(0); } diff --cc src/gopher.cc index a699160130,5023d83e9e..9082f5c78e --- a/src/gopher.cc +++ b/src/gopher.cc @@@ -977,8 -985,10 +978,10 @@@ gopherSendRequest(int fd, void *data snprintf(buf, 4096, "%s\r\n", gopherState->request); } - debugs(10, 5, "gopherSendRequest: FD " << fd); + debugs(10, 5, HERE << gopherState->serverConn); - comm_write(gopherState->serverConn, buf, strlen(buf), gopherSendComplete, gopherState, NULL); + AsyncCall::Pointer call = commCbCall(5,5, "gopherSendComplete", + CommIoCbPtrFun(gopherSendComplete, gopherState)); - Comm::Write(fd, buf, strlen(buf), call, NULL); ++ Comm::Write(gopherState->serverConn, buf, strlen(buf), call, NULL); if (EBIT_TEST(gopherState->entry->flags, ENTRY_CACHABLE)) gopherState->entry->setPublicKey(); /* Make it public */ diff --cc src/helper.cc index 10251bb4f4,e0a6998460..30a5ea0d7b --- a/src/helper.cc +++ b/src/helper.cc @@@ -33,10 -33,8 +33,11 @@@ */ #include "squid.h" +#include "comm.h" +#include "comm/Connection.h" + #include "comm/Write.h" #include "helper.h" +#include "MemBuf.h" #include "SquidMath.h" #include "SquidTime.h" #include "Store.h" @@@ -792,9 -793,58 +793,54 @@@ helperStatefulServerFree(int fd, void * cbdataFree(srv); } + /// Calls back with a pointer to the buffer with the helper output + static void helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end) + { + helper_request *r = srv->requests[request_number]; + if (r) { + HLPCB *callback = r->callback; + + srv->requests[request_number] = NULL; + + r->callback = NULL; + + void *cbdata = NULL; + if (cbdataReferenceValidDone(r->data, &cbdata)) + callback(cbdata, msg); + + srv->stats.pending--; + + hlp->stats.replies++; + + srv->answer_time = current_time; + + srv->dispatch_time = r->dispatch_time; + + hlp->stats.avg_svc_time = + Math::intAverage(hlp->stats.avg_svc_time, + tvSubMsec(r->dispatch_time, current_time), + hlp->stats.replies, REDIRECT_AV_FACTOR); + + helperRequestFree(r); + } else { + debugs(84, 1, "helperHandleRead: unexpected reply on channel " << + request_number << " from " << hlp->id_name << " #" << srv->index + 1 << + " '" << srv->rbuf << "'"); + } + srv->roffset -= (msg_end - srv->rbuf); + memmove(srv->rbuf, msg_end, srv->roffset + 1); + + if (!srv->flags.shutdown) { + helperKickQueue(hlp); + } else if (!srv->flags.closing && !srv->stats.pending) { - int wfd = srv->wfd; - srv->wfd = -1; - if (srv->rfd == wfd) - srv->rfd = -1; + srv->flags.closing=1; - comm_close(wfd); ++ srv->writePipe->close(); + return; + } + } static void -helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) +helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { char *t = NULL; helper_server *srv = (helper_server *)data; @@@ -1172,11 -1189,9 +1183,9 @@@ helperDispatchWriteDone(const Comm::Con srv->writebuf = srv->wqueue; srv->wqueue = new MemBuf; srv->flags.writing = 1; - comm_write(srv->writePipe, - srv->writebuf->content(), - srv->writebuf->contentSize(), - helperDispatchWriteDone, /* Handler */ - srv, NULL); /* Handler-data, freefunc */ + AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone", + CommIoCbPtrFun(helperDispatchWriteDone, srv)); - Comm::Write(srv->wfd, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL); ++ Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL); } } @@@ -1218,11 -1233,9 +1227,9 @@@ helperDispatch(helper_server * srv, hel srv->writebuf = srv->wqueue; srv->wqueue = new MemBuf; srv->flags.writing = 1; - comm_write(srv->writePipe, - srv->writebuf->content(), - srv->writebuf->contentSize(), - helperDispatchWriteDone, /* Handler */ - srv, NULL); /* Handler-data, free func */ + AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone", + CommIoCbPtrFun(helperDispatchWriteDone, srv)); - Comm::Write(srv->wfd, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL); ++ Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL); } debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes"); @@@ -1273,11 -1286,9 +1280,9 @@@ helperStatefulDispatch(helper_stateful_ srv->flags.reserved = 1; srv->request = r; srv->dispatch_time = current_time; - comm_write(srv->writePipe, - r->buf, - strlen(r->buf), - helperStatefulDispatchWriteDone, /* Handler */ - hlp, NULL); /* Handler-data, free func */ + AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone", + CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp)); - Comm::Write(srv->wfd, r->buf, strlen(r->buf), call, NULL); ++ Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL); debugs(84, 5, "helperStatefulDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << (int) strlen(r->buf) << " bytes"); diff --cc src/http.cc index ca099f31df,2595c85160..d0badb1175 --- a/src/http.cc +++ b/src/http.cc @@@ -45,7 -45,7 +45,8 @@@ #include "base/AsyncJobCalls.h" #include "base/TextException.h" #include "base64.h" +#include "comm/Connection.h" + #include "comm/Write.h" #if DELAY_POOLS #include "DelayPools.h" #endif @@@ -2137,8 -2138,9 +2137,9 @@@ HttpStateData::sendRequest( mb.init(); request->peer_host=_peer?_peer->host:NULL; buildRequestPrefix(request, orig_request, entry, &mb); - debugs(11, 6, "httpSendRequest: FD " << fd << ":\n" << mb.buf); - Comm::Write(fd, &mb, requestSender); + debugs(11, 6, HERE << serverConnection << ":\n" << mb.buf); - comm_write_mbuf(serverConnection, &mb, requestSender); ++ Comm::Write(serverConnection, &mb, requestSender); + return true; } @@@ -2224,7 -2226,7 +2225,7 @@@ HttpStateData::finishingBrokenPost( typedef CommCbMemFunT Dialer; requestSender = JobCallback(11,5, Dialer, this, HttpStateData::wroteLast); - comm_write(serverConnection, "\r\n", 2, requestSender); - Comm::Write(fd, "\r\n", 2, requestSender, NULL); ++ Comm::Write(serverConnection, "\r\n", 2, requestSender, NULL); return true; #else return false; @@@ -2244,8 -2246,9 +2245,8 @@@ HttpStateData::finishingChunkedRequest( flags.sentLastChunk = true; typedef CommCbMemFunT Dialer; - requestSender = JobCallback(11,5, - Dialer, this, HttpStateData::wroteLast); - Comm::Write(fd, "0\r\n\r\n", 5, requestSender, NULL); + requestSender = JobCallback(11,5, Dialer, this, HttpStateData::wroteLast); - comm_write(serverConnection, "0\r\n\r\n", 5, requestSender); ++ Comm::Write(serverConnection, "0\r\n\r\n", 5, requestSender, NULL); return true; } diff --cc src/ident/Ident.cc index 3a68ada2cb,a6c34699a5..1a099e490c --- a/src/ident/Ident.cc +++ b/src/ident/Ident.cc @@@ -37,9 -37,7 +37,10 @@@ #if USE_IDENT #include "comm.h" +#include "comm/Connection.h" +#include "comm/ConnOpener.h" +#include "CommCalls.h" + #include "comm/Write.h" #include "ident/Config.h" #include "ident/Ident.h" #include "MemBuf.h" @@@ -151,11 -148,13 +152,12 @@@ Ident::ConnectDone(const Comm::Connecti MemBuf mb; mb.init(); mb.Printf("%d, %d\r\n", - state->my_peer.GetPort(), - state->me.GetPort()); - + conn->remote.GetPort(), + conn->local.GetPort()); - comm_write_mbuf(conn, &mb, NULL, state); + AsyncCall::Pointer nil; - Comm::Write(fd, &mb, nil); - comm_read(fd, state->buf, BUFSIZ, Ident::ReadReply, state); - commSetTimeout(fd, Ident::TheConfig.timeout, Ident::Timeout, state); ++ Comm::Write(conn, &mb, nil); + comm_read(conn, state->buf, BUFSIZ, Ident::ReadReply, state); + commSetTimeout(conn->fd, Ident::TheConfig.timeout, Ident::Timeout, state); } void diff --cc src/ip/QosConfig.h index 08d8c56097,857fafed44..b9f2765afc --- a/src/ip/QosConfig.h +++ b/src/ip/QosConfig.h @@@ -1,9 -1,7 +1,8 @@@ #ifndef SQUID_QOSCONFIG_H #define SQUID_QOSCONFIG_H - #include "config.h" #include "hier_code.h" +#include "ip/forward.h" #if HAVE_LIBNETFILTER_CONNTRACK_LIBNETFILTER_CONNTRACK_H #include diff --cc src/ipc/StartListening.h index 3f629dae54,99354133b4..91d97e2478 --- a/src/ipc/StartListening.h +++ b/src/ipc/StartListening.h @@@ -8,12 -8,9 +8,11 @@@ #ifndef SQUID_IPC_START_LISTENING_H #define SQUID_IPC_START_LISTENING_H - #include "config.h" +#include "base/AsyncCall.h" +#include "base/Subscription.h" +#include "comm/forward.h" #include "ip/forward.h" #include "ipc/FdNotes.h" -#include "base/AsyncCall.h" #if HAVE_IOSFWD #include diff --cc src/ipc/UdsOp.cc index f5bc3307f4,873450574a..36303d799e --- a/src/ipc/UdsOp.cc +++ b/src/ipc/UdsOp.cc @@@ -4,11 -4,13 +4,12 @@@ * DEBUG: section 54 Interprocess Communication * */ - - #include "config.h" +#include "base/TextException.h" #include "comm.h" #include "CommCalls.h" +#include "comm/Connection.h" + #include "comm/Write.h" -#include "base/TextException.h" #include "ipc/UdsOp.h" @@@ -108,7 -107,7 +109,7 @@@ void Ipc::UdsSender::write( typedef CommCbMemFunT Dialer; AsyncCall::Pointer writeHandler = JobCallback(54, 5, Dialer, this, UdsSender::wrote); - comm_write(conn(), message.raw(), message.size(), writeHandler); - Comm::Write(fd(), message.raw(), message.size(), writeHandler, NULL); ++ Comm::Write(conn(), message.raw(), message.size(), writeHandler, NULL); writing = true; } diff --cc src/main.cc index ed49e83c8b,7871413714..854c763365 --- a/src/main.cc +++ b/src/main.cc @@@ -90,15 -86,31 +90,38 @@@ #if USE_LOADABLE_MODULES #include "LoadableModules.h" #endif +#include "Mem.h" +#include "MemPool.h" +#include "pconn.h" +#include "PeerSelectState.h" +#include "SquidTime.h" +#include "Store.h" +#include "StoreFileSystem.h" +#include "SwapDir.h" + #if USE_SSL_CRTD + #include "ssl/helper.h" + #include "ssl/certificate_db.h" + #endif + + #if USE_SSL + #include "ssl/context_storage.h" + #endif + + #if ICAP_CLIENT + #include "adaptation/icap/Config.h" + #endif + #if USE_ECAP + #include "adaptation/ecap/Config.h" + #endif + #if USE_ADAPTATION + #include "adaptation/Config.h" + #endif + #if USE_SQUID_ESI + #include "esi/Module.h" + #endif + #include "fs/Module.h" + #if HAVE_PATHS_H #include #endif diff --cc src/mgr/Forwarder.cc index 480122a56b,2d908978cf..fdec19287a --- a/src/mgr/Forwarder.cc +++ b/src/mgr/Forwarder.cc @@@ -123,10 -123,11 +123,10 @@@ Mgr::Forwarder::doneAll() cons /// called when the client socket gets closed by some external force void - Mgr::Forwarder::noteCommClosed(const CommCloseCbParams& params) + Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &io) { debugs(16, 5, HERE); - Must(fd == io.fd); - fd = -1; + Must(!Comm::IsConnOpen(clientConnection)); mustStop("commClosed"); } diff --cc src/mgr/Inquirer.cc index 5da18137ed,d249c3a913..e6e873b0d1 --- a/src/mgr/Inquirer.cc +++ b/src/mgr/Inquirer.cc @@@ -7,9 -7,8 +7,10 @@@ #include "config.h" #include "base/TextException.h" +#include "comm.h" + #include "comm/Write.h" #include "CommCalls.h" +#include "comm/Connection.h" #include "HttpReply.h" #include "ipc/Coordinator.h" #include "mgr/ActionWriter.h" @@@ -91,7 -91,7 +92,7 @@@ Mgr::Inquirer::start( std::auto_ptr replyBuf(reply->pack()); writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader", CommCbMemFunT(this, &Inquirer::noteWroteHeader)); - comm_write_mbuf(clientConnection, replyBuf.get(), writer); - Comm::Write(fd, replyBuf.get(), writer); ++ Comm::Write(clientConnection, replyBuf.get(), writer); } /// called when we wrote the response header diff --cc src/mgr/StoreToCommWriter.cc index b18f54eadc,1227333768..12dfce29ef --- a/src/mgr/StoreToCommWriter.cc +++ b/src/mgr/StoreToCommWriter.cc @@@ -7,8 -7,8 +7,9 @@@ #include "config.h" #include "base/TextException.h" +#include "comm/Connection.h" #include "CommCalls.h" + #include "comm/Write.h" #include "ipc/FdNotes.h" #include "mgr/StoreToCommWriter.h" #include "StoreClient.h" @@@ -108,7 -109,7 +109,7 @@@ Mgr::StoreToCommWriter::scheduleCommWri AsyncCall::Pointer writer = asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote", MyDialer(this, &StoreToCommWriter::noteCommWrote)); - comm_write(clientConnection, ioBuf.data, ioBuf.length, writer); - Comm::Write(fd, ioBuf.data, ioBuf.length, writer, NULL); ++ Comm::Write(clientConnection, ioBuf.data, ioBuf.length, writer, NULL); } void diff --cc src/tunnel.cc index 64605a6178,a501859f73..3784a5b017 --- a/src/tunnel.cc +++ b/src/tunnel.cc @@@ -33,13 -34,13 +33,14 @@@ */ #include "squid.h" -#include "errorpage.h" -#include "HttpRequest.h" -#include "fde.h" +#include "acl/FilledChecklist.h" +#include "Array.h" #include "comm.h" +#include "comm/Connection.h" +#include "comm/ConnOpener.h" + #include "comm/Write.h" +#include "client_side.h" #include "client_side_request.h" -#include "acl/FilledChecklist.h" #if DELAY_POOLS #include "DelayId.h" #endif @@@ -118,7 -116,9 +119,7 @@@ private void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno); }; - static const char *const conn_established = "HTTP/1.0 200 Connection established\r\n\r\n"; -#define fd_closed(fd) (fd == -1 || fd_table[fd].closing()) - + static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n"; static CNCB tunnelConnectDone; static ERCB tunnelErrorComplete; @@@ -309,15 -315,18 +310,18 @@@ TunnelStateData::copy(size_t len, comm_ if (len < 0 || errcode) from.error (xerrno); - else if (len == 0 || fd_closed(to.fd())) { - comm_close(from.fd()); - /* Only close the remote end if we've finished queueing data to it */ + else if (len == 0 || !Comm::IsConnOpen(to.conn)) { + from.conn->close(); - if (from.len == 0 && !fd_closed(to.fd()) ) { - comm_close(to.fd()); + /* Only close the remote end if we've finished queueing data to it */ + if (from.len == 0 && Comm::IsConnOpen(to.conn) ) { + to.conn->close(); } - } else if (cbdataReferenceValid(this)) - comm_write(to.conn, from.buf, len, completion, this, NULL); + } else if (cbdataReferenceValid(this)) { + AsyncCall::Pointer call = commCbCall(5,5, "SomeTunnelWriteHandler", + CommIoCbPtrFun(completion, this)); - Comm::Write(to.fd(), from.buf, len, call, NULL); ++ Comm::Write(to.conn, from.buf, len, call, NULL); + } cbdataInternalUnlock(this); /* ??? */ } @@@ -454,11 -465,47 +458,15 @@@ voi TunnelStateData::copyRead(Connection &from, IOCB *completion) { assert(from.len == 0); - comm_read(from.fd(), from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), completion, this); -} - -static void -tunnelConnectTimeout(int fd, void *data) -{ - TunnelStateData *tunnelState = (TunnelStateData *)data; - HttpRequest *request = tunnelState->request; - ErrorState *err = NULL; - - if (tunnelState->servers) { - if (tunnelState->servers->_peer) - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - tunnelState->servers->_peer->host); - else if (Config.onoff.log_ip_on_direct) - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - fd_table[tunnelState->server.fd()].ipaddr); - else - hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code, - tunnelState->host); - } else - debugs(26, 1, "tunnelConnectTimeout(): tunnelState->servers is NULL"); - - err = errorCon(ERR_CONNECT_FAIL, HTTP_SERVICE_UNAVAILABLE, request); - - *tunnelState->status_ptr = HTTP_SERVICE_UNAVAILABLE; - - err->xerrno = ETIMEDOUT; - - err->port = tunnelState->port; - - err->callback = tunnelErrorComplete; - - err->callback_data = tunnelState; - - errorSend(tunnelState->client.fd(), err); - comm_close(fd); + comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), completion, this); } ++/** ++ * All the pieces we need to write to client and/or server connection ++ * Have been written. Start the blind pump. ++ */ static void -tunnelConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) +tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; @@@ -473,14 -520,24 +481,15 @@@ } } -/* - * handle the write completion from a proxy request to an upstream proxy - */ static void -tunnelProxyConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) -{ - tunnelConnectedWriteDone(fd, buf, size, flag, xerrno, data); -} - -static void -tunnelConnected(int fd, void *data) +tunnelConnected(const Comm::ConnectionPointer &server, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; - debugs(26, 3, "tunnelConnected: FD " << fd << " tunnelState=" << tunnelState); + debugs(26, 3, HERE << server << ", tunnelState=" << tunnelState); *tunnelState->status_ptr = HTTP_OK; - comm_write(tunnelState->client.conn, conn_established, strlen(conn_established), - tunnelConnectedWriteDone, tunnelState, NULL); + AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone", + CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState)); - Comm::Write(tunnelState->client.fd(), conn_established, strlen(conn_established), call, NULL); ++ Comm::Write(tunnelState->client.conn, conn_established, strlen(conn_established), call, NULL); } static void @@@ -635,13 -724,13 +644,13 @@@ tunnelStart(ClientHttpRequest * http, i } static void - tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, void *data) -tunnelProxyConnected(int fd, void *data) ++tunnelRelayConnectRequest(const Comm::ConnectionPointer &srv, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; HttpHeader hdr_out(hoRequest); Packer p; http_state_flags flags; - debugs(26, 3, HERE << server << ", tunnelState=" << tunnelState); - debugs(26, 3, "tunnelProxyConnected: FD " << fd << " tunnelState=" << tunnelState); ++ debugs(26, 3, HERE << srv << ", tunnelState=" << tunnelState); memset(&flags, '\0', sizeof(flags)); flags.proxying = tunnelState->request->flags.proxying; MemBuf mb; @@@ -658,8 -747,11 +667,10 @@@ packerClean(&p); mb.append("\r\n", 2); - comm_write_mbuf(server, &mb, tunnelConnectedWriteDone, tunnelState); - commSetTimeout(server->fd, Config.Timeout.read, tunnelTimeout, tunnelState); - AsyncCall::Pointer call = commCbCall(5,5, "tunnelProxyConnectedWriteDone", - CommIoCbPtrFun(tunnelProxyConnectedWriteDone, tunnelState)); - - Comm::Write(tunnelState->server.fd(), &mb, call); - commSetTimeout(tunnelState->server.fd(), Config.Timeout.read, tunnelTimeout, tunnelState); ++ AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone", ++ CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState)); ++ Comm::Write(srv, &mb, call); ++ commSetTimeout(srv->fd, Config.Timeout.read, tunnelTimeout, tunnelState); } static void diff --cc src/whois.cc index 1d5284dc72,fc300d3191..a7e182b74a --- a/src/whois.cc +++ b/src/whois.cc @@@ -100,9 -102,12 +101,12 @@@ whoisStart(FwdState * fwd String str_print=p->request->urlpath.substr(1,p->request->urlpath.size()); snprintf(buf, l, SQUIDSTRINGPH"\r\n", SQUIDSTRINGPRINT(str_print)); - comm_write(fwd->serverConnection(), buf, strlen(buf), whoisWriteComplete, p, NULL); + AsyncCall::Pointer call = commCbCall(5,5, "whoisWriteComplete", + CommIoCbPtrFun(whoisWriteComplete, p)); + - Comm::Write(fd, buf, strlen(buf), call, NULL); - comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p); - commSetTimeout(fd, Config.Timeout.read, whoisTimeout, p); ++ Comm::Write(fwd->serverConnection(), buf, strlen(buf), call, NULL); + comm_read(fwd->serverConnection(), p->buf, BUFSIZ, whoisReadReply, p); + commSetTimeout(fwd->serverConnection()->fd, Config.Timeout.read, whoisTimeout, p); } /* PRIVATE */