icapRequest(NULL),
icapReply(NULL),
attempts(0),
- connection(NULL),
theService(aService),
commEof(false),
reuseConnection(true),
isRetriable(true),
isRepeatable(true),
ignoreLastWrite(false),
- stopReason(NULL),
- reader(NULL),
- writer(NULL),
- closer(NULL),
alep(new AccessLogEntry),
al(*alep)
{
if (!TheConfig.reuse_connections)
disableRetries(); // this will also safely drain pconn pool
- bool wasReused = false;
- connection = s.getConnection(isRetriable, wasReused);
-
- if (wasReused && Comm::IsConnOpen(connection)) {
- successfullyConnected();
+ if (const auto pconn = s.getIdleConnection(isRetriable)) {
+ useTransportConnection(pconn);
return;
}
return;
}
- // XXX: Do not save this half-baked connection. Just pass it to ConnOpener.
- connection = new Comm::Connection;
- connection->remote = ia->current();
- connection->remote.port(s.cfg().port);
- getOutgoingAddress(NULL, connection);
+ const Comm::ConnectionPointer conn = new Comm::Connection();
+ conn->remote = ia->current();
+ conn->remote.port(s.cfg().port);
+ getOutgoingAddress(nullptr, conn);
// TODO: service bypass status may differ from that of a transaction
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
- const auto cs = new Comm::ConnOpener(connection, callback, TheConfig.connect_timeout(service().cfg().bypass));
+ const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass));
cs->setHost(s.cfg().host.termedBuf());
connWait.start(cs, callback);
AsyncJob::Start(cs);
closer = NULL;
}
+ commUnsetConnTimeout(connection);
+
cancelRead(); // may not work
if (reuseConnection && !doneWithIo()) {
{
connWait.finish();
- if (io.flag == Comm::TIMEOUT) {
- handleCommTimedout();
- return;
- }
-
if (io.flag != Comm::OK) {
dieOnConnectionFailure(); // throws
return;
}
- // Finalize the details and start owning the supplied connection, possibly
- // prematurely -- see XXX in successfullyConnected().
- assert(io.conn);
- assert(connection);
- assert(!connection->isOpen());
- connection = io.conn;
- successfullyConnected();
+ useTransportConnection(io.conn);
}
-/// called when successfully connected to an ICAP service
+/// React to the availability of a transport connection to the ICAP service.
+/// The given connection may (or may not) be secured already.
void
-Adaptation::Icap::Xaction::successfullyConnected()
+Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn)
{
- assert(Comm::IsConnOpen(connection));
-
- // XXX: We should not set timeout and closure handlers here if we are going
- // to negotiate TLS. Only Ssl::IcapPeerConnector should own the connection.
-
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
- TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
- commSetConnTimeout(connection, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall);
-
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
- closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
- CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
- comm_add_close_handler(connection->fd, closer);
+ assert(Comm::IsConnOpen(conn));
+ assert(!connection);
// If it is a reused connection and the TLS object is built
// we should not negotiate new TLS session
- const auto &ssl = fd_table[connection->fd].ssl;
+ const auto &ssl = fd_table[conn->fd].ssl;
if (!ssl && service().cfg().secure.encryptTransport) {
+ // XXX: Exceptions orphan conn.
CbcPointer<Adaptation::Icap::Xaction> me(this);
AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
- const auto sslConnector = new Ssl::IcapPeerConnector(theService, connection, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
+ const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
encryptionWait.start(sslConnector, callback);
AsyncJob::Start(sslConnector); // will call our callback
return;
}
-// ?? fd_table[io.conn->fd].noteUse(icapPconnPool);
+ useIcapConnection(conn);
+}
+
+/// react to the availability of a fully-ready ICAP connection
+void
+Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn)
+{
+ assert(!connection);
+ assert(conn);
+ assert(Comm::IsConnOpen(conn));
+ connection = conn;
service().noteConnectionUse(connection);
- handleCommConnected();
+ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
+ AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
+ TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
+ commSetConnTimeout(connection, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall);
+
+ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+ comm_add_close_handler(connection->fd, closer);
+
+ startShoveling();
}
void Adaptation::Icap::Xaction::dieOnConnectionFailure()
// communication timeout with the ICAP service
void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &)
-{
- handleCommTimedout();
-}
-
-void Adaptation::Icap::Xaction::handleCommTimedout()
{
debugs(93, 2, HERE << typeName << " failed: timeout with " <<
theService->cfg().methodStr() << " " <<
theService->cfg().uri << status());
reuseConnection = false;
- const auto whileConnecting = bool(connWait);
- if (whileConnecting) {
- assert(!haveConnection());
- theService->noteConnectionFailed("timedout");
- } else
- closeConnection(); // so that late Comm callbacks do not disturb bypass
- throw TexcHere(whileConnecting ?
- "timed out while connecting to the ICAP service" :
- "timed out while talking to the ICAP service");
+ assert(haveConnection());
+ theService->noteConnectionFailed("timedout");
+ closeConnection();
+ throw TextException("timed out while talking to the ICAP service", Here());
}
// unexpected connection close while talking to the ICAP service
void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &)
{
closer = NULL;
- handleCommClosed();
-}
-
-void Adaptation::Icap::Xaction::handleCommClosed()
-{
detailError(ERR_DETAIL_ICAP_XACT_CLOSE);
mustStop("ICAP service connection externally closed");
}
void Adaptation::Icap::Xaction::swanSong()
{
// kids should sing first and then call the parent method.
- if (connWait) {
+ if (connWait || encryptionWait) {
service().noteConnectionFailed("abort");
}
{
encryptionWait.finish();
- if (closer != NULL) {
- if (Comm::IsConnOpen(answer.conn))
- comm_remove_close_handler(answer.conn->fd, closer);
- else
- closer->cancel("securing completed");
- closer = NULL;
- }
-
if (answer.error.get()) {
+ // XXX: Security::PeerConnector should do that for negative answers instead.
if (answer.conn != NULL)
answer.conn->close();
+ // TODO: Refactor dieOnConnectionFailure() to be usable here as well.
debugs(93, 2, typeName <<
" TLS negotiation to " << service().cfg().uri << " failed");
service().noteConnectionFailed("failure");
debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete");
- service().noteConnectionUse(answer.conn);
-
- handleCommConnected();
+ // XXX: answer.conn could be closing here. Missing a syncWithComm equivalent?
+ useIcapConnection(answer.conn);
}
virtual void start();
virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate
+ /// starts sending/receiving ICAP messages
+ virtual void startShoveling() = 0;
+
// comm hanndlers; called by comm handler wrappers
- virtual void handleCommConnected() = 0;
virtual void handleCommWrote(size_t sz) = 0;
virtual void handleCommRead(size_t sz) = 0;
- virtual void handleCommTimedout();
- virtual void handleCommClosed();
void handleSecuredPeer(Security::EncryptorAnswer &answer);
/// record error detail if possible
void openConnection();
void closeConnection();
- void dieOnConnectionFailure();
bool haveConnection() const;
void scheduleRead();
ServiceRep &service();
private:
- void successfullyConnected();
+ void useTransportConnection(const Comm::ConnectionPointer &);
+ void useIcapConnection(const Comm::ConnectionPointer &);
+ void dieOnConnectionFailure();
void tellQueryAborted();
void maybeLog();
protected:
- Comm::ConnectionPointer connection; ///< ICAP server connection
Adaptation::Icap::ServiceRep::Pointer theService;
SBuf readBuf;
bool isRepeatable; ///< can repeat if no or unsatisfactory response
bool ignoreLastWrite;
- const char *stopReason;
-
AsyncCall::Pointer reader;
AsyncCall::Pointer writer;
- AsyncCall::Pointer closer;
AccessLogEntry::Pointer alep; ///< icap.log entry
AccessLogEntry &al; ///< short for *alep
/// encrypts an established transport connection
JobWait<Ssl::IcapPeerConnector> encryptionWait;
+
+ /// open and, if necessary, secured connection to the ICAP server (or nil)
+ Comm::ConnectionPointer connection;
+
+ AsyncCall::Pointer closer;
};
} // namespace Icap