From 839a0bae62e12f543bb1d7d371643547f9545adc Mon Sep 17 00:00:00 2001 From: Alex Rousskov Date: Fri, 6 Aug 2021 11:44:37 -0400 Subject: [PATCH] Refactored ICAP connection-establishing code ... to delay Connection ownership until the ICAP connection is ready. This change addresses primary Connection ownership concerns (as they apply to this ICAP code) except orphaning of the temporary Connection object by helper job start exceptions (now an explicit XXX). For example, the transaction no longer shares a Connection object with ConnOpener and IcapPeerConnector jobs. Also removed (unused except for debugging) Icap::Xaction::stopReason that was shadowing AsyncJob::stopReason, effectively breaking debugging. --- src/adaptation/icap/ModXact.cc | 3 +- src/adaptation/icap/ModXact.h | 5 +- src/adaptation/icap/OptXact.cc | 2 +- src/adaptation/icap/OptXact.h | 3 +- src/adaptation/icap/ServiceRep.cc | 5 +- src/adaptation/icap/ServiceRep.h | 3 +- src/adaptation/icap/Xaction.cc | 128 ++++++++++++------------------ src/adaptation/icap/Xaction.h | 20 ++--- 8 files changed, 70 insertions(+), 99 deletions(-) diff --git a/src/adaptation/icap/ModXact.cc b/src/adaptation/icap/ModXact.cc index a73f32c547..ed01289c5f 100644 --- a/src/adaptation/icap/ModXact.cc +++ b/src/adaptation/icap/ModXact.cc @@ -186,8 +186,7 @@ void Adaptation::Icap::ModXact::startWriting() openConnection(); } -// connection with the ICAP service established -void Adaptation::Icap::ModXact::handleCommConnected() +void Adaptation::Icap::ModXact::startShoveling() { Must(state.writing == State::writingConnect); diff --git a/src/adaptation/icap/ModXact.h b/src/adaptation/icap/ModXact.h index d6c541bdb3..06ceb251a4 100644 --- a/src/adaptation/icap/ModXact.h +++ b/src/adaptation/icap/ModXact.h @@ -155,10 +155,11 @@ public: virtual void noteBodyProductionEnded(BodyPipe::Pointer); virtual void noteBodyProducerAborted(BodyPipe::Pointer); - // comm handlers - virtual void handleCommConnected(); + /* Xaction API */ + virtual void startShoveling(); virtual void handleCommWrote(size_t size); virtual void handleCommRead(size_t size); + void handleCommWroteHeaders(); void handleCommWroteBody(); diff --git a/src/adaptation/icap/OptXact.cc b/src/adaptation/icap/OptXact.cc index d463faf316..24510d2113 100644 --- a/src/adaptation/icap/OptXact.cc +++ b/src/adaptation/icap/OptXact.cc @@ -37,7 +37,7 @@ void Adaptation::Icap::OptXact::start() openConnection(); } -void Adaptation::Icap::OptXact::handleCommConnected() +void Adaptation::Icap::OptXact::startShoveling() { scheduleRead(); diff --git a/src/adaptation/icap/OptXact.h b/src/adaptation/icap/OptXact.h index 6acc531257..ea2fc374bc 100644 --- a/src/adaptation/icap/OptXact.h +++ b/src/adaptation/icap/OptXact.h @@ -30,8 +30,9 @@ public: OptXact(ServiceRep::Pointer &aService); protected: + /* Xaction API */ virtual void start(); - virtual void handleCommConnected(); + virtual void startShoveling(); virtual void handleCommWrote(size_t size); virtual void handleCommRead(size_t size); diff --git a/src/adaptation/icap/ServiceRep.cc b/src/adaptation/icap/ServiceRep.cc index 0a6cc18242..6d46d08f02 100644 --- a/src/adaptation/icap/ServiceRep.cc +++ b/src/adaptation/icap/ServiceRep.cc @@ -112,9 +112,8 @@ void Adaptation::Icap::ServiceRep::noteFailure() // should be configurable. } -// returns a persistent or brand new connection; negative int on failures Comm::ConnectionPointer -Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) +Adaptation::Icap::ServiceRep::getIdleConnection(const bool retriableXact) { Comm::ConnectionPointer connection; @@ -137,7 +136,6 @@ Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) else theIdleConns->closeN(1); - reused = Comm::IsConnOpen(connection); ++theBusyConns; debugs(93,3, HERE << "got connection: " << connection); return connection; @@ -150,7 +148,6 @@ void Adaptation::Icap::ServiceRep::putConnection(const Comm::ConnectionPointer & // do not pool an idle connection if we owe connections if (isReusable && excessConnections() == 0) { debugs(93, 3, HERE << "pushing pconn" << comment); - commUnsetConnTimeout(conn); theIdleConns->push(conn); } else { debugs(93, 3, HERE << (sendReset ? "RST" : "FIN") << "-closing " << diff --git a/src/adaptation/icap/ServiceRep.h b/src/adaptation/icap/ServiceRep.h index fd1e12096e..dde6fd6fac 100644 --- a/src/adaptation/icap/ServiceRep.h +++ b/src/adaptation/icap/ServiceRep.h @@ -85,7 +85,8 @@ public: bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const; bool allows204() const; bool allows206() const; - Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused); + /// \returns an idle persistent ICAP connection or nil + Comm::ConnectionPointer getIdleConnection(bool isRetriable); void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment); void noteConnectionUse(const Comm::ConnectionPointer &conn); void noteConnectionFailed(const char *comment); diff --git a/src/adaptation/icap/Xaction.cc b/src/adaptation/icap/Xaction.cc index fec4b12c19..3a6bc94df8 100644 --- a/src/adaptation/icap/Xaction.cc +++ b/src/adaptation/icap/Xaction.cc @@ -80,17 +80,12 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv icapRequest(NULL), icapReply(NULL), attempts(0), - connection(NULL), theService(aService), commEof(false), reuseConnection(true), isRetriable(true), isRepeatable(true), ignoreLastWrite(false), - stopReason(NULL), - reader(NULL), - writer(NULL), - closer(NULL), alep(new AccessLogEntry), al(*alep) { @@ -164,11 +159,8 @@ Adaptation::Icap::Xaction::openConnection() if (!TheConfig.reuse_connections) disableRetries(); // this will also safely drain pconn pool - bool wasReused = false; - connection = s.getConnection(isRetriable, wasReused); - - if (wasReused && Comm::IsConnOpen(connection)) { - successfullyConnected(); + if (const auto pconn = s.getIdleConnection(isRetriable)) { + useTransportConnection(pconn); return; } @@ -197,16 +189,15 @@ Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia) return; } - // XXX: Do not save this half-baked connection. Just pass it to ConnOpener. - connection = new Comm::Connection; - connection->remote = ia->current(); - connection->remote.port(s.cfg().port); - getOutgoingAddress(NULL, connection); + const Comm::ConnectionPointer conn = new Comm::Connection(); + conn->remote = ia->current(); + conn->remote.port(s.cfg().port); + getOutgoingAddress(nullptr, conn); // TODO: service bypass status may differ from that of a transaction typedef CommCbMemFunT ConnectDialer; AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); - const auto cs = new Comm::ConnOpener(connection, callback, TheConfig.connect_timeout(service().cfg().bypass)); + const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass)); cs->setHost(s.cfg().host.termedBuf()); connWait.start(cs, callback); AsyncJob::Start(cs); @@ -235,6 +226,8 @@ void Adaptation::Icap::Xaction::closeConnection() closer = NULL; } + commUnsetConnTimeout(connection); + cancelRead(); // may not work if (reuseConnection && !doneWithIo()) { @@ -263,63 +256,62 @@ void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io) { connWait.finish(); - if (io.flag == Comm::TIMEOUT) { - handleCommTimedout(); - return; - } - if (io.flag != Comm::OK) { dieOnConnectionFailure(); // throws return; } - // Finalize the details and start owning the supplied connection, possibly - // prematurely -- see XXX in successfullyConnected(). - assert(io.conn); - assert(connection); - assert(!connection->isOpen()); - connection = io.conn; - successfullyConnected(); + useTransportConnection(io.conn); } -/// called when successfully connected to an ICAP service +/// React to the availability of a transport connection to the ICAP service. +/// The given connection may (or may not) be secured already. void -Adaptation::Icap::Xaction::successfullyConnected() +Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn) { - assert(Comm::IsConnOpen(connection)); - - // XXX: We should not set timeout and closure handlers here if we are going - // to negotiate TLS. Only Ssl::IcapPeerConnector should own the connection. - - typedef CommCbMemFunT TimeoutDialer; - AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout", - TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout)); - commSetConnTimeout(connection, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall); - - typedef CommCbMemFunT CloseDialer; - closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", - CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); - comm_add_close_handler(connection->fd, closer); + assert(Comm::IsConnOpen(conn)); + assert(!connection); // If it is a reused connection and the TLS object is built // we should not negotiate new TLS session - const auto &ssl = fd_table[connection->fd].ssl; + const auto &ssl = fd_table[conn->fd].ssl; if (!ssl && service().cfg().secure.encryptTransport) { + // XXX: Exceptions orphan conn. CbcPointer me(this); AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer", MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer)); - const auto sslConnector = new Ssl::IcapPeerConnector(theService, connection, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass)); + const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass)); encryptionWait.start(sslConnector, callback); AsyncJob::Start(sslConnector); // will call our callback return; } -// ?? fd_table[io.conn->fd].noteUse(icapPconnPool); + useIcapConnection(conn); +} + +/// react to the availability of a fully-ready ICAP connection +void +Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn) +{ + assert(!connection); + assert(conn); + assert(Comm::IsConnOpen(conn)); + connection = conn; service().noteConnectionUse(connection); - handleCommConnected(); + typedef CommCbMemFunT TimeoutDialer; + AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout", + TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout)); + commSetConnTimeout(connection, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall); + + typedef CommCbMemFunT CloseDialer; + closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", + CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); + comm_add_close_handler(connection->fd, closer); + + startShoveling(); } void Adaptation::Icap::Xaction::dieOnConnectionFailure() @@ -363,36 +355,21 @@ void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io) // communication timeout with the ICAP service void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &) -{ - handleCommTimedout(); -} - -void Adaptation::Icap::Xaction::handleCommTimedout() { debugs(93, 2, HERE << typeName << " failed: timeout with " << theService->cfg().methodStr() << " " << theService->cfg().uri << status()); reuseConnection = false; - const auto whileConnecting = bool(connWait); - if (whileConnecting) { - assert(!haveConnection()); - theService->noteConnectionFailed("timedout"); - } else - closeConnection(); // so that late Comm callbacks do not disturb bypass - throw TexcHere(whileConnecting ? - "timed out while connecting to the ICAP service" : - "timed out while talking to the ICAP service"); + assert(haveConnection()); + theService->noteConnectionFailed("timedout"); + closeConnection(); + throw TextException("timed out while talking to the ICAP service", Here()); } // unexpected connection close while talking to the ICAP service void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &) { closer = NULL; - handleCommClosed(); -} - -void Adaptation::Icap::Xaction::handleCommClosed() -{ detailError(ERR_DETAIL_ICAP_XACT_CLOSE); mustStop("ICAP service connection externally closed"); } @@ -597,7 +574,7 @@ void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome & void Adaptation::Icap::Xaction::swanSong() { // kids should sing first and then call the parent method. - if (connWait) { + if (connWait || encryptionWait) { service().noteConnectionFailed("abort"); } @@ -735,17 +712,11 @@ Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer) { encryptionWait.finish(); - if (closer != NULL) { - if (Comm::IsConnOpen(answer.conn)) - comm_remove_close_handler(answer.conn->fd, closer); - else - closer->cancel("securing completed"); - closer = NULL; - } - if (answer.error.get()) { + // XXX: Security::PeerConnector should do that for negative answers instead. if (answer.conn != NULL) answer.conn->close(); + // TODO: Refactor dieOnConnectionFailure() to be usable here as well. debugs(93, 2, typeName << " TLS negotiation to " << service().cfg().uri << " failed"); service().noteConnectionFailed("failure"); @@ -755,8 +726,7 @@ Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer) debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete"); - service().noteConnectionUse(answer.conn); - - handleCommConnected(); + // XXX: answer.conn could be closing here. Missing a syncWithComm equivalent? + useIcapConnection(answer.conn); } diff --git a/src/adaptation/icap/Xaction.h b/src/adaptation/icap/Xaction.h index 88181c2855..3e340349f7 100644 --- a/src/adaptation/icap/Xaction.h +++ b/src/adaptation/icap/Xaction.h @@ -69,12 +69,12 @@ protected: virtual void start(); virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate + /// starts sending/receiving ICAP messages + virtual void startShoveling() = 0; + // comm hanndlers; called by comm handler wrappers - virtual void handleCommConnected() = 0; virtual void handleCommWrote(size_t sz) = 0; virtual void handleCommRead(size_t sz) = 0; - virtual void handleCommTimedout(); - virtual void handleCommClosed(); void handleSecuredPeer(Security::EncryptorAnswer &answer); /// record error detail if possible @@ -82,7 +82,6 @@ protected: void openConnection(); void closeConnection(); - void dieOnConnectionFailure(); bool haveConnection() const; void scheduleRead(); @@ -128,12 +127,13 @@ public: ServiceRep &service(); private: - void successfullyConnected(); + void useTransportConnection(const Comm::ConnectionPointer &); + void useIcapConnection(const Comm::ConnectionPointer &); + void dieOnConnectionFailure(); void tellQueryAborted(); void maybeLog(); protected: - Comm::ConnectionPointer connection; ///< ICAP server connection Adaptation::Icap::ServiceRep::Pointer theService; SBuf readBuf; @@ -143,11 +143,8 @@ protected: bool isRepeatable; ///< can repeat if no or unsatisfactory response bool ignoreLastWrite; - const char *stopReason; - AsyncCall::Pointer reader; AsyncCall::Pointer writer; - AsyncCall::Pointer closer; AccessLogEntry::Pointer alep; ///< icap.log entry AccessLogEntry &al; ///< short for *alep @@ -162,6 +159,11 @@ private: /// encrypts an established transport connection JobWait encryptionWait; + + /// open and, if necessary, secured connection to the ICAP server (or nil) + Comm::ConnectionPointer connection; + + AsyncCall::Pointer closer; }; } // namespace Icap -- 2.47.2