From: Amos Jeffries Date: Mon, 31 Jan 2011 11:50:28 +0000 (+1300) Subject: Merge from trunk X-Git-Tag: take08~55^2~124^2~20 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8bbb16e322936ac8ef477a2a355f8d189a907da8;p=thirdparty%2Fsquid.git Merge from trunk --- 8bbb16e322936ac8ef477a2a355f8d189a907da8 diff --cc doc/release-notes/release-3.2.sgml index a2f8c292bc,6fa64cd302..d1baf93af7 --- a/doc/release-notes/release-3.2.sgml +++ b/doc/release-notes/release-3.2.sgml @@@ -548,10 -561,9 +565,13 @@@ This section gives a thorough account o log_fqdn

Obsolete. Replaced by automatic detection of the %>A logformat tag. + maximum_single_addr_tries +

The behaviour controlled by this directive is no longer possible. + It has been replaced by connect_retries option which operates a little differently. + + referer_log +

Replaced by the referrer format option on an access_log directive. + url_rewrite_concurrency

Replaced by url_rewrite_children ... concurrency=N option. diff --cc src/ProtoPort.cc index 91064d155e,de42e8e204..5ac803df6c --- a/src/ProtoPort.cc +++ b/src/ProtoPort.cc @@@ -1,13 -1,19 +1,14 @@@ -/* - * $Id$ - */ - #include "squid.h" + #include "comm.h" #include "ProtoPort.h" #if HAVE_LIMITS #include #endif - http_port_list::http_port_list(const char *aProtocol) + http_port_list::http_port_list(const char *aProtocol) : - listenFd(-1) #if USE_SSL - : - http(*this), dynamicCertMemCacheSize(std::numeric_limits::max()) - , http(*this) - , dynamicCertMemCacheSize(std::numeric_limits::max()) ++ http(*this), ++ dynamicCertMemCacheSize(std::numeric_limits::max()) #endif { protocol = xstrdup(aProtocol); @@@ -15,8 -21,10 +16,10 @@@ http_port_list::~http_port_list() { - if (Comm::IsConnOpen(listenConn)) - if (listenFd >= 0) { - comm_close(listenFd); - listenFd = -1; ++ if (Comm::IsConnOpen(listenConn)) { + listenConn->close(); ++ listenConn = NULL; + } safe_free(name); safe_free(defaultsite); diff --cc src/ProtoPort.h index 98235048b3,323a6d2084..c21b95b5f6 --- a/src/ProtoPort.h +++ b/src/ProtoPort.h @@@ -38,7 -40,12 +38,12 @@@ struct http_port_list unsigned int timeout; } tcp_keepalive; - Comm::ConnectionPointer listenConn; ///< Socket used by a ConnAcceptor for incoming clients. + /** - * The FD listening socket. - * If >= 0 we are actively listening for client requests. - * use comm_close(listenFd) to stop. ++ * The listening socket details. ++ * If Comm::ConnIsOpen() we are actively listening for client requests. ++ * use listenConn->close() to stop. + */ - int listenFd; ++ Comm::ConnectionPointer listenConn; #if USE_SSL // XXX: temporary hack to ease move of SSL options to http_port diff --cc src/client_side.cc index 11c788fd89,f2d90a2609..4af6a95818 --- a/src/client_side.cc +++ b/src/client_side.cc @@@ -94,9 -96,11 +97,11 @@@ #include "ClientRequestContext.h" #include "clientStream.h" #include "comm.h" +#include "comm/Connection.h" - #include "comm/ConnAcceptor.h" + #include "CommCalls.h" + #include "comm/Loops.h" #include "comm/Write.h" + #include "comm/TcpAcceptor.h" -#include "ConnectionDetail.h" #include "eui/Config.h" #include "fde.h" #include "HttpHdrContRange.h" @@@ -140,28 -140,29 +141,28 @@@ class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb { public: - typedef void (*Handler)(int errNo, http_port_list *portCfg, bool uses_ssl); - ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg, bool aSslFlag): - handler(aHandler), portCfg(aPortCfg), uses_ssl(aSslFlag) {} - typedef void (*Handler)(int fd, int flags, int errNo, http_port_list *portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub); - ListeningStartedDialer(Handler aHandler, int openFlags, http_port_list *aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub): - handler(aHandler), portCfg(aPortCfg), portTypeNote(note), commOpenListenerFlags(openFlags), sub(aSub) {} ++ typedef void (*Handler)(http_port_list *portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub); ++ ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub): ++ handler(aHandler), portCfg(aPortCfg), portTypeNote(note), sub(aSub) {} virtual void print(std::ostream &os) const { startPrint(os) << - ", " << (uses_ssl? "SSL " :"") << "port=" << (void*)portCfg << ')'; - ", port=" << (void*)portCfg << ')'; ++ ", " << FdNote(portTypeNote) << " port=" << (void*)portCfg << ')'; } virtual bool canDial(AsyncCall &) const { return true; } - virtual void dial(AsyncCall &) { (handler)(errNo, portCfg, uses_ssl); } - virtual void dial(AsyncCall &) { (handler)(fd, commOpenListenerFlags, errNo, portCfg, portTypeNote, sub); } ++ virtual void dial(AsyncCall &) { (handler)(portCfg, portTypeNote, sub); } public: Handler handler; private: - http_port_list *portCfg; ///< from Config.Sockaddr.http - bool uses_ssl; + http_port_list *portCfg; ///< from Config.Sockaddr.http + Ipc::FdNoteId portTypeNote; ///< Type of IPC socket being opened - int commOpenListenerFlags; ///< flags used by comm_open_listener + Subscription::Pointer sub; ///< The handler to be subscribed for this connetion listener }; - - static void clientListenerConnectionOpened(int errNo, http_port_list *s, bool uses_ssl); -static void clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub); ++static void clientListenerConnectionOpened(http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub); /* our socket-related context */ @@@ -254,13 -254,15 +255,14 @@@ ConnStateData::readSomeData( if (reading()) return; - debugs(33, 4, "clientReadSomeData: FD " << fd << ": reading request..."); + debugs(33, 4, "clientReadSomeData: FD " << clientConn->fd << ": reading request..."); - makeSpaceAvailable(); + if (!maybeMakeSpaceAvailable()) + return; typedef CommCbMemFunT Dialer; - reader = JobCallback(33, 5, - Dialer, this, ConnStateData::clientReadRequest); - comm_read(fd, in.addressToReadInto(), getAvailableBufferLength(), reader); + reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest); + comm_read(clientConn, in.addressToReadInto(), getAvailableBufferLength(), reader); } @@@ -3109,23 -3116,28 +3116,27 @@@ connStateCreate(const Comm::ConnectionP /** Handle a new connection on HTTP socket. */ void - httpAccept(int sock, const Comm::ConnectionPointer &details, comm_err_t flag, int xerrno, void *data) -httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data) ++httpAccept(int, const Comm::ConnectionPointer &details, comm_err_t flag, int xerrno, void *data) { http_port_list *s = (http_port_list *)data; ConnStateData *connState = NULL; - assert(flag == COMM_OK); // acceptor does not call us for anything bad. + if (flag != COMM_OK) { + // Its possible the call was still queued when the client disconnected - debugs(33, 2, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno)); ++ debugs(33, 2, "httpAccept: " << s->listenConn << ": accept failure: " << xstrerr(xerrno)); + return; + } - debugs(33, 4, "httpAccept: FD " << newfd << ": accepted"); - fd_note(newfd, "client http connect"); - connState = connStateCreate(&details->peer, &details->me, newfd, s); + debugs(33, 4, HERE << details << ": accepted"); + fd_note(details->fd, "client http connect"); + connState = connStateCreate(details, s); typedef CommCbMemFunT Dialer; - AsyncCall::Pointer call = JobCallback(33, 5, - Dialer, connState, ConnStateData::connStateClosed); - comm_add_close_handler(newfd, call); + AsyncCall::Pointer call = JobCallback(33, 5, Dialer, connState, ConnStateData::connStateClosed); + comm_add_close_handler(details->fd, call); if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS); + fqdncache_gethostbyaddr(details->remote, FQDN_LOOKUP_IF_MISS); typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(33, 5, @@@ -3350,26 -3362,32 +3361,30 @@@ clientNegotiateSSL(int fd, void *data /** handle a new HTTPS connection */ static void - httpsAccept(int sock, const Comm::ConnectionPointer& details, comm_err_t flag, int xerrno, void *data) -httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data) ++httpsAccept(int, const Comm::ConnectionPointer& details, comm_err_t flag, int xerrno, void *data) { https_port_list *s = (https_port_list *)data; SSL_CTX *sslContext = s->staticSslContext.get(); - assert(flag != COMM_OK); // Acceptor does not call un unless successful. + if (flag != COMM_OK) { + // Its possible the call was still queued when the client disconnected - debugs(33, 2, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno)); ++ debugs(33, 2, "httpsAccept: " << s->listenConn << ": accept failure: " << xstrerr(xerrno)); + return; + } SSL *ssl = NULL; - if (!(ssl = httpsCreate(newfd, details, sslContext))) + if (!(ssl = httpsCreate(details, sslContext))) return; - debugs(33, 5, "httpsAccept: FD " << newfd << " accepted, starting SSL negotiation."); - fd_note(newfd, "client https connect"); - ConnStateData *connState = connStateCreate(details->peer, details->me, - newfd, &s->http); + debugs(33, 5, HERE << details << " accepted, starting SSL negotiation."); + fd_note(details->fd, "client https connect"); + ConnStateData *connState = connStateCreate(details, &s->http); typedef CommCbMemFunT Dialer; - AsyncCall::Pointer call = JobCallback(33, 5, - Dialer, connState, ConnStateData::connStateClosed); - comm_add_close_handler(newfd, call); + AsyncCall::Pointer call = JobCallback(33, 5, Dialer, connState, ConnStateData::connStateClosed); + comm_add_close_handler(details->fd, call); if (Config.onoff.log_fqdn) - fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS); + fqdncache_gethostbyaddr(details->remote, FQDN_LOOKUP_IF_MISS); typedef CommCbMemFunT TimeoutDialer; AsyncCall::Pointer timeoutCall = JobCallback(33, 5, @@@ -3387,12 -3405,12 +3402,12 @@@ #endif if (s->http.tcp_keepalive.enabled) { - commSetTcpKeepalive(newfd, s->http.tcp_keepalive.idle, s->http.tcp_keepalive.interval, s->http.tcp_keepalive.timeout); + commSetTcpKeepalive(details->fd, s->http.tcp_keepalive.idle, s->http.tcp_keepalive.interval, s->http.tcp_keepalive.timeout); } - commSetSelect(details->fd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0); - Comm::SetSelect(newfd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0); ++ Comm::SetSelect(details->fd, COMM_SELECT_READ, clientNegotiateSSL, connState, 0); - clientdbEstablished(details->peer, 1); + clientdbEstablished(details->remote, 1); incoming_sockets_accepted++; } @@@ -3495,17 -3513,20 +3510,15 @@@ ConnStateData::getSslContextDone(SSL_CT } } - // fake a ConnectionDetail object; XXX: make ConnState a ConnectionDetail? - ConnectionDetail detail; - detail.me = me; - detail.peer = peer; - SSL *ssl = NULL; - if (!(ssl = httpsCreate(fd, &detail, sslContext))) + if (!(ssl = httpsCreate(clientConn, sslContext))) return false; - // commSetTimeout() was called for this request before we switched. + // commSetConnTimeout() was called for this request before we switched. // Disable the client read handler until peer selection is complete - commSetSelect(clientConn->fd, COMM_SELECT_READ, NULL, NULL, 0); - - commSetSelect(clientConn->fd, COMM_SELECT_READ, clientNegotiateSSL, this, 0); - - Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); - Comm::SetSelect(fd, COMM_SELECT_READ, clientNegotiateSSL, this, 0); ++ Comm::SetSelect(clientConn->fd, COMM_SELECT_READ, NULL, NULL, 0); ++ Comm::SetSelect(clientConn->fd, COMM_SELECT_READ, clientNegotiateSSL, this, 0); switchedToHttps_ = true; return true; } @@@ -3531,9 -3552,9 +3544,9 @@@ ConnStateData::switchToHttps(const cha /// check FD after clientHttp[s]ConnectionOpened, adjust HttpSockets as needed static bool - OpenedHttpSocket(const Comm::ConnectionPointer &clientConn, const char *msgIfFail) -OpenedHttpSocket(int fd, const Ipc::FdNoteId portType) ++OpenedHttpSocket(const Comm::ConnectionPointer &clientConn, const Ipc::FdNoteId portType) { - if (fd < 0) { + if (!Comm::IsConnOpen(clientConn)) { Must(NHttpSockets > 0); // we tried to open some --NHttpSockets; // there will be fewer sockets than planned Must(HttpSockets[NHttpSockets] < 0); // no extra fds received @@@ -3590,15 -3611,10 +3603,11 @@@ clientHttpConnectionsOpen(void Ssl::Helper::GetInstance(); #endif //USE_SSL_CRTD - // NOTE: would the design here be better if we opened both the ConnAcceptor and IPC informative messages now? - // that way we have at least one worker listening on the socket immediately with others joining in as - // they receive the IPC message. - /* AYJ: 2009-12-27: bit bumpy. new ListenStateData(...) should be doing all the Comm:: stuff ... */ -- - const int openFlags = COMM_NONBLOCKING | - (s->spoof_client_ip ? COMM_TRANSPARENT : 0); + // Fill out a Comm::Connection which IPC will open as a listener for us - // then pass back so we can start a ConnAcceptor subscription. ++ // then pass back when active so we can start a TcpAcceptor subscription. + s->listenConn = new Comm::Connection; + s->listenConn->local = s->s; + s->listenConn->flags = COMM_NONBLOCKING | (s->spoof_client_ip ? COMM_TRANSPARENT : 0); // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP typedef CommCbFunPtrCallT AcceptCall; @@@ -3606,10 -3622,10 +3615,10 @@@ Subscription::Pointer sub = new CallSubscription(subCall); AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened", - ListeningStartedDialer(&clientListenerConnectionOpened, s, false)); - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->listenConn, Ipc::fdnHttpSocket, listenCall, sub); - ListeningStartedDialer(&clientListenerConnectionOpened, openFlags, s, Ipc::fdnHttpSocket, sub)); - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall); ++ ListeningStartedDialer(&clientListenerConnectionOpened, s, Ipc::fdnHttpSocket, sub)); ++ Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->listenConn, Ipc::fdnHttpSocket, listenCall); - HttpSockets[NHttpSockets++] = -1; // set in clientListenerHttpConnectionOpened + HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened } #if USE_SSL @@@ -3652,35 -3689,16 +3661,38 @@@ clientHttpsConnectionsOpen(void Subscription::Pointer sub = new CallSubscription(subCall); AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened", - ListeningStartedDialer(&clientListenerConnectionOpened, &s->http, true)); - - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->listenConn, Ipc::fdnHttpsSocket, listenCall, sub); - - ListeningStartedDialer(&clientListenerConnectionOpened, openFlags, ++ ListeningStartedDialer(&clientListenerConnectionOpened, + &s->http, Ipc::fdnHttpsSocket, sub)); - - Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall); - ++ Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->listenConn, Ipc::fdnHttpsSocket, listenCall); HttpSockets[NHttpSockets++] = -1; } } #endif +/// process clientHttpConnectionsOpen result +static void - clientListenerConnectionOpened(int, http_port_list *s, bool uses_ssl) ++clientListenerConnectionOpened(http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub) +{ - if (!OpenedHttpSocket(s->listenConn, (uses_ssl?"Cannot open HTTP Port":"Cannot open HTTPS Port"))) ++ if (!OpenedHttpSocket(s->listenConn, portTypeNote)) + return; + + Must(s); + Must(Comm::IsConnOpen(s->listenConn)); + ++ // TCP: setup a job to handle accept() with subscribed handler ++ AsyncJob::Start(new Comm::TcpAcceptor(s->listenConn, FdNote(portTypeNote), sub)); ++ + debugs(1, 1, "Accepting" << + (s->intercepted ? " intercepted" : "") << + (s->spoof_client_ip ? " spoofing" : "") << + (s->sslBump ? " bumpy" : "") << + (s->accel ? " accelerated" : "") - << " HTTP" << (uses_ssl?"S":"") << " connections at " << s->listenConn << "."); ++ << FdNote(portTypeNote) << " connections at " ++ << s->listenConn); + + Must(AddOpenedHttpSocket(s->listenConn)); // otherwise, we have received a fd we did not ask for +} + void clientOpenListenSockets(void) { diff --cc src/client_side_reply.cc index 2ea75f9e69,d6ada937d2..1d6141a99e --- a/src/client_side_reply.cc +++ b/src/client_side_reply.cc @@@ -2055,13 -2060,13 +2056,12 @@@ clientReplyContext::sendMoreData (Store body_buf = buf; } -- if (reqofs==0 && !logTypeIsATcpHit(http->logType)) { - assert(conn != NULL && Comm::IsConnOpen(conn->clientConn)); // the beginning of this method implies FD may be closed. - assert(fd >= 0); // the beginning of this method implies fd may be -1 ++ if (reqofs==0 && !logTypeIsATcpHit(http->logType) && Comm::IsConnOpen(conn->clientConn)) { if (Ip::Qos::TheConfig.isHitTosActive()) { - Ip::Qos::doTosLocalMiss(fd, http->request->hier.code); + Ip::Qos::doTosLocalMiss(conn->clientConn, http->request->hier.code); } if (Ip::Qos::TheConfig.isHitNfmarkActive()) { - Ip::Qos::doNfmarkLocalMiss(fd, http->request->hier.code); + Ip::Qos::doNfmarkLocalMiss(conn->clientConn, http->request->hier.code); } } diff --cc src/comm.cc index b23e7b1c4a,f88caff2cb..cc5dc00474 --- a/src/comm.cc +++ b/src/comm.cc @@@ -40,11 -39,13 +40,13 @@@ #include "fde.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" +#include "comm/Connection.h" #include "comm/IoCallback.h" + #include "comm/Loops.h" #include "comm/Write.h" + #include "comm/TcpAcceptor.h" #include "CommIO.h" #include "CommRead.h" -#include "ConnectionDetail.h" #include "MemBuf.h" #include "pconn.h" #include "SquidTime.h" @@@ -151,12 -184,15 +153,12 @@@ commHandleRead(int fd, void *data } /* Nope, register for some more IO */ - commSetSelect(fd, COMM_SELECT_READ, commHandleRead, data, 0); + Comm::SetSelect(fd, COMM_SELECT_READ, commHandleRead, data, 0); } -/** - * Queue a read. handler/handler_data are called when the read - * completes, on error, or on file descriptor close. - */ +#if 0 // obsolete wrapper. void -comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data) +comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, IOCB *handler, void *handler_data) { AsyncCall::Pointer call = commCbCall(5,4, "SomeCommReadHandler", CommIoCbPtrFun(handler, handler_data)); @@@ -190,7 -220,7 +192,7 @@@ comm_read(const Comm::ConnectionPointe /* Queue the read */ ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size); - commSetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0); - Comm::SetSelect(fd, COMM_SELECT_READ, commHandleRead, ccb, 0); ++ Comm::SetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0); } /** diff --cc src/comm.h index d021258d52,fb206c93de..322d22ec93 --- a/src/comm.h +++ b/src/comm.h @@@ -54,34 -61,17 +54,24 @@@ extern void comm_open_listener(int sock SQUIDCEXTERN int comm_openex(int, int, Ip::Address &, int, tos_t tos, nfmark_t nfmark, const char *); SQUIDCEXTERN u_short comm_local_port(int fd); - SQUIDCEXTERN void commSetSelect(int, unsigned int, PF *, void *, time_t); - SQUIDCEXTERN void commResetSelect(int); - SQUIDCEXTERN int comm_udp_sendto(int sock, const Ip::Address &to, const void *buf, int buflen); SQUIDCEXTERN void commCallCloseHandlers(int fd); -SQUIDCEXTERN int commSetTimeout(int fd, int, PF *, void *); -extern int commSetTimeout(int fd, int, AsyncCall::Pointer &calback); +SQUIDCEXTERN int commSetTimeout(int fd, int, CTCB *, void *); +extern int commSetTimeout(int fd, int, AsyncCall::Pointer &callback); + +/** + * Set or clear the timeout for some action on an active connection. + * API to replace commSetTimeout() when a Comm::ConnectionPointer is available. + */ +extern int commSetConnTimeout(const Comm::ConnectionPointer &conn, int seconds, AsyncCall::Pointer &callback); +extern int commUnsetConnTimeout(const Comm::ConnectionPointer &conn); + SQUIDCEXTERN int ignoreErrno(int); SQUIDCEXTERN void commCloseAllSockets(void); SQUIDCEXTERN void checkTimeouts(void); - /* - * comm_select.c - */ - SQUIDCEXTERN void comm_select_init(void); - SQUIDCEXTERN comm_err_t comm_select(int); - SQUIDCEXTERN void comm_quick_poll_required(void); - -class ConnectionDetail; -typedef void IOACB(int fd, int nfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data); +//typedef void IOACB(int fd, int nfd, Comm::ConnectionPointer details, comm_err_t flag, int xerrno, void *data); extern void comm_add_close_handler(int fd, PF *, void *); extern void comm_add_close_handler(int fd, AsyncCall::Pointer &); extern void comm_remove_close_handler(int fd, PF *, void *); diff --cc src/comm/AcceptLimiter.cc index a3ce69935a,d0f5763e0f..23f7b8ff91 --- a/src/comm/AcceptLimiter.cc +++ b/src/comm/AcceptLimiter.cc @@@ -1,7 -1,6 +1,7 @@@ #include "config.h" #include "comm/AcceptLimiter.h" - #include "comm/ConnAcceptor.h" +#include "comm/Connection.h" + #include "comm/TcpAcceptor.h" #include "fde.h" Comm::AcceptLimiter Comm::AcceptLimiter::Instance_; @@@ -12,10 -11,10 +12,10 @@@ Comm::AcceptLimiter &Comm::AcceptLimite } void - Comm::AcceptLimiter::defer(Comm::ConnAcceptor *afd) + Comm::AcceptLimiter::defer(Comm::TcpAcceptor *afd) { afd->isLimited++; - debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited); + debugs(5, 5, HERE << afd->conn << " x" << afd->isLimited); deferred.push_back(afd); } diff --cc src/comm/ConnOpener.cc index de81be255b,0000000000..7374b8f1ee mode 100644,000000..100644 --- a/src/comm/ConnOpener.cc +++ b/src/comm/ConnOpener.cc @@@ -1,326 -1,0 +1,327 @@@ +/* + * DEBUG: section 05 Socket Connection Opener + */ + +#include "config.h" +//#include "base/TextException.h" +#include "comm/ConnOpener.h" +#include "comm/Connection.h" ++#include "comm/Loops.h" +#include "comm.h" +#include "fde.h" +#include "icmp/net_db.h" +#include "SquidTime.h" + +namespace Comm { + CBDATA_CLASS_INIT(ConnOpener); +}; + +Comm::ConnOpener::ConnOpener(Comm::ConnectionPointer &c, AsyncCall::Pointer &handler, time_t ctimeout) : + AsyncJob("Comm::ConnOpener"), + host_(NULL), + conn_(c), + callback_(handler), + totalTries_(0), + failRetries_(0), + connectTimeout_(ctimeout), + connectStart_(0) +{} + +Comm::ConnOpener::~ConnOpener() +{ + safe_free(host_); +} + +bool +Comm::ConnOpener::doneAll() const +{ + // is the conn_ to be opened still waiting? + if (conn_ == NULL) { + return AsyncJob::doneAll(); + } + + // is the callback still to be called? + if (callback_ == NULL || callback_->canceled()) { + return AsyncJob::doneAll(); + } + + return false; +} + +void +Comm::ConnOpener::swanSong() +{ + // cancel any event watchers + // done here to get the "swanSong" mention in cancel debugging. + if (calls_.earlyAbort_ != NULL) { + calls_.earlyAbort_->cancel("Comm::ConnOpener::swanSong"); + calls_.earlyAbort_ = NULL; + } + if (calls_.timeout_ != NULL) { + calls_.timeout_->cancel("Comm::ConnOpener::swanSong"); + calls_.timeout_ = NULL; + } + + // rollback what we can from the job state + if (conn_ != NULL && conn_->isOpen()) { + // drop any handlers now to save a lot of cycles later - commSetSelect(conn_->fd, COMM_SELECT_WRITE, NULL, NULL, 0); ++ Comm::SetSelect(conn_->fd, COMM_SELECT_WRITE, NULL, NULL, 0); + commUnsetConnTimeout(conn_); + // it never reached fully open, so abort the FD + conn_->close(); + } + + if (callback_ != NULL) { + if (callback_->canceled()) + callback_ = NULL; + else + // inform the still-waiting caller we are dying + doneConnecting(COMM_ERR_CONNECT, 0); + } + + AsyncJob::swanSong(); +} + +void +Comm::ConnOpener::setHost(const char * new_host) +{ + // unset and erase if already set. + if (host_ != NULL) + safe_free(host_); + + // set the new one if given. + if (new_host != NULL) + host_ = xstrdup(new_host); +} + +const char * +Comm::ConnOpener::getHost() const +{ + return host_; +} + +/** + * Connection attempt are completed. One way or the other. + * Pass the results back to the external handler. + * NP: on connection errors the connection close() must be called first. + */ +void +Comm::ConnOpener::doneConnecting(comm_err_t status, int xerrno) +{ + // only mark the address good/bad AFTER connect is finished. + if (host_ != NULL) { + if (xerrno == 0) + ipcacheMarkGoodAddr(host_, conn_->remote); + else { + ipcacheMarkBadAddr(host_, conn_->remote); +#if USE_ICMP + if (Config.onoff.test_reachability) + netdbDeleteAddrNetwork(conn_->remote); +#endif + } + } + + if (callback_ != NULL) { + typedef CommConnectCbParams Params; + Params ¶ms = GetCommParams(callback_); + params.conn = conn_; + params.flag = status; + params.xerrno = xerrno; + ScheduleCallHere(callback_); + callback_ = NULL; + } + + /* ensure cleared local state, we are done. */ + conn_ = NULL; +} + +void +Comm::ConnOpener::start() +{ + Must(conn_ != NULL); + + /* get a socket open ready for connecting with */ + if (!conn_->isOpen()) { +#if USE_IPV6 + /* outbound sockets have no need to be protocol agnostic. */ + if (conn_->remote.IsIPv4()) { + conn_->local.SetIPv4(); + } +#endif + conn_->fd = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_); + if (!conn_->isOpen()) { + doneConnecting(COMM_ERR_CONNECT, 0); + return; + } + } + + typedef CommCbMemFunT abortDialer; + calls_.earlyAbort_ = JobCallback(5, 4, abortDialer, this, Comm::ConnOpener::earlyAbort); + comm_add_close_handler(conn_->fd, calls_.earlyAbort_); + + typedef CommCbMemFunT timeoutDialer; + calls_.timeout_ = JobCallback(5, 4, timeoutDialer, this, Comm::ConnOpener::timeout); + debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_); + commSetConnTimeout(conn_, connectTimeout_, calls_.timeout_); + + connectStart_ = squid_curtime; + connect(); +} + +void +Comm::ConnOpener::connected() +{ + /* + * stats.conn_open is used to account for the number of + * connections that we have open to the peer, so we can limit + * based on the max-conn option. We need to increment here, + * even if the connection may fail. + */ + if (conn_->getPeer()) + conn_->getPeer()->stats.conn_open++; + + lookupLocalAddress(); + + /* TODO: remove these fd_table accesses. But old code still depends on fd_table flags to + * indicate the state of a raw fd object being passed around. + * Also, legacy code still depends on comm_local_port() with no access to Comm::Connection + * when those are done comm_local_port can become one of our member functions to do the below. + */ + fd_table[conn_->fd].flags.open = 1; + fd_table[conn_->fd].local_addr = conn_->local; +} + +/** Make an FD connection attempt. + * Handles the case(s) when a partially setup connection gets closed early. + */ +void +Comm::ConnOpener::connect() +{ + Must(conn_ != NULL); + + // our parent Jobs signal abort by cancelling their callbacks. + if (callback_ == NULL || callback_->canceled()) + return; + + totalTries_++; + + switch (comm_connect_addr(conn_->fd, conn_->remote) ) { + + case COMM_INPROGRESS: + // check for timeout FIRST. + if (squid_curtime - connectStart_ > connectTimeout_) { + debugs(5, 5, HERE << conn_ << ": * - ERR took too long already."); + calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out"); + calls_.earlyAbort_ = NULL; + conn_->close(); + doneConnecting(COMM_TIMEOUT, errno); + return; + } else { + debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS"); - commSetSelect(conn_->fd, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, this, 0); ++ Comm::SetSelect(conn_->fd, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, this, 0); + } + break; + + case COMM_OK: + debugs(5, 5, HERE << conn_ << ": COMM_OK - connected"); + connected(); + doneConnecting(COMM_OK, 0); + break; + + default: + failRetries_++; + + // check for timeout FIRST. + if(squid_curtime - connectStart_ > connectTimeout_) { + debugs(5, 5, HERE << conn_ << ": * - ERR took too long to receive response."); + calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out"); + calls_.earlyAbort_ = NULL; + conn_->close(); + doneConnecting(COMM_TIMEOUT, errno); + } else if (failRetries_ < Config.connect_retries) { + debugs(5, 5, HERE << conn_ << ": * - try again"); + eventAdd("Comm::ConnOpener::DelayedConnectRetry", Comm::ConnOpener::DelayedConnectRetry, this, 0.05, 0); + return; + } else { + // send ERROR back to the upper layer. + debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already."); + calls_.earlyAbort_->cancel("Comm::ConnOpener::connect failed"); + calls_.earlyAbort_ = NULL; + conn_->close(); + doneConnecting(COMM_ERR_CONNECT, errno); + } + } +} + +/** + * Lookup local-end address and port of the TCP link just opened. + * This ensure the connection local details are set correctly + */ +void +Comm::ConnOpener::lookupLocalAddress() +{ + struct addrinfo *addr = NULL; + conn_->local.InitAddrInfo(addr); + + if (getsockname(conn_->fd, addr->ai_addr, &(addr->ai_addrlen)) != 0) { + debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: " << conn_ << ": " << xstrerror()); + conn_->local.FreeAddrInfo(addr); + return; + } + + conn_->local = *addr; + conn_->local.FreeAddrInfo(addr); + debugs(5, 6, HERE << conn_); +} + +/** Abort connection attempt. + * Handles the case(s) when a partially setup connection gets closed early. + */ +void +Comm::ConnOpener::earlyAbort(const CommConnectCbParams &io) +{ + debugs(5, 3, HERE << io.conn); + doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better? +} + +/** + * Handles the case(s) when a partially setup connection gets timed out. + * NP: When commSetConnTimeout accepts generic CommCommonCbParams this can die. + */ +void +Comm::ConnOpener::timeout(const CommTimeoutCbParams &) +{ + connect(); +} + +/* Legacy Wrapper for the retry event after COMM_INPROGRESS - * XXX: As soon as comm commSetSelect() accepts Async calls we can use a ConnOpener::connect call ++ * XXX: As soon as Comm::SetSelect() accepts Async calls we can use a ConnOpener::connect call + */ +void +Comm::ConnOpener::InProgressConnectRetry(int fd, void *data) +{ + ConnOpener *cs = static_cast(data); + assert(cs); + + // Ew. we are now outside the all AsyncJob protections. + // get back inside by scheduling another call... + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect); + ScheduleCallHere(call); +} + +/* Legacy Wrapper for the retry event with small delay after errors. + * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::connect call + */ +void +Comm::ConnOpener::DelayedConnectRetry(void *data) +{ + ConnOpener *cs = static_cast(data); + assert(cs); + + // Ew. we are now outside the all AsyncJob protections. + // get back inside by scheduling another call... + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect); + ScheduleCallHere(call); +} diff --cc src/comm/IoCallback.cc index 2e9f9c927d,4026737754..a862cd5426 --- a/src/comm/IoCallback.cc +++ b/src/comm/IoCallback.cc @@@ -1,7 -1,7 +1,8 @@@ #include "config.h" #include "ClientInfo.h" +#include "comm/Connection.h" #include "comm/IoCallback.h" + #include "comm/Loops.h" #include "comm/Write.h" #include "CommCalls.h" #include "fde.h" @@@ -71,7 -66,7 +72,7 @@@ Comm::IoCallback::selectOrQueueWrite( } #endif - commSetSelect(conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0); - SetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0); ++ SetSelect(conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0); } void diff --cc src/comm/IoCallback.h index 85d1b3f2ad,1d2b770672..ab21066154 --- a/src/comm/IoCallback.h +++ b/src/comm/IoCallback.h @@@ -1,10 -1,8 +1,9 @@@ #ifndef _SQUID_COMM_IOCALLBACK_H #define _SQUID_COMM_IOCALLBACK_H - #include "config.h" #include "base/AsyncCall.h" #include "comm_err_t.h" +#include "comm/forward.h" namespace Comm { diff --cc src/comm/Makefile.am index 1a3a7e3571,cdb8f5032a..0ba83c14ad --- a/src/comm/Makefile.am +++ b/src/comm/Makefile.am @@@ -7,17 -7,18 +7,22 @@@ noinst_LTLIBRARIES = libcomm.l libcomm_la_SOURCES= \ AcceptLimiter.cc \ AcceptLimiter.h \ - ConnAcceptor.cc \ - ConnAcceptor.h \ - \ + ConnOpener.cc \ + ConnOpener.h \ - \ + Connection.cc \ + Connection.h \ + forward.h \ + IoCallback.cc \ + IoCallback.h \ + Loops.h \ + ModDevPoll.cc \ + ModEpoll.cc \ + ModKqueue.cc \ + ModPoll.cc \ + ModSelect.cc \ + ModSelectWin32.cc \ + TcpAcceptor.cc \ + TcpAcceptor.h \ - \ - IoCallback.cc \ - IoCallback.h \ Write.cc \ Write.h \ \ diff --cc src/comm/TcpAcceptor.cc index 4157094249,db0275af0d..6dd7dc68cc --- a/src/comm/TcpAcceptor.cc +++ b/src/comm/TcpAcceptor.cc @@@ -37,44 -37,32 +37,30 @@@ #include "CommCalls.h" #include "comm/AcceptLimiter.h" #include "comm/comm_internal.h" +#include "comm/Connection.h" - #include "comm/ConnAcceptor.h" + #include "comm/Loops.h" + #include "comm/TcpAcceptor.h" -#include "ConnectionDetail.h" #include "fde.h" #include "protos.h" #include "SquidTime.h" - namespace Comm { - CBDATA_CLASS_INIT(ConnAcceptor); + namespace Comm + { + CBDATA_CLASS_INIT(TcpAcceptor); }; - Comm::ConnAcceptor::ConnAcceptor(const Comm::ConnectionPointer &newConn, const char *note, const Subscription::Pointer &aSub) : - AsyncJob("Comm::ConnAcceptor"), -Comm::TcpAcceptor::TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags, - const char *note, const Subscription::Pointer &aSub) : ++Comm::TcpAcceptor::TcpAcceptor(const Comm::ConnectionPointer &newConn, const char *note, const Subscription::Pointer &aSub) : + AsyncJob("Comm::TcpAcceptor"), errcode(0), - fd(listenFd), isLimited(0), theCallSub(aSub), - local_addr(laddr) + conn(newConn) - { - assert(newConn != NULL); - - /* open the conn if its not already open */ - if (!IsConnOpen(conn)) { - conn->fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn->local, conn->flags, note); - errcode = errno; - - if (!conn->isOpen()) { - debugs(5, DBG_CRITICAL, HERE << "comm_open failed: " << conn << " error: " << errcode); - conn = NULL; - return; - } - debugs(9, 3, HERE << "Unconnected data socket created on " << conn); - } - assert(IsConnOpen(newConn)); - } + {} void - Comm::ConnAcceptor::subscribe(const Subscription::Pointer &aSub) + Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub) { - debugs(5, 5, HERE << conn << " AsyncCall Subscription: " << aSub); + debugs(5, 5, HERE << status() << " AsyncCall Subscription: " << aSub); unsubscribe("subscription change"); theCallSub = aSub; } @@@ -87,24 -75,24 +73,24 @@@ Comm::TcpAcceptor::unsubscribe(const ch } void - Comm::ConnAcceptor::start() + Comm::TcpAcceptor::start() { - debugs(5, 5, HERE << conn << " AsyncCall Subscription: " << theCallSub); + debugs(5, 5, HERE << status() << " AsyncCall Subscription: " << theCallSub); - Must(isOpen(fd)); + Must(IsConnOpen(conn)); setListen(); // if no error so far start accepting connections. if (errcode == 0) - commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0); - SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); ++ SetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0); } bool - Comm::ConnAcceptor::doneAll() const + Comm::TcpAcceptor::doneAll() const { // stop when FD is closed - if (!isOpen(fd)) { + if (!IsConnOpen(conn)) { return AsyncJob::doneAll(); } @@@ -127,6 -115,23 +113,26 @@@ Comm::TcpAcceptor::swanSong( AsyncJob::swanSong(); } + const char * + Comm::TcpAcceptor::status() const + { ++ if (conn == NULL) ++ return "[nil connection]"; ++ + static char ipbuf[MAX_IPSTRLEN] = {'\0'}; + if (ipbuf[0] == '\0') - local_addr.ToHostname(ipbuf, MAX_IPSTRLEN); ++ conn->local.ToHostname(ipbuf, MAX_IPSTRLEN); + + static MemBuf buf; + buf.reset(); - buf.Printf(" FD %d, %s",fd, ipbuf); ++ buf.Printf(" FD %d, %s",conn->fd, ipbuf); + + const char *jobStatus = AsyncJob::status(); + buf.append(jobStatus, strlen(jobStatus)); + + return buf.content(); + } + /** * New-style listen and accept routines * @@@ -135,11 -140,11 +141,11 @@@ * accept()ed some time later. */ void - Comm::ConnAcceptor::setListen() + Comm::TcpAcceptor::setListen() { errcode = 0; // reset local errno copy. - if (listen(fd, Squid_MaxFD >> 2) < 0) { + if (listen(conn->fd, Squid_MaxFD >> 2) < 0) { - debugs(50, DBG_CRITICAL, "ERROR: listen(" << conn << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); + debugs(50, DBG_CRITICAL, "ERROR: listen(" << status() << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror()); errcode = errno; return; } @@@ -222,44 -227,45 +228,42 @@@ Comm::TcpAcceptor::acceptOne( */ /* Accept a new connection */ - ConnectionDetail newConnDetails; - int newFd = -1; - const comm_err_t flag = oldAccept(newConnDetails, &newFd); + ConnectionPointer newConnDetails = new Connection(); - comm_err_t status = oldAccept(newConnDetails); ++ const comm_err_t flag = oldAccept(newConnDetails); /* Check for errors */ - if (!isOpen(newFd)) { + if (!newConnDetails->isOpen()) { - if (status == COMM_NOMESSAGE) { + if (flag == COMM_NOMESSAGE) { /* register interest again */ - debugs(5, 5, HERE << "try later: FD " << fd << " handler Subscription: " << theCallSub); - SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0); + debugs(5, 5, HERE << "try later: " << conn << " handler Subscription: " << theCallSub); - commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0); ++ SetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0); return; } // A non-recoverable error; notify the caller */ - debugs(5, 5, HERE << "non-recoverable error: " << conn << " handler Subscription: " << theCallSub); - notify(status, newConnDetails); + debugs(5, 5, HERE << "non-recoverable error:" << status() << " handler Subscription: " << theCallSub); - notify(flag, newConnDetails, newFd); ++ notify(flag, newConnDetails); mustStop("Listener socket closed"); return; } - debugs(5, 5, HERE << "Listener: FD " << fd << - " accepted new connection from " << newConnDetails.peer << + debugs(5, 5, HERE << "Listener: " << conn << + " accepted new connection " << newConnDetails << " handler Subscription: " << theCallSub); - notify(status, newConnDetails); - notify(flag, newConnDetails, newFd); ++ notify(flag, newConnDetails); } void - Comm::ConnAcceptor::acceptNext() + Comm::TcpAcceptor::acceptNext() { - Must(isOpen(fd)); - debugs(5, 2, HERE << "connection on FD " << fd); + Must(IsConnOpen(conn)); + debugs(5, 2, HERE << "connection on " << conn); acceptOne(); } --// XXX: obsolete comment? --// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback(). void - Comm::ConnAcceptor::notify(comm_err_t flag, const Comm::ConnectionPointer &newConnDetails) -Comm::TcpAcceptor::notify(const comm_err_t flag, const ConnectionDetail &connDetails, int newFd) const ++Comm::TcpAcceptor::notify(const comm_err_t flag, const Comm::ConnectionPointer &newConnDetails) const { // listener socket handlers just abandon the port with COMM_ERR_CLOSING // it should only happen when this object is deleted... @@@ -288,7 -295,7 +292,7 @@@ * Or if this client has too many connections already. */ comm_err_t - Comm::ConnAcceptor::oldAccept(Comm::ConnectionPointer &details) -Comm::TcpAcceptor::oldAccept(ConnectionDetail &details, int *newFd) ++Comm::TcpAcceptor::oldAccept(Comm::ConnectionPointer &details) { PROF_start(comm_accept); statCounter.syscalls.sock.accepts++; diff --cc src/comm/TcpAcceptor.h index 4f8652d7fe,1c3a12fb03..ef3d2129de --- a/src/comm/TcpAcceptor.h +++ b/src/comm/TcpAcceptor.h @@@ -5,7 -5,8 +5,9 @@@ #include "base/Subscription.h" #include "CommCalls.h" #include "comm_err_t.h" +#include "comm/forward.h" + #include "comm/TcpAcceptor.h" + #include "ip/Address.h" #if HAVE_MAP #include @@@ -34,13 -35,16 +36,15 @@@ private virtual void start(); virtual bool doneAll() const; virtual void swanSong(); + virtual const char *status() const; + + TcpAcceptor(const TcpAcceptor &); // not implemented. public: - ConnAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub); - ConnAcceptor(const ConnAcceptor &r); // not implemented. - TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags, - const char *note, const Subscription::Pointer &aSub); ++ TcpAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub); /** Subscribe a handler to receive calls back about new connections. - * Replaces any existing subscribed handler. + * Unsubscribes any existing subscribed handler. */ void subscribe(const Subscription::Pointer &aSub); @@@ -56,21 -60,26 +60,22 @@@ void acceptNext(); /// Call the subscribed callback handler with details about a new connection. - void notify(comm_err_t flag, const Comm::ConnectionPointer &details); - void notify(const comm_err_t flags, const ConnectionDetail &newConnDetails, const int newFd) const; ++ void notify(const comm_err_t flag, const Comm::ConnectionPointer &details) const; /// errno code of the last accept() or listen() action if one occurred. int errcode; - private: - /// conn being listened on for new connections - /// Reserved for read-only use. - // NP: public only until we can hide it behind connection handles - int fd; - + protected: friend class AcceptLimiter; int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue. + + private: Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events. - /// IP Address and port being listened on - Ip::Address local_addr; + /// conn being listened on for new connections + /// Reserved for read-only use. + ConnectionPointer conn; - private: /// Method to test if there are enough file descriptors to open a new client connection /// if not the accept() will be postponed static bool okToAccept(); @@@ -79,10 -88,10 +84,10 @@@ static void doAccept(int fd, void *data); void acceptOne(); - comm_err_t oldAccept(ConnectionDetail &newConnDetails, int *fd); + comm_err_t oldAccept(Comm::ConnectionPointer &details); void setListen(); - CBDATA_CLASS2(ConnAcceptor); + CBDATA_CLASS2(TcpAcceptor); }; } // namespace Comm diff --cc src/dns_internal.cc index ae460a5276,f859e9dcb7..c0df7f66bc --- a/src/dns_internal.cc +++ b/src/dns_internal.cc @@@ -33,12 -33,12 +33,13 @@@ */ #include "squid.h" +#include "base/InstanceId.h" +#include "comm/Connection.h" +#include "comm/ConnOpener.h" - #include "comm/Write.h" #include "comm.h" + #include "comm/Loops.h" + #include "comm/Write.h" #include "event.h" -#include "SquidTime.h" -#include "Store.h" #include "fde.h" #include "ip/tools.h" #include "MemBuf.h" diff --cc src/errorpage.h index d634eb4a00,e3ea32cfa3..c50887a072 --- a/src/errorpage.h +++ b/src/errorpage.h @@@ -37,8 -37,10 +37,11 @@@ #include "squid.h" #include "auth/UserRequest.h" #include "cbdata.h" +#include "comm/forward.h" #include "ip/Address.h" + #if USE_SSL + #include "ssl/ErrorDetail.h" + #endif /** \defgroup ErrorPageAPI Error Pages API diff --cc src/forward.cc index ead2e1fc0e,930a110629..c6f6022296 --- a/src/forward.cc +++ b/src/forward.cc @@@ -35,9 -35,8 +35,10 @@@ #include "acl/FilledChecklist.h" #include "acl/Gadgets.h" #include "CacheManager.h" +#include "comm/Connection.h" +#include "comm/ConnOpener.h" +#include "CommCalls.h" + #include "comm/Loops.h" #include "event.h" #include "errorpage.h" #include "fde.h" @@@ -57,9 -54,10 +58,10 @@@ #include "mgr/Registration.h" #if USE_SSL #include "ssl/support.h" + #include "ssl/ErrorDetail.h" #endif -static PSC fwdStartCompleteWrapper; +static PSC fwdPeerSelectionCompleteWrapper; static PF fwdServerClosedWrapper; #if USE_SSL static PF fwdNegotiateSSLWrapper; @@@ -604,13 -606,22 +597,21 @@@ FwdState::negotiateSSL(int fd anErr->xerrno = EACCES; #endif + Ssl::ErrorDetail *errFromFailure = (Ssl::ErrorDetail *)SSL_get_ex_data(ssl, ssl_ex_index_ssl_error_detail); + if (errFromFailure != NULL) { + // The errFromFailure is attached to the ssl object + // and will be released when ssl object destroyed. + // Copy errFromFailure to a new Ssl::ErrorDetail object + anErr->detail = new Ssl::ErrorDetail(*errFromFailure); + } + fail(anErr); - if (fs->_peer) { - peerConnectFailed(fs->_peer); - fs->_peer->stats.conn_open--; + if (serverConnection()->getPeer()) { + peerConnectFailed(serverConnection()->getPeer()); } - comm_close(fd); + serverConn->close(); return; } } @@@ -1001,7 -1125,7 +1002,7 @@@ FwdState::dispatch( * FwdState::reforward * * returns TRUE if the transaction SHOULD be re-forwarded to the -- * next choice in the FwdServers list. This method is called when ++ * next choice in the serverDestinations list. This method is called when * server-side communication completes normally, or experiences * some error after receiving the end of HTTP headers. */ @@@ -1133,17 -1266,7 +1134,6 @@@ FwdState::pconnPush(Comm::ConnectionPoi void FwdState::initModule() { - #if WIP_FWD_LOG - - if (logfile) - (void) 0; - else if (NULL == Config.Log.forward) - (void) 0; - else - logfile = logfileOpen(Config.Log.forward, 0, 1); - - #endif - - memDataInit(MEM_FWD_SERVER, "FwdServer", sizeof(FwdServer), 0); RegisterWithCacheManager(); } diff --cc src/forward.h index f7c251f964,0324c8f4d2..1bc1cddd19 --- a/src/forward.h +++ b/src/forward.h @@@ -91,17 -95,10 +84,14 @@@ private time_t start_t; int n_tries; int origin_tries; - #if WIP_FWD_LOG - http_status last_status; - #endif + // AsyncCalls which we set and may need cancelling. + struct { + AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn. + } calls; + struct { unsigned int dont_retry:1; - unsigned int ftp_pasv_failed:1; unsigned int forward_completed:1; } flags; diff --cc src/ftp.cc index 9aae0fc162,752c2ebf60..d6a55fb29c --- a/src/ftp.cc +++ b/src/ftp.cc @@@ -34,10 -34,11 +34,11 @@@ #include "squid.h" #include "comm.h" +#include "comm/ConnOpener.h" - #include "comm/ConnAcceptor.h" + #include "CommCalls.h" + #include "comm/TcpAcceptor.h" #include "comm/Write.h" #include "compat/strtoll.h" -#include "ConnectionDetail.h" #include "errorpage.h" #include "fde.h" #include "forward.h" @@@ -462,10 -457,10 +464,11 @@@ FtpStateData::ctrlClosed(const CommClos void FtpStateData::dataClosed(const CommCloseCbParams &io) { + debugs(9, 4, HERE); - if (data.fd >= 0) { - comm_close(data.fd); + if (data.listenConn != NULL) { + data.listenConn->close(); + data.listenConn = NULL; - data.conn = NULL; + // NP clear() does the: data.fd = -1; } data.clear(); failed(ERR_FTP_FAILURE, 0); @@@ -634,15 -622,33 +637,31 @@@ FtpStateData::switchTimeoutToDataChanne } void -FtpStateData::listenForDataChannel(const int fd, const char *note) +FtpStateData::listenForDataChannel(const Comm::ConnectionPointer &conn, const char *note) { - data.listenConn = conn; - assert(data.fd < 0); ++ assert(!Comm::IsConnOpen(data.conn)); typedef CommCbMemFunT AcceptDialer; typedef AsyncCallT AcceptCall; - RefCount call = (AcceptCall*)JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection); + RefCount call = static_cast(JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection)); Subscription::Pointer sub = new CallSubscription(call); - AsyncJob::Start(new Comm::ConnAcceptor(data.listenConn, note, sub)); + + /* open the conn if its not already open */ - int newFd = fd; - if (newFd < 0) { - newFd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, data.local, data.flags, note); - if (newFd < 0) { - debugs(5, DBG_CRITICAL, HERE << "comm_open_listener failed:" << data.local << " error: " << errno); ++ if (!Comm::IsConnOpen(conn)) { ++ conn->fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn->local, conn->flags, note); ++ if (!Comm::IsConnOpen(conn)) { ++ debugs(5, DBG_CRITICAL, HERE << "comm_open_listener failed:" << conn->local << " error: " << errno); + return; + } - debugs(9, 3, HERE << "Unconnected data socket created on FD " << newFd << ", " << data.local); ++ debugs(9, 3, HERE << "Unconnected data socket created on " << conn); + } + - assert(newFd >= 0); - Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(newFd, data.local, data.flags, note, sub); - AsyncJob::Start(tmp); ++ assert(Comm::IsConnOpen(conn)); ++ AsyncJob::Start(new Comm::TcpAcceptor(conn, note, sub)); + + // Ensure we have a copy of the FD opened for listening and a close handler on it. - data.opened(newFd, dataCloser()); ++ data.opened(conn, dataCloser()); + switchTimeoutToDataChannel(); } void @@@ -2722,19 -2778,32 +2757,23 @@@ FtpStateData::ftpPasvCallback(const Com } /// \ingroup ServerProtocolFTPInternal -static int +static void ftpOpenListenSocket(FtpStateData * ftpState, int fallback) { - struct addrinfo *AI = NULL; - int x = 0; - /// Close old data channels, if any. We may open a new one below. - ftpState->data.close(); - if ((ftpState->data.flags & COMM_REUSEADDR)) ++ if ((ftpState->data.conn->flags & COMM_REUSEADDR)) + // NP: in fact it points to the control channel. just clear it. + ftpState->data.clear(); + else + ftpState->data.close(); + ftpState->data.host = NULL; /* * Set up a listen socket on the same local address as the * control connection. */ - ftpState->data.listenConn = new Comm::Connection; - ftpState->data.listenConn->local = ftpState->ctrl.conn->local; - ftpState->data.local.InitAddrInfo(AI); - x = getsockname(ftpState->ctrl.fd, AI->ai_addr, &AI->ai_addrlen); - ftpState->data.local = *AI; - ftpState->data.local.FreeAddrInfo(AI); - - if (x) { - debugs(9, DBG_CRITICAL, HERE << "getsockname(" << ftpState->ctrl.fd << ",..): " << xstrerror()); - return -1; - } ++ Comm::ConnectionPointer temp = new Comm::Connection; ++ temp->local = ftpState->ctrl.conn->local; /* * REUSEADDR is needed in fallback mode, since the same port is @@@ -2742,15 -2811,17 +2781,15 @@@ */ if (fallback) { int on = 1; - setsockopt(ftpState->ctrl.fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)); - ftpState->ctrl.flags |= COMM_REUSEADDR; - ftpState->data.flags |= COMM_REUSEADDR; + setsockopt(ftpState->ctrl.conn->fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)); + ftpState->ctrl.conn->flags |= COMM_REUSEADDR; - ftpState->data.listenConn->flags |= COMM_REUSEADDR; ++ temp->flags |= COMM_REUSEADDR; } else { /* if not running in fallback mode a new port needs to be retrieved */ - ftpState->data.listenConn->local.SetPort(0); - ftpState->data.local.SetPort(0); - ftpState->data.flags = COMM_NONBLOCKING; ++ temp->local.SetPort(0); } - ftpState->listenForDataChannel(ftpState->data.listenConn, ftpState->entry->url()); - ftpState->listenForDataChannel((fallback?ftpState->ctrl.fd:-1), ftpState->entry->url()); - return ftpState->data.fd; ++ ftpState->listenForDataChannel(temp, ftpState->entry->url()); } /// \ingroup ServerProtocolFTPInternal @@@ -2827,22 -2902,42 +2864,32 @@@ ftpSendEPRT(FtpStateData * ftpState return; } + if (!Config.Ftp.eprt) { + /* Disabled. Switch immediately to attempting old PORT command. */ + debugs(9, 3, "EPRT disabled by local administrator"); + ftpSendPORT(ftpState); + return; + } + - int fd; - Ip::Address addr; - struct addrinfo *AI = NULL; - char buf[MAX_IPSTRLEN]; - debugs(9, 3, HERE); ftpState->flags.pasv_supported = 0; - fd = ftpOpenListenSocket(ftpState, 0); - debugs(9, 3, "Listening for FTP data connection with FD " << fd); - - Ip::Address::InitAddrInfo(AI); - - if (getsockname(fd, AI->ai_addr, &AI->ai_addrlen)) { - Ip::Address::FreeAddrInfo(AI); - debugs(9, DBG_CRITICAL, HERE << "getsockname(" << fd << ",..): " << xstrerror()); + ftpOpenListenSocket(ftpState, 0); - if (!Comm::IsConnOpen(ftpState->data.listenConn)) { ++ debugs(9, 3, "Listening for FTP data connection with FD " << ftpState->data.conn); ++ if (!Comm::IsConnOpen(ftpState->data.conn)) { /* XXX Need to set error message */ ftpFail(ftpState); return; } - addr = *AI; ++ char buf[MAX_IPSTRLEN]; + /* RFC 2428 defines EPRT as IPv6 equivalent to IPv4 PORT command. */ /* Which can be used by EITHER protocol. */ - snprintf(cbuf, 1024, "EPRT |%d|%s|%d|\r\n", - ( addr.IsIPv6() ? 2 : 1 ), - addr.NtoA(buf,MAX_IPSTRLEN), - addr.GetPort() ); + snprintf(cbuf, CTRL_BUFLEN, "EPRT |%d|%s|%d|\r\n", + ( ftpState->data.listenConn->local.IsIPv6() ? 2 : 1 ), + ftpState->data.listenConn->local.NtoA(buf,MAX_IPSTRLEN), + ftpState->data.listenConn->local.GetPort() ); ftpState->writeCommand(cbuf); ftpState->state = SENT_EPRT; @@@ -2871,11 -2968,9 +2918,10 @@@ ftpReadEPRT(FtpStateData * ftpState * \param io comm accept(2) callback parameters */ -void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) +void +FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io) { - char ntoapeer[MAX_IPSTRLEN]; - debugs(9, 3, "ftpAcceptDataConnection"); + debugs(9, 3, HERE); if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { abortTransaction("entry aborted when accepting data conn"); @@@ -2899,6 -2977,20 +2945,20 @@@ return; } + if (io.flag != COMM_OK) { + data.close(); + debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: FD " << io.fd << ": " << xstrerr(io.xerrno)); + /** \todo Need to send error message on control channel*/ + ftpFail(this); + return; + } + + /* data listening conn is no longer even open. abort. */ - if (data.fd <= 0 || fd_table[data.fd].flags.open == 0) { ++ if (!Comm::IsConnOpen(data.conn)) { + data.clear(); // ensure that it's cleared and not just closed. + return; + } + /** \par * When squid.conf ftp_sanitycheck is enabled, check the new connection is actually being * made by the remote client which is connected to the FTP control socket. @@@ -2906,31 -2998,35 +2966,30 @@@ * This prevents third-party hacks, but also third-party load balancing handshakes. */ if (Config.Ftp.sanitycheck) { - io.conn->remote.NtoA(ntoapeer,MAX_IPSTRLEN); - char ntoapeer[MAX_IPSTRLEN]; - io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN); -- - if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0 && - strcmp(fd_table[data.fd].ipaddr, ntoapeer) != 0) { + // accept if either our data or ctrl connection is talking to this remote peer. - if (data.listenConn->remote != io.conn->remote && ctrl.conn->remote != io.conn->remote) { ++ if (data.conn->remote != io.conn->remote && ctrl.conn->remote != io.conn->remote) { debugs(9, DBG_IMPORTANT, "FTP data connection from unexpected server (" << - io.details.peer << "), expecting " << - fd_table[ctrl.fd].ipaddr << " or " << fd_table[data.fd].ipaddr); + io.conn->remote << "), expecting " << - data.listenConn->remote << " or " << ctrl.conn->remote); ++ data.conn->remote << " or " << ctrl.conn->remote); + + /* close the bad sources connection down ASAP. */ - comm_close(io.nfd); ++ io.conn->close(); /* drop the bad connection (io) by ignoring the attempt. */ return; } } - /**\par - * Replace the Listening socket with the accepted data socket */ + /** On COMM_OK start using the accepted data socket and discard the temporary listen socket. */ data.close(); - data.opened(io.nfd, dataCloser()); - data.port = io.details.peer.GetPort(); - data.host = xstrdup(fd_table[io.nfd].ipaddr); + data.opened(io.conn, dataCloser()); + io.conn->remote.NtoA(data.host,SQUIDHOSTNAMELEN); - data.listenConn->close(); - data.listenConn = NULL; - debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " << - "FD " << io.nfd << " to " << io.details.peer << " FD table says: " << - "ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " << - "data-peer= " << fd_table[data.fd].ipaddr); + debugs(9, 3, HERE << "Connected data socket on " << + io.conn << ". FD table says: " << + "ctrl-peer= " << fd_table[ctrl.conn->fd].ipaddr << ", " << + "data-peer= " << fd_table[data.conn->fd].ipaddr); assert(haveControlChannel("ftpAcceptDataConnection")); assert(ctrl.message == NULL); @@@ -3135,19 -3231,17 +3194,17 @@@ ftpReadList(FtpStateData * ftpState int code = ftpState->ctrl.replycode; debugs(9, 3, HERE); - if (code == 125 || (code == 150 && ftpState->data.host)) { + if (code == 125 || (code == 150 && Comm::IsConnOpen(ftpState->data.conn))) { - debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.conn->remote); /* Begin data transfer */ - debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.host << " (" << ftpState->data.local << ")"); ++ debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.conn->remote << " (" << ftpState->data.conn->local << ")"); ftpState->switchTimeoutToDataChannel(); ftpState->maybeReadVirginBody(); ftpState->state = READING_DATA; return; } else if (code == 150) { - debugs(9, 3, HERE << "accept data channel from " << ftpState->ctrl.conn->remote); - ftpState->switchTimeoutToDataChannel(); - /* Accept data channel */ - debugs(9, 3, HERE << "accept data channel from " << ftpState->data.host << " (" << ftpState->data.local << ")"); - ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host); ++ debugs(9, 3, HERE << "accept data channel from " << ftpState->data.conn->remote << " (" << ftpState->data.conn->local << ")"); + ftpState->listenForDataChannel(ftpState->data.conn, ftpState->data.host); return; } else if (!ftpState->flags.tried_nlst && code > 300) { ftpSendNlst(ftpState); @@@ -3188,8 -3282,7 +3245,7 @@@ ftpReadRetr(FtpStateData * ftpState ftpState->state = READING_DATA; } else if (code == 150) { /* Accept data channel */ - ftpState->switchTimeoutToDataChannel(); - ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host); + ftpState->listenForDataChannel(ftpState->data.conn, ftpState->data.host); } else if (code >= 300) { if (!ftpState->flags.try_slash_hack) { /* Try this as a directory missing trailing slash... */ @@@ -3815,9 -3906,9 +3873,9 @@@ voi FtpStateData::abortTransaction(const char *reason) { debugs(9, 3, HERE << "aborting transaction for " << reason << - "; FD " << ctrl.conn->fd << ", Data FD " << data.conn->fd << ", this " << this); - "; FD " << ctrl.fd << ", Data FD " << data.fd << ", this " << this); - if (ctrl.fd >= 0) { - comm_close(ctrl.fd); ++ "; FD " << (ctrl.conn!=NULL?ctrl.conn->fd:-1) << ", Data FD " << (data.conn!=NULL?data.conn->fd:-1) << ", this " << this); + if (Comm::IsConnOpen(ctrl.conn)) { + ctrl.conn->close(); return; } @@@ -3853,19 -3951,17 +3911,14 @@@ voi FtpChannel::close() { // channels with active listeners will be closed when the listener handler dies. - if (listenConn != NULL) { - listenConn->close(); - listenConn = NULL; - comm_remove_close_handler(conn->fd, closer); - closer = NULL; - } else if (Comm::IsConnOpen(conn)) { - if (fd >= 0) { - if (closer != NULL) { - comm_remove_close_handler(fd, closer); - closer = NULL; - } - comm_close(fd); // we do not expect to be called back - fd = -1; ++ if (Comm::IsConnOpen(conn)) { + comm_remove_close_handler(conn->fd, closer); + closer = NULL; + conn->close(); // we do not expect to be called back } + conn = NULL; } -/// just resets fd and close handler void FtpChannel::clear() { diff --cc src/helper.cc index ffa19ae2de,6c313cbd1e..c3165f02b4 --- a/src/helper.cc +++ b/src/helper.cc @@@ -33,11 -33,9 +33,12 @@@ */ #include "squid.h" +#include "comm.h" +#include "comm/Connection.h" #include "comm/Write.h" #include "helper.h" + #include "log/Gadgets.h" +#include "MemBuf.h" #include "SquidMath.h" #include "SquidTime.h" #include "Store.h" diff --cc src/htcp.cc index 725bdebdab,0f6b307197..0ab874690d --- a/src/htcp.cc +++ b/src/htcp.cc @@@ -1510,16 -1520,16 +1511,16 @@@ htcpInit(void Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, - incomingAddr, - COMM_NONBLOCKING, + htcpIncomingConn, - Ipc::fdnInHtcpSocket, call, Subscription::Pointer()); + Ipc::fdnInHtcpSocket, call); if (!Config.Addrs.udp_outgoing.IsNoAddr()) { - Ip::Address outgoingAddr = Config.Addrs.udp_outgoing; - outgoingAddr.SetPort(Config.Port.htcp); + htcpOutgoingConn = new Comm::Connection; + htcpOutgoingConn->local = Config.Addrs.udp_outgoing; + htcpOutgoingConn->local.SetPort(Config.Port.htcp); - if (!Ip::EnableIpv6 && !outgoingAddr.SetIPv4()) { - debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoingAddr << " is not an IPv4 address."); + if (!Ip::EnableIpv6 && !htcpOutgoingConn->local.SetIPv4()) { + debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << htcpOutgoingConn->local << " is not an IPv4 address."); fatal("HTCP port cannot be opened."); } /* split-stack for now requires default IPv4-only HTCP */ @@@ -1528,15 -1538,21 +1529,15 @@@ } enter_suid(); - htcpOutSocket = comm_open_listener(SOCK_DGRAM, - IPPROTO_UDP, - outgoingAddr, - COMM_NONBLOCKING, - "Outgoing HTCP Socket"); + comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, htcpOutgoingConn, "Outgoing HTCP Socket"); leave_suid(); - if (htcpOutSocket < 0) + if (!Comm::IsConnOpen(htcpOutgoingConn)) fatal("Cannot open Outgoing HTCP Socket"); - commSetSelect(htcpOutgoingConn->fd, COMM_SELECT_READ, htcpRecv, NULL, 0); - Comm::SetSelect(htcpOutSocket, COMM_SELECT_READ, htcpRecv, NULL, 0); ++ Comm::SetSelect(htcpOutgoingConn->fd, COMM_SELECT_READ, htcpRecv, NULL, 0); - debugs(31, 1, "Outgoing HTCP messages on port " << Config.Port.htcp << ", FD " << htcpOutSocket << "."); - - fd_note(htcpInSocket, "Incoming HTCP socket"); + debugs(31, DBG_IMPORTANT, "Sending HTCP messages from " << htcpOutgoingConn->local); } if (!htcpDetailPool) { @@@ -1545,19 -1561,19 +1546,19 @@@ } static void -htcpIncomingConnectionOpened(int fd, int errNo) +htcpIncomingConnectionOpened(int) { - htcpInSocket = fd; - - if (htcpInSocket < 0) + if (!Comm::IsConnOpen(htcpIncomingConn)) fatal("Cannot open HTCP Socket"); - commSetSelect(htcpIncomingConn->fd, COMM_SELECT_READ, htcpRecv, NULL, 0); - Comm::SetSelect(htcpInSocket, COMM_SELECT_READ, htcpRecv, NULL, 0); ++ Comm::SetSelect(htcpIncomingConn->fd, COMM_SELECT_READ, htcpRecv, NULL, 0); - debugs(31, 1, "Accepting HTCP messages on port " << Config.Port.htcp << ", FD " << htcpInSocket << "."); + debugs(31, DBG_CRITICAL, "Accepting HTCP messages on " << htcpIncomingConn->local); - if (Config.Addrs.udp_outgoing.IsNoAddr()) - htcpOutSocket = htcpInSocket; + if (Config.Addrs.udp_outgoing.IsNoAddr()) { + htcpOutgoingConn = htcpIncomingConn; + debugs(31, DBG_IMPORTANT, "Sending HTCP messages from " << htcpOutgoingConn->local); + } } int @@@ -1712,9 -1732,9 +1713,9 @@@ htcpSocketShutdown(void /* XXX Don't we need this handler to read replies while shutting down? * I think there should be a separate hander for reading replies.. */ - assert(htcpOutSocket > -1); + assert(Comm::IsConnOpen(htcpOutgoingConn)); - commSetSelect(htcpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0); - Comm::SetSelect(htcpOutSocket, COMM_SELECT_READ, NULL, NULL, 0); ++ Comm::SetSelect(htcpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0); } void diff --cc src/icp_v2.cc index c59c13ce18,2f1d326517..2c6549adcc --- a/src/icp_v2.cc +++ b/src/icp_v2.cc @@@ -36,13 -36,18 +36,15 @@@ */ #include "squid.h" -#include "Store.h" +#include "AccessLogEntry.h" +#include "acl/Acl.h" +#include "acl/FilledChecklist.h" + #include "comm.h" + #include "comm/Loops.h" -#include "ICP.h" +#include "comm/Connection.h" #include "HttpRequest.h" -#include "acl/FilledChecklist.h" -#include "acl/Acl.h" -#include "AccessLogEntry.h" -#include "wordlist.h" -#include "SquidTime.h" -#include "SwapDir.h" #include "icmp/net_db.h" +#include "ICP.h" #include "ip/Address.h" #include "ip/tools.h" #include "ipc/StartListening.h" @@@ -712,34 -701,40 +714,34 @@@ icpConnectionsOpen(void Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, - addr, - COMM_NONBLOCKING, + icpIncomingConn, - Ipc::fdnInIcpSocket, call, Subscription::Pointer()); + Ipc::fdnInIcpSocket, call); - addr.SetEmpty(); // clear for next use. - addr = Config.Addrs.udp_outgoing; - if ( !addr.IsNoAddr() ) { - enter_suid(); - addr.SetPort(port); + if ( !Config.Addrs.udp_outgoing.IsNoAddr() ) { + icpOutgoingConn = new Comm::Connection; + icpOutgoingConn->local = Config.Addrs.udp_outgoing; + icpOutgoingConn->local.SetPort(port); - if (!Ip::EnableIpv6 && !addr.SetIPv4()) { - debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << addr << " is not an IPv4 address."); + if (!Ip::EnableIpv6 && !icpOutgoingConn->local.SetIPv4()) { + debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address."); fatal("ICP port cannot be opened."); } /* split-stack for now requires default IPv4-only ICP */ - if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && addr.IsAnyAddr()) { - addr.SetIPv4(); + if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.IsAnyAddr()) { + icpOutgoingConn->local.SetIPv4(); } - theOutIcpConnection = comm_open_listener(SOCK_DGRAM, - IPPROTO_UDP, - addr, - COMM_NONBLOCKING, - "ICP Port"); + enter_suid(); + comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port"); leave_suid(); - if (theOutIcpConnection < 0) + if (!Comm::IsConnOpen(icpOutgoingConn)) fatal("Cannot open Outgoing ICP Port"); - Comm::SetSelect(theOutIcpConnection, COMM_SELECT_READ, icpHandleUdp, NULL, 0); - - debugs(12, 1, "Outgoing ICP messages on port " << addr.GetPort() << ", FD " << theOutIcpConnection << "."); + debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local); - commSetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0); - fd_note(theOutIcpConnection, "Outgoing ICP socket"); ++ Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0); + fd_note(icpOutgoingConn->fd, "Outgoing ICP socket"); icpGetOutgoingIpAddress(); } } @@@ -761,36 -755,26 +763,36 @@@ icpGetOutgoingIpAddress( } static void -icpIncomingConnectionOpened(int fd, int errNo, Ip::Address& addr) +icpIncomingConnectionOpened(int errNo) { - theInIcpConnection = fd; - - if (theInIcpConnection < 0) + if (!Comm::IsConnOpen(icpIncomingConn)) fatal("Cannot open ICP Port"); - commSetSelect(icpIncomingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0); - Comm::SetSelect(theInIcpConnection, COMM_SELECT_READ, icpHandleUdp, NULL, 0); ++ Comm::SetSelect(icpIncomingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0); for (const wordlist *s = Config.mcast_group_list; s; s = s->next) - ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); + ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); // XXX: pass the icpIncomingConn for mcastJoinGroups usage. - debugs(12, 1, "Accepting ICP messages at " << addr << ", FD " << theInIcpConnection << "."); + debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << icpIncomingConn->local); - fd_note(theInIcpConnection, "Incoming ICP socket"); + fd_note(icpIncomingConn->fd, "Incoming ICP port"); if (Config.Addrs.udp_outgoing.IsNoAddr()) { - theOutIcpConnection = theInIcpConnection; + icpOutgoingConn = icpIncomingConn; + debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local); icpGetOutgoingIpAddress(); } + + // Ensure that we have the IP address(es) to use for Host ID. + // The listening address is used as 'public' host ID which can be used to contact us + struct addrinfo *xai = NULL; + theIcpPublicHostID.InitAddrInfo(xai); // reset xai + if (getsockname(icpIncomingConn->fd, xai->ai_addr, &xai->ai_addrlen) < 0) + debugs(50, DBG_IMPORTANT, "ERROR: Unable to identify ICP host ID to use for " << icpIncomingConn + << ": getsockname: " << xstrerror()); + else + theIcpPublicHostID = *xai; + theIcpPublicHostID.FreeAddrInfo(xai); } /** @@@ -817,9 -806,9 +819,9 @@@ icpConnectionShutdown(void * to that specific interface. During shutdown, we must * disable reading on the outgoing socket. */ - assert(theOutIcpConnection > -1); + assert(Comm::IsConnOpen(icpOutgoingConn)); - commSetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0); - Comm::SetSelect(theOutIcpConnection, COMM_SELECT_READ, NULL, NULL, 0); ++ Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0); } void diff --cc src/ip/QosConfig.cc index 7b1bf8afa3,e827c295af..7586ec71eb --- a/src/ip/QosConfig.cc +++ b/src/ip/QosConfig.cc @@@ -60,21 -59,28 +60,21 @@@ void Ip::Qos::getNfmarkFromServer(cons * port numbers. */ - Ip::Address serv_fde_local_conn; - struct addrinfo *addr = NULL; - serv_fde_local_conn.InitAddrInfo(addr); - getsockname(server_fd, addr->ai_addr, &(addr->ai_addrlen)); - serv_fde_local_conn = *addr; - serv_fde_local_conn.GetAddrInfo(addr); - - unsigned short serv_fde_local_port = ((struct sockaddr_in*)addr->ai_addr)->sin_port; - struct in6_addr serv_fde_local_ip6; - struct in_addr serv_fde_local_ip; - - if (Ip::EnableIpv6 && serv_fde_local_conn.IsIPv6()) { - serv_fde_local_ip6 = ((struct sockaddr_in6*)addr->ai_addr)->sin6_addr; + if (Ip::EnableIpv6 && server->local.IsIPv6()) { nfct_set_attr_u8(ct, ATTR_L3PROTO, AF_INET6); struct in6_addr serv_fde_remote_ip6; - server->remote.GetAddr(&serv_fde_remote_ip6); - inet_pton(AF_INET6,servFde->ipaddr,(struct in6_addr*)&serv_fde_remote_ip6); ++ server->remote.GetInAddr(serv_fde_remote_ip6); nfct_set_attr(ct, ATTR_IPV6_DST, serv_fde_remote_ip6.s6_addr); + struct in6_addr serv_fde_local_ip6; - server->local.GetAddr(serv_fde_local_ip6); ++ server->local.GetInAddr(serv_fde_local_ip6); nfct_set_attr(ct, ATTR_IPV6_SRC, serv_fde_local_ip6.s6_addr); } else { - serv_fde_local_ip = ((struct sockaddr_in*)addr->ai_addr)->sin_addr; nfct_set_attr_u8(ct, ATTR_L3PROTO, AF_INET); - struct in6_addr serv_fde_remote_ip; - server->remote.GetAddr(&serv_fde_remote_ip); - nfct_set_attr_u32(ct, ATTR_IPV4_DST, inet_addr(servFde->ipaddr)); ++ struct in_addr serv_fde_remote_ip; ++ server->remote.GetInAddr(serv_fde_remote_ip); + nfct_set_attr_u32(ct, ATTR_IPV4_DST, serv_fde_remote_ip.s_addr); + struct in_addr serv_fde_local_ip; - server->local.GetAddr(serv_fde_local_ip); ++ server->local.GetInAddr(serv_fde_local_ip); nfct_set_attr_u32(ct, ATTR_IPV4_SRC, serv_fde_local_ip.s_addr); } @@@ -96,7 -104,7 +96,6 @@@ } else { debugs(17, 2, "QOS: Failed to open conntrack handle for upstream netfilter mark retrieval."); } -- serv_fde_local_conn.FreeAddrInfo(addr); nfct_destroy(ct); } else { diff --cc src/ipc/StartListening.cc index 0d93979211,8e146f8272..4ac9aac01f --- a/src/ipc/StartListening.cc +++ b/src/ipc/StartListening.cc @@@ -6,11 -6,8 +6,9 @@@ */ #include "config.h" - #include "base/Subscription.h" #include "base/TextException.h" #include "comm.h" - #include "comm/ConnAcceptor.h" +#include "comm/Connection.h" #include "ipc/SharedListen.h" #include "ipc/StartListening.h" @@@ -29,40 -26,28 +27,29 @@@ std::ostream &Ipc::StartListeningCb::st } void -Ipc::StartListening(int sock_type, int proto, Ip::Address &addr, int flags, +Ipc::StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn, - FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub) + FdNoteId fdNote, AsyncCall::Pointer &callback) { if (UsingSmp()) { // if SMP is on, share OpenListenerParams p; p.sock_type = sock_type; p.proto = proto; - p.addr = addr; - p.flags = flags; + p.addr = listenConn->local; + p.flags = listenConn->flags; p.fdNote = fdNote; - p.handlerSubscription = sub; - Ipc::JoinSharedListen(p, callback); return; // wait for the call back } StartListeningCb *cbd = dynamic_cast(callback->getDialer()); Must(cbd); + cbd->conn = listenConn; enter_suid(); - if (sock_type == SOCK_STREAM) { - // TCP: setup the subscriptions such that new connections accepted by listenConn are handled by HTTP - AsyncJob::Start(new Comm::ConnAcceptor(cbd->conn, FdNote(fdNote), sub)); - } else if (sock_type == SOCK_DGRAM) { - // UDP: setup the listener socket, but do not set a subscriber - Comm::ConnectionPointer udpConn = listenConn; - comm_open_listener(sock_type, proto, udpConn, FdNote(fdNote)); - } else { - fatalf("Invalid Socket Type (%d)",sock_type); - } - cbd->errNo = cbd->conn->isOpen() ? 0 : errno; - cbd->fd = comm_open_listener(sock_type, proto, addr, flags, FdNote(fdNote)); - cbd->errNo = cbd->fd >= 0 ? 0 : errno; ++ comm_open_listener(sock_type, proto, cbd->conn, FdNote(fdNote)); ++ cbd->errNo = Comm::IsConnOpen(cbd->conn) ? 0 : errno; leave_suid(); - debugs(54, 3, HERE << "opened listen FD " << cbd->fd << " on " << addr); + debugs(54, 3, HERE << "opened listen " << cbd->conn); ScheduleCallHere(callback); } diff --cc src/ipc/StartListening.h index 91d97e2478,99354133b4..0604dc95d6 --- a/src/ipc/StartListening.h +++ b/src/ipc/StartListening.h @@@ -38,9 -35,9 +38,9 @@@ public }; /// Depending on whether SMP is on, either ask Coordinator to send us - /// the listening FD or start a connection acceptor directly. -/// the listening FD or call comm_open_listener() directly. -extern void StartListening(int sock_type, int proto, Ip::Address &addr, - int flags, FdNoteId fdNote, AsyncCall::Pointer &callback); ++/// the listening FD or open a listening socket directly. +extern void StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn, - FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &handlerSub); ++ FdNoteId fdNote, AsyncCall::Pointer &callback); } // namespace Ipc; diff --cc src/ipc/UdsOp.cc index 5eb6d39ecd,878d0f5020..58f65ee5fb --- a/src/ipc/UdsOp.cc +++ b/src/ipc/UdsOp.cc @@@ -115,7 -113,7 +115,7 @@@ void Ipc::UdsSender::write( void Ipc::UdsSender::wrote(const CommIoCbParams& params) { - debugs(54, 5, HERE << params.conn << " flag " << params.flag << " [" << this << ']'); - debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " retries " << retries << " [" << this << ']'); ++ debugs(54, 5, HERE << params.conn << " flag " << params.flag << " retries " << retries << " [" << this << ']'); writing = false; if (params.flag != COMM_OK && retries-- > 0) { sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed? diff --cc src/main.cc index 89a206d7f9,662c6ef629..f2d1aac786 --- a/src/main.cc +++ b/src/main.cc @@@ -61,50 -52,28 +62,23 @@@ #include "event.h" #include "EventLoop.h" #include "ExternalACL.h" --#include "Store.h" ++#include "htcp.h" ++#include "HttpReply.h" #include "ICP.h" #include "ident/Ident.h" -#include "HttpReply.h" -#include "pconn.h" -#include "Mem.h" -#include "acl/Asn.h" -#include "acl/Acl.h" -#include "htcp.h" -#include "StoreFileSystem.h" -#include "DiskIO/DiskIOModule.h" -#include "ipc/Kids.h" +#include "ip/tools.h" #include "ipc/Coordinator.h" +#include "ipc/Kids.h" #include "ipc/Strand.h" - #include "HttpReply.h" - #include "pconn.h" - #include "Mem.h" - #include "acl/Asn.h" - #include "acl/Acl.h" - #include "htcp.h" - #include "StoreFileSystem.h" - #include "DiskIO/DiskIOModule.h" - #include "comm.h" - #if USE_EPOLL - #include "comm_epoll.h" - #endif - #if USE_KQUEUE - #include "comm_kqueue.h" - #endif - #if USE_POLL - #include "comm_poll.h" - #endif - #if defined(USE_SELECT) || defined(USE_SELECT_WIN32) - #include "comm_select.h" - #endif - #include "ConfigParser.h" - #include "CpuAffinity.h" -#include "ip/tools.h" -#include "SquidTime.h" -#include "SwapDir.h" ++ +#include "DiskIO/DiskIOModule.h" - #include "errorpage.h" +#if USE_SQUID_ESI +#include "esi/Module.h" +#endif - #include "event.h" - #include "EventLoop.h" - #include "ExternalACL.h" #include "forward.h" -#include "MemPool.h" +#include "fs/Module.h" - #include "htcp.h" - #include "HttpReply.h" #include "icmp/IcmpSquid.h" #include "icmp/net_db.h" - #if USE_LOADABLE_MODULES #include "LoadableModules.h" #endif @@@ -135,9 -96,9 +109,6 @@@ #if USE_ADAPTATION #include "adaptation/Config.h" #endif --#if USE_SQUID_ESI --#include "esi/Module.h" --#endif #include "fs/Module.h" #if HAVE_PATHS_H diff --cc src/snmp_core.cc index 138d909abf,555ee882ac..28e7f25ed0 --- a/src/snmp_core.cc +++ b/src/snmp_core.cc @@@ -33,7 -33,7 +33,8 @@@ #include "acl/FilledChecklist.h" #include "cache_snmp.h" #include "comm.h" +#include "comm/Connection.h" + #include "comm/Loops.h" #include "ipc/StartListening.h" #include "ip/Address.h" #include "ip/tools.h" @@@ -303,62 -304,98 +304,62 @@@ snmpConnectionOpen(void { debugs(49, 5, "snmpConnectionOpen: Called"); - if (Config.Port.snmp > 0) { - Config.Addrs.snmp_incoming.SetPort(Config.Port.snmp); + if (Config.Port.snmp <= 0) + return; - if (!Ip::EnableIpv6 && !Config.Addrs.snmp_incoming.SetIPv4()) { - debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << Config.Addrs.snmp_incoming << " is not an IPv4 address."); - fatal("SNMP port cannot be opened."); - } - /* split-stack for now requires IPv4-only SNMP */ - if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && Config.Addrs.snmp_incoming.IsAnyAddr()) { - Config.Addrs.snmp_incoming.SetIPv4(); - } + snmpIncomingConn = new Comm::Connection; + snmpIncomingConn->local = Config.Addrs.snmp_incoming; + snmpIncomingConn->local.SetPort(Config.Port.snmp); - AsyncCall::Pointer call = asyncCall(49, 2, - "snmpIncomingConnectionOpened", - SnmpListeningStartedDialer(&snmpIncomingConnectionOpened)); - - Ipc::StartListening(SOCK_DGRAM, - IPPROTO_UDP, - Config.Addrs.snmp_incoming, - COMM_NONBLOCKING, - Ipc::fdnInSnmpSocket, call); - - if (!Config.Addrs.snmp_outgoing.IsNoAddr()) { - Config.Addrs.snmp_outgoing.SetPort(Config.Port.snmp); - - if (!Ip::EnableIpv6 && !Config.Addrs.snmp_outgoing.SetIPv4()) { - debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << Config.Addrs.snmp_outgoing << " is not an IPv4 address."); - fatal("SNMP port cannot be opened."); - } - /* split-stack for now requires IPv4-only SNMP */ - if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && Config.Addrs.snmp_outgoing.IsAnyAddr()) { - Config.Addrs.snmp_outgoing.SetIPv4(); - } - AsyncCall::Pointer call = asyncCall(49, 2, - "snmpOutgoingConnectionOpened", - SnmpListeningStartedDialer(&snmpOutgoingConnectionOpened)); - - Ipc::StartListening(SOCK_DGRAM, - IPPROTO_UDP, - Config.Addrs.snmp_outgoing, - COMM_NONBLOCKING, - Ipc::fdnOutSnmpSocket, call); - } + if (!Ip::EnableIpv6 && !snmpIncomingConn->local.SetIPv4()) { + debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << snmpIncomingConn->local << " is not an IPv4 address."); + fatal("SNMP port cannot be opened."); + } + /* split-stack for now requires IPv4-only SNMP */ + if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && snmpIncomingConn->local.IsAnyAddr()) { + snmpIncomingConn->local.SetIPv4(); } -} - -static void -snmpIncomingConnectionOpened(int fd, int errNo) -{ - theInSnmpConnection = fd; - if (theInSnmpConnection < 0) - fatal("Cannot open Incoming SNMP Port"); - Comm::SetSelect(theInSnmpConnection, COMM_SELECT_READ, snmpHandleUdp, NULL, 0); + AsyncCall::Pointer call = asyncCall(49, 2, "snmpIncomingConnectionOpened", + SnmpListeningStartedDialer(&snmpPortOpened)); - Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, snmpIncomingConn, Ipc::fdnInSnmpSocket, call, Subscription::Pointer()); ++ Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, snmpIncomingConn, Ipc::fdnInSnmpSocket, call); - debugs(1, 1, "Accepting SNMP messages on " << Config.Addrs.snmp_incoming << - ", FD " << theInSnmpConnection << "."); + if (!Config.Addrs.snmp_outgoing.IsNoAddr()) { + snmpOutgoingConn = new Comm::Connection; + snmpOutgoingConn->local = Config.Addrs.snmp_outgoing; + snmpOutgoingConn->local.SetPort(Config.Port.snmp); - if (Config.Addrs.snmp_outgoing.IsNoAddr()) - theOutSnmpConnection = theInSnmpConnection; + if (!Ip::EnableIpv6 && !snmpOutgoingConn->local.SetIPv4()) { + debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << snmpOutgoingConn->local << " is not an IPv4 address."); + fatal("SNMP port cannot be opened."); + } + /* split-stack for now requires IPv4-only SNMP */ + if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && snmpOutgoingConn->local.IsAnyAddr()) { + snmpOutgoingConn->local.SetIPv4(); + } + AsyncCall::Pointer call = asyncCall(49, 2, "snmpOutgoingConnectionOpened", + SnmpListeningStartedDialer(&snmpPortOpened)); - Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, snmpOutgoingConn, Ipc::fdnOutSnmpSocket, call, Subscription::Pointer()); ++ Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, snmpOutgoingConn, Ipc::fdnOutSnmpSocket, call); + } else { + snmpOutgoingConn = snmpIncomingConn; + debugs(1, DBG_IMPORTANT, "Sending SNMP messages from " << snmpOutgoingConn->local); + } } static void -snmpOutgoingConnectionOpened(int fd, int errNo) +snmpPortOpened(const Comm::ConnectionPointer &conn, int errNo) { - theOutSnmpConnection = fd; - if (theOutSnmpConnection < 0) - fatal("Cannot open Outgoing SNMP Port"); - - Comm::SetSelect(theOutSnmpConnection, COMM_SELECT_READ, snmpHandleUdp, NULL, 0); - - debugs(1, 1, "Outgoing SNMP messages on " << Config.Addrs.snmp_outgoing << - ", FD " << theOutSnmpConnection << "."); - - { - struct addrinfo *xaddr = NULL; - int x; - - - theOutSNMPAddr.SetEmpty(); - - theOutSNMPAddr.InitAddrInfo(xaddr); - - x = getsockname(theOutSnmpConnection, xaddr->ai_addr, &xaddr->ai_addrlen); + if (!Comm::IsConnOpen(conn)) + fatalf("Cannot open SNMP %s Port",(conn->fd == snmpIncomingConn->fd?"receiving":"sending")); - commSetSelect(conn->fd, COMM_SELECT_READ, snmpHandleUdp, NULL, 0); - if (x < 0) - debugs(51, 1, "theOutSnmpConnection FD " << theOutSnmpConnection << ": getsockname: " << xstrerror()); - else - theOutSNMPAddr = *xaddr; ++ Comm::SetSelect(conn->fd, COMM_SELECT_READ, snmpHandleUdp, NULL, 0); - theOutSNMPAddr.FreeAddrInfo(xaddr); - } + if (conn->fd == snmpIncomingConn->fd) + debugs(1, DBG_IMPORTANT, "Accepting SNMP messages on " << snmpIncomingConn->local); + else if (conn->fd == snmpOutgoingConn->fd) + debugs(1, DBG_IMPORTANT, "Sending SNMP messages from " << snmpOutgoingConn->local); + else + fatalf("Lost SNMP port (%d) on FD %d", (int)conn->local.GetPort(), conn->fd); } void @@@ -377,9 -423,9 +378,9 @@@ snmpConnectionShutdown(void * specific interface. During shutdown, we must disable reading * on the outgoing socket. */ - assert(theOutSnmpConnection > -1); + assert(Comm::IsConnOpen(snmpOutgoingConn)); - commSetSelect(snmpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0); - Comm::SetSelect(theOutSnmpConnection, COMM_SELECT_READ, NULL, NULL, 0); ++ Comm::SetSelect(snmpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0); } void diff --cc src/stat.cc index bf178566d9,9ad3d0913d..83c3c69186 --- a/src/stat.cc +++ b/src/stat.cc @@@ -36,9 -36,10 +36,10 @@@ #include "event.h" #include "StoreClient.h" #include "auth/UserRequest.h" -#include "mgr/Registration.h" +#include "comm/Connection.h" #include "Store.h" #include "HttpRequest.h" + #include "log/Tokens.h" #include "MemObject.h" #include "fde.h" #include "mem_node.h" diff --cc src/wccp.cc index 86a8fe49dc,fdad1a0a82..414d9b9a46 --- a/src/wccp.cc +++ b/src/wccp.cc @@@ -36,8 -36,9 +36,11 @@@ #if USE_WCCP + #include "squid.h" ++ #include "comm.h" +#include "comm/Connection.h" + #include "comm/Loops.h" #include "event.h" #define WCCP_PORT 2048 diff --cc src/wccp2.cc index 185143c5f3,6e8cb568fd..2b0556be30 --- a/src/wccp2.cc +++ b/src/wccp2.cc @@@ -35,7 -35,7 +35,8 @@@ #if USE_WCCPv2 #include "comm.h" +#include "comm/Connection.h" + #include "comm/Loops.h" #include "compat/strsep.h" #include "event.h" #include "ip/Address.h"