#include "squid.h"
#include "comm.h"
+#include "comm/Connection.h"
+#include "comm/ConnOpener.h"
#include "CommCalls.h"
#include "HttpMsg.h"
#include "adaptation/icap/Xaction.h"
#include "pconn.h"
#include "HttpRequest.h"
#include "HttpReply.h"
-#include "ip/tools.h"
#include "acl/FilledChecklist.h"
#include "icap_log.h"
#include "fde.h"
//CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, Xaction);
-Adaptation::Icap::Xaction::Xaction(const char *aTypeName,
- Adaptation::Icap::ServiceRep::Pointer &aService):
+Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::ServiceRep::Pointer &aService):
AsyncJob(aTypeName),
Adaptation::Initiate(aTypeName),
icapRequest(NULL),
icapReply(NULL),
attempts(0),
- connection(-1),
+ connection(NULL),
theService(aService),
commBuf(NULL), commBufSize(0),
commEof(false),
}
// TODO: obey service-specific, OPTIONS-reported connection limit
-void Adaptation::Icap::Xaction::openConnection()
+void
+Adaptation::Icap::Xaction::openConnection()
{
- Ip::Address client_addr;
-
- Must(connection < 0);
+ Must(!haveConnection());
const Adaptation::Service &s = service();
if (!TheConfig.reuse_connections)
disableRetries(); // this will also safely drain pconn pool
+ connection = new Comm::Connection;
+
+ /* NP: set these here because it applies whether a pconn or a new conn is used */
+
+ // TODO: Avoid blocking lookup if s.cfg().host is a hostname
+ connection->remote = s.cfg().host.termedBuf();
+ connection->remote.SetPort(s.cfg().port);
+
// TODO: check whether NULL domain is appropriate here
- connection = icapPconnPool->pop(s.cfg().host.termedBuf(), s.cfg().port, NULL, client_addr, isRetriable);
- if (connection >= 0) {
- debugs(93,3, HERE << "reused pconn FD " << connection);
+ icapPconnPool->pop(connection, NULL, isRetriable);
+ if (connection->isOpen()) {
+ debugs(93,3, HERE << "reused pconn " << connection);
// fake the connect callback
// TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
CbcPointer<Xaction> self(this);
Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
- dialer.params.fd = connection;
+ dialer.params.conn = connection;
dialer.params.flag = COMM_OK;
// fake other parameters by copying from the existing connection
connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
disableRetries(); // we only retry pconn failures
- Ip::Address outgoing;
- if (!Ip::EnableIpv6 && !outgoing.SetIPv4()) {
- debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoing << " is not an IPv4 address.");
- dieOnConnectionFailure(); // throws
- }
- /* split-stack for now requires default IPv4-only socket */
- if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoing.IsAnyAddr() && !s.cfg().ipv6) {
- outgoing.SetIPv4();
- }
-
- connection = comm_open(SOCK_STREAM, 0, outgoing,
- COMM_NONBLOCKING, s.cfg().uri.termedBuf());
-
- if (connection < 0)
- dieOnConnectionFailure(); // throws
-
- debugs(93,3, typeName << " opens connection to " << s.cfg().host << ":" << s.cfg().port);
-
- // TODO: service bypass status may differ from that of a transaction
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = JobCallback(93, 5,
- TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
- commSetTimeout(connection, TheConfig.connect_timeout(
- service().cfg().bypass), timeoutCall);
-
- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
- closer = JobCallback(93, 5,
- CloseDialer, this, Adaptation::Icap::Xaction::noteCommClosed);
- comm_add_close_handler(connection, closer);
-
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
- connector = JobCallback(93,3,
- ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
- commConnectStart(connection, s.cfg().host.termedBuf(), s.cfg().port, connector);
+ connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
+ Comm::ConnOpener *cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
+ cs->setHost(s.cfg().host.termedBuf());
+ AsyncJob::Start(cs);
}
/*
void Adaptation::Icap::Xaction::closeConnection()
{
- if (connection >= 0) {
+ if (haveConnection()) {
if (closer != NULL) {
- comm_remove_close_handler(connection, closer);
+ comm_remove_close_handler(connection->fd, closer);
closer = NULL;
}
}
if (reuseConnection) {
- Ip::Address client_addr;
//status() adds leading spaces.
debugs(93,3, HERE << "pushing pconn" << status());
- AsyncCall::Pointer call = NULL;
- commSetTimeout(connection, -1, call);
- icapPconnPool->push(connection, theService->cfg().host.termedBuf(),
- theService->cfg().port, NULL, client_addr);
+ AsyncCall::Pointer nul;
+ commSetTimeout(connection->fd, -1, nul);
+ icapPconnPool->push(connection, NULL);
disableRetries();
} else {
//status() adds leading spaces.
debugs(93,3, HERE << "closing pconn" << status());
- // comm_close will clear timeout
- comm_close(connection);
+ connection->close();
}
writer = NULL;
reader = NULL;
connector = NULL;
- connection = -1;
+ connection = NULL;
}
}
// connection with the ICAP service established
void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
{
+ if (io.flag == COMM_TIMEOUT) {
+ handleCommTimedout();
+ return;
+ }
+
Must(connector != NULL);
connector = NULL;
if (io.flag != COMM_OK)
dieOnConnectionFailure(); // throws
- fd_table[connection].noteUse(icapPconnPool);
+ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+ comm_add_close_handler(io.conn->fd, closer);
+
+ fd_table[io.conn->fd].noteUse(icapPconnPool);
handleCommConnected();
}
void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf)
{
+ Must(haveConnection());
+
// comm module will free the buffer
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
- writer = JobCallback(93,3,
- Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
-
+ writer = JobCallback(93, 3, Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
comm_write_mbuf(connection, &buf, writer);
updateTimeout();
}
void Adaptation::Icap::Xaction::updateTimeout()
{
+ Must(haveConnection());
+
if (reader != NULL || writer != NULL) {
// restart the timeout before each I/O
// XXX: why does Config.Timeout lacks a write timeout?
// TODO: service bypass status may differ from that of a transaction
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer call = JobCallback(93,5,
- TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
-
- commSetTimeout(connection,
- TheConfig.io_timeout(service().cfg().bypass), call);
+ AsyncCall::Pointer call = JobCallback(93, 5, TimeoutDialer, this, Adaptation::Icap::Xaction::noteCommTimedout);
+ commSetTimeout(connection->fd, TheConfig.io_timeout(service().cfg().bypass), call);
} else {
// clear timeout when there is no I/O
// Do we need a lifetime timeout?
AsyncCall::Pointer call = NULL;
- commSetTimeout(connection, -1, call);
+ commSetTimeout(connection->fd, -1, call);
}
}
void Adaptation::Icap::Xaction::scheduleRead()
{
- Must(connection >= 0);
+ Must(haveConnection());
Must(!reader);
Must(readBuf.hasSpace());
* here instead of reading directly into readBuf.buf.
*/
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
- reader = JobCallback(93,3,
- Dialer, this, Adaptation::Icap::Xaction::noteCommRead);
-
+ reader = JobCallback(93, 3, Dialer, this, Adaptation::Icap::Xaction::noteCommRead);
comm_read(connection, commBuf, readBuf.spaceSize(), reader);
updateTimeout();
}
void Adaptation::Icap::Xaction::cancelRead()
{
if (reader != NULL) {
- comm_read_cancel(connection, reader);
+ Must(haveConnection());
+ comm_read_cancel(connection->fd, reader);
reader = NULL;
}
}
bool Adaptation::Icap::Xaction::doneWithIo() const
{
- return connection >= 0 && // or we could still be waiting to open it
+ return haveConnection() &&
!connector && !reader && !writer && // fast checks, some redundant
doneReading() && doneWriting();
}
+bool Adaptation::Icap::Xaction::haveConnection() const
+{
+ return connection != NULL && connection->isOpen();
+}
+
// initiator aborted
void Adaptation::Icap::Xaction::noteInitiatorAborted()
{
void Adaptation::Icap::Xaction::fillPendingStatus(MemBuf &buf) const
{
- if (connection >= 0) {
- buf.Printf("FD %d", connection);
+ if (haveConnection()) {
+ buf.Printf("FD %d", connection->fd);
if (writer != NULL)
buf.append("w", 1);
void Adaptation::Icap::Xaction::fillDoneStatus(MemBuf &buf) const
{
- if (connection >= 0 && commEof)
- buf.Printf("Comm(%d)", connection);
+ if (haveConnection() && commEof)
+ buf.Printf("Comm(%d)", connection->fd);
if (stopReason != NULL)
buf.Printf("Stopped");