<tag>ignore_expect_100</tag>
<p>Obsolete.
+ <tag>log_fqdn</tag>
+ <p>Obsolete. Replaced by automatic detection of the %>A logformat tag.
+
+ <tag>maximum_single_addr_tries</tag>
+ <p>The behaviour controlled by this directive is no longer possible.
+ It has been replaced by <em>connect_retries</em> option which operates a little differently.
+
<tag>url_rewrite_concurrency</tag>
<p>Replaced by url_rewrite_children ... concurrency=N option.
-/*
- * $Id$
- */
-
#include "squid.h"
#include "ProtoPort.h"
+ #if HAVE_LIMITS
+ #include <limits>
+ #endif
http_port_list::http_port_list(const char *aProtocol)
#if USE_SSL
#ifndef SQUID_PROTO_PORT_H
#define SQUID_PROTO_PORT_H
-//#include "typedefs.h"
#include "cbdata.h"
-#include "comm/ListenStateData.h"
+#include "comm/Connection.h"
+ #if USE_SSL
+ #include "ssl/gadgets.h"
+ #endif
+
struct http_port_list {
http_port_list(const char *aProtocol);
~http_port_list();
#include "squid.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
+#include "comm/forward.h"
+ #include "comm/Write.h"
#include "Server.h"
#include "Store.h"
-#include "fde.h" /* for fd_table[fd].closing */
+//#include "fde.h" /* for fd_table[fd].closing */
#include "HttpRequest.h"
#include "HttpReply.h"
#include "errorpage.h"
if (getMoreRequestBody(buf) && buf.contentSize() > 0) {
debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
typedef CommCbMemFunT<ServerStateData, CommIoCbParams> Dialer;
- requestSender = JobCallback(93,3,
- Dialer, this, ServerStateData::sentRequestBody);
- Comm::Write(fd, &buf, requestSender);
+ requestSender = JobCallback(93,3, Dialer, this, ServerStateData::sentRequestBody);
- comm_write_mbuf(conn, &buf, requestSender);
++ Comm::Write(conn, &buf, requestSender);
} else {
debugs(9,3, HERE << "will wait for more request body bytes or eof");
requestSender = NULL;
#include "squid.h"
#include "comm.h"
+#include "comm/Connection.h"
+#include "comm/ConnOpener.h"
+ #include "comm/Write.h"
#include "CommCalls.h"
#include "HttpMsg.h"
#include "adaptation/icap/Xaction.h"
void Adaptation::Icap::Xaction::scheduleWrite(MemBuf &buf)
{
+ Must(haveConnection());
+
// comm module will free the buffer
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
- writer = JobCallback(93,3,
- Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
-
+ writer = JobCallback(93, 3, Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
- comm_write_mbuf(connection, &buf, writer);
+ Comm::Write(connection, &buf, writer);
updateTimeout();
}
#include "ClientRequestContext.h"
#include "clientStream.h"
#include "comm.h"
-#include "comm/ListenStateData.h"
-#include "base/TextException.h"
-#include "ConnectionDetail.h"
+#include "comm/Connection.h"
+#include "comm/ConnAcceptor.h"
+ #include "comm/Write.h"
#include "eui/Config.h"
#include "fde.h"
#include "HttpHdrContRange.h"
AsyncCall::Pointer call = commCbCall(33, 5, "ClientSocketContext::wroteControlMsg",
CommIoCbPtrFun(&WroteControlMsg, this));
- comm_write_mbuf(clientConn(), mb, call);
- Comm::Write(fd(), mb, call);
++ Comm::Write(clientConn(), mb, call);
delete mb;
}
noteSentBodyBytes (length);
AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete",
CommIoCbPtrFun(clientWriteBodyComplete, this));
- comm_write(clientConn(), bodyData.data, length, call );
- Comm::Write(fd(), bodyData.data, length, call, NULL);
++ Comm::Write(clientConn(), bodyData.data, length, call, NULL);
return;
}
/* write */
AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
CommIoCbPtrFun(clientWriteComplete, this));
- comm_write_mbuf(clientConn(), &mb, call);
- Comm::Write(fd(), &mb, call);
++ Comm::Write(clientConn(), &mb, call);
} else
- writeComplete(fd(), NULL, 0, COMM_OK);
+ writeComplete(clientConn(), NULL, 0, COMM_OK);
}
/**
debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete");
AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
CommIoCbPtrFun(clientWriteComplete, this));
- comm_write_mbuf(clientConn(), mb, call);
- Comm::Write(fd(), mb, call);
--
++ Comm::Write(clientConn(), mb, call);
delete mb;
}
/** handle a new HTTPS connection */
static void
-httpsAccept(int sock, int newfd, ConnectionDetail *details,
- comm_err_t flag, int xerrno, void *data)
+httpsAccept(int sock, const Comm::ConnectionPointer& details, comm_err_t flag, int xerrno, void *data)
{
https_port_list *s = (https_port_list *)data;
- SSL_CTX *sslContext = s->sslContext;
+ SSL_CTX *sslContext = s->staticSslContext.get();
- if (flag != COMM_OK) {
- errno = xerrno;
- debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
- return;
- }
+ assert(flag != COMM_OK); // Acceptor does not call un unless successful.
SSL *ssl = NULL;
- if (!(ssl = httpsCreate(newfd, details, sslContext)))
+ if (!(ssl = httpsCreate(details, sslContext)))
return;
- debugs(33, 5, "httpsAccept: FD " << newfd << " accepted, starting SSL negotiation.");
- fd_note(newfd, "client https connect");
- ConnStateData *connState = connStateCreate(details->peer, details->me,
- newfd, &s->http);
+ debugs(33, 5, HERE << details << " accepted, starting SSL negotiation.");
+ fd_note(details->fd, "client https connect");
+ ConnStateData *connState = connStateCreate(details, &s->http);
typedef CommCbMemFunT<ConnStateData, CommCloseCbParams> Dialer;
- AsyncCall::Pointer call = JobCallback(33, 5,
- Dialer, connState, ConnStateData::connStateClosed);
- comm_add_close_handler(newfd, call);
+ AsyncCall::Pointer call = JobCallback(33, 5, Dialer, connState, ConnStateData::connStateClosed);
+ comm_add_close_handler(details->fd, call);
if (Config.onoff.log_fqdn)
- fqdncache_gethostbyaddr(details->peer, FQDN_LOOKUP_IF_MISS);
+ fqdncache_gethostbyaddr(details->remote, FQDN_LOOKUP_IF_MISS);
typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
AsyncCall::Pointer timeoutCall = JobCallback(33, 5,
incoming_sockets_accepted++;
}
- bool
- ConnStateData::switchToHttps()
+ void
+ ConnStateData::sslCrtdHandleReplyWrapper(void *data, char *reply)
{
- assert(!switchedToHttps_);
+ ConnStateData * state_data = (ConnStateData *)(data);
+ state_data->sslCrtdHandleReply(reply);
+ }
- //HTTPMSGLOCK(currentobject->http->request);
- assert(areAllContextsForThisConnection());
- freeAllContexts();
- //currentobject->connIsFinished();
+ void
+ ConnStateData::sslCrtdHandleReply(const char * reply)
+ {
+ if (!reply) {
+ debugs(1, 1, HERE << "\"ssl_crtd\" helper return <NULL> reply");
+ } else {
+ Ssl::CrtdMessage reply_message;
+ if (reply_message.parse(reply, strlen(reply)) != Ssl::CrtdMessage::OK) {
+ debugs(33, 5, HERE << "Reply from ssl_crtd for " << sslHostName << " is incorrect");
+ } else {
+ if (reply_message.getCode() != "ok") {
+ debugs(33, 5, HERE << "Certificate for " << sslHostName << " cannot be generated. ssl_crtd response: " << reply_message.getBody());
+ } else {
+ debugs(33, 5, HERE << "Certificate for " << sslHostName << " was successfully recieved from ssl_crtd");
+ getSslContextDone(Ssl::generateSslContextUsingPkeyAndCertFromMemory(reply_message.getBody().c_str()), true);
+ return;
+ }
+ }
+ }
+ getSslContextDone(NULL);
+ }
- debugs(33, 5, HERE << "converting " << clientConn << " to SSL");
+ bool
+ ConnStateData::getSslContextStart()
+ {
+ char const * host = sslHostName.termedBuf();
+ if (port->generateHostCertificates && host && strcmp(host, "") != 0) {
+ debugs(33, 5, HERE << "Finding SSL certificate for " << host << " in cache");
+ Ssl::LocalContextStorage & ssl_ctx_cache(Ssl::TheGlobalContextStorage.getLocalStorage(port->s));
+ SSL_CTX * dynCtx = ssl_ctx_cache.find(host);
+ if (dynCtx) {
+ debugs(33, 5, HERE << "SSL certificate for " << host << " have found in cache");
+ if (Ssl::verifySslCertificateDate(dynCtx)) {
+ debugs(33, 5, HERE << "Cached SSL certificate for " << host << " is valid");
+ return getSslContextDone(dynCtx);
+ } else {
+ debugs(33, 5, HERE << "Cached SSL certificate for " << host << " is out of date. Delete this certificate from cache");
+ ssl_ctx_cache.remove(host);
+ }
+ } else {
+ debugs(33, 5, HERE << "SSL certificate for " << host << " haven't found in cache");
+ }
- #if 0 // use the actual clientConn now that we have it.
- // fake a Comm::Connection object; XXX: make ConnState a Comm::Connection?
- Comm::Connection detail;
- detail.local = me;
- detail.remote = peer;
- #endif
+ #if USE_SSL_CRTD
+ debugs(33, 5, HERE << "Generating SSL certificate for " << host << " using ssl_crtd.");
+ Ssl::CrtdMessage request_message;
+ request_message.setCode(Ssl::CrtdMessage::code_new_certificate);
+ Ssl::CrtdMessage::BodyParams map;
+ map.insert(std::make_pair(Ssl::CrtdMessage::param_host, host));
+ std::string bufferToWrite;
+ Ssl::writeCertAndPrivateKeyToMemory(port->signingCert, port->signPkey, bufferToWrite);
+ request_message.composeBody(map, bufferToWrite);
+ Ssl::Helper::GetInstance()->sslSubmit(request_message, sslCrtdHandleReplyWrapper, this);
+ return true;
+ #else
+ debugs(33, 5, HERE << "Generating SSL certificate for " << host);
+ dynCtx = Ssl::generateSslContext(host, port->signingCert, port->signPkey);
+ return getSslContextDone(dynCtx, true);
+ #endif //USE_SSL_CRTD
+ }
+ return getSslContextDone(NULL);
+ }
+
+ bool
+ ConnStateData::getSslContextDone(SSL_CTX * sslContext, bool isNew)
+ {
+ // Try to add generated ssl context to storage.
+ if (port->generateHostCertificates && isNew) {
+ Ssl::LocalContextStorage & ssl_ctx_cache(Ssl::TheGlobalContextStorage.getLocalStorage(port->s));
+ if (sslContext && sslHostName != "") {
+ if (!ssl_ctx_cache.add(sslHostName.termedBuf(), sslContext)) {
+ // If it is not in storage delete after using. Else storage deleted it.
- fd_table[fd].dynamicSslContext = sslContext;
++ fd_table[clientConn->fd].dynamicSslContext = sslContext;
+ }
+ } else {
+ debugs(33, 2, HERE << "Failed to generate SSL cert for " << sslHostName);
+ }
+ }
+
+ // If generated ssl context = NULL, try to use static ssl context.
+ if (!sslContext) {
+ if (!port->staticSslContext) {
- debugs(83, 1, "Closing SSL FD " << fd << " as lacking SSL context");
- comm_close(fd);
++ debugs(83, 1, "Closing SSL " << clientConn->remote << " as lacking SSL context");
++ clientConn->close();
+ return false;
+ } else {
+ debugs(33, 5, HERE << "Using static ssl context.");
+ sslContext = port->staticSslContext.get();
+ }
+ }
- SSL_CTX *sslContext = port->sslContext;
- // fake a ConnectionDetail object; XXX: make ConnState a ConnectionDetail?
- ConnectionDetail detail;
- detail.me = me;
- detail.peer = peer;
-
SSL *ssl = NULL;
- if (!(ssl = httpsCreate(fd, &detail, sslContext)))
+ if (!(ssl = httpsCreate(clientConn, sslContext)))
return false;
// commSetTimeout() was called for this request before we switched.
return true;
}
- debugs(33, 5, HERE << "converting FD " << fd << " to SSL");
+ bool
+ ConnStateData::switchToHttps(const char *host)
+ {
+ assert(!switchedToHttps_);
+
+ sslHostName = host;
+
+ //HTTPMSGLOCK(currentobject->http->request);
+ assert(areAllContextsForThisConnection());
+ freeAllContexts();
+ //currentobject->connIsFinished();
+
++ debugs(33, 5, HERE << "converting " << clientConn << " to SSL");
+
+ return getSslContextStart();
+ }
+
#endif /* USE_SSL */
/// check FD after clientHttp[s]ConnectionOpened, adjust HttpSockets as needed
s->http.s << " due to SSL initialization failure.");
s->sslBump = 0;
}
- if (s->sslBump)
+ if (s->sslBump) {
++bumpCount;
+ // Create ssl_ctx cache for this port.
+ Ssl::TheGlobalContextStorage.addLocalStorage(s->s, s->dynamicCertMemCacheSize == std::numeric_limits<size_t>::max() ? 4194304 : s->dynamicCertMemCacheSize);
+ }
#endif
+ #if USE_SSL_CRTD
+ Ssl::Helper::GetInstance();
+ #endif //USE_SSL_CRTD
- /* AYJ: 2009-12-27: bit bumpy. new ListenStateData(...) should be doing all the Comm:: stuff ... */
+// NOTE: would the design here be better if we opened both the ConnAcceptor and IPC informative messages now?
+// that way we have at least one worker listening on the socket immediately with others joining in as
+// they receive the IPC message.
+
+ // Fill out a Comm::Connection which IPC will open as a listener for us
+ // then pass back so we can start a ConnAcceptor subscription.
+ s->listenConn = new Comm::Connection;
+ s->listenConn->local = s->s;
+ s->listenConn->flags = COMM_NONBLOCKING | (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
- const int openFlags = COMM_NONBLOCKING |
- (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
+ // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
+ typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+ RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s));
+ Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
- AsyncCall::Pointer callback = asyncCall(33,2,
- "clientHttpConnectionOpened",
- ListeningStartedDialer(&clientHttpConnectionOpened, s));
- Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags,
- Ipc::fdnHttpSocket, callback);
+ AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened",
+ ListeningStartedDialer(&clientListenerConnectionOpened, s, false));
+ Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->listenConn, Ipc::fdnHttpSocket, listenCall, sub);
- HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened
+ HttpSockets[NHttpSockets++] = -1; // set in clientListenerHttpConnectionOpened
}
#if USE_SSL
CBDATA_CLASS_INIT(ConnStateData);
-ConnStateData::ConnStateData() :AsyncJob("ConnStateData"), transparent_ (false), closing_ (false), switchedToHttps_(false)
+ConnStateData::ConnStateData() :
+ AsyncJob("ConnStateData"),
+ transparent_(false),
- closing_(false)
++ closing_(false),
++ switchedToHttps_(false)
{
pinning.fd = -1;
pinning.pinned = false;
#include "client_side_reply.h"
#include "client_side_request.h"
#include "ClientRequestContext.h"
+#include "comm/Connection.h"
+ #include "comm/Write.h"
#include "compat/inet_pton.h"
#include "fde.h"
#include "HttpReply.h"
void
ClientHttpRequest::sslBumpStart()
{
- debugs(85, 5, HERE << "ClientHttpRequest::sslBumpStart");
-
+ debugs(85, 5, HERE << "Confirming CONNECT tunnel on FD " << getConn()->clientConn);
// send an HTTP 200 response to kick client SSL negotiation
- const int fd = getConn()->fd;
- debugs(33, 7, HERE << "Confirming CONNECT tunnel on FD " << fd);
+ debugs(33, 7, HERE << "Confirming CONNECT tunnel on FD " << getConn()->clientConn);
// TODO: Unify with tunnel.cc and add a Server(?) header
- static const char *const conn_established = "HTTP/1.0 200 Connection established\r\n\r\n";
- comm_write(getConn()->clientConn, conn_established, strlen(conn_established), &SslBumpEstablish, this, NULL);
- static const char *const conn_established =
- "HTTP/1.1 200 Connection established\r\n\r\n";
++ static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n";
+ AsyncCall::Pointer call = commCbCall(85, 5, "ClientSocketContext::sslBumpEstablish",
+ CommIoCbPtrFun(&SslBumpEstablish, this));
- Comm::Write(fd, conn_established, strlen(conn_established), call, NULL);
++ Comm::Write(getConn()->clientConn, conn_established, strlen(conn_established), call, NULL);
}
#endif
#include "fde.h"
#include "comm/AcceptLimiter.h"
#include "comm/comm_internal.h"
-#include "comm/ListenStateData.h"
+#include "comm/Connection.h"
+ #include "comm/IoCallback.h"
+ #include "comm/Write.h"
#include "CommIO.h"
#include "CommRead.h"
-#include "ConnectionDetail.h"
#include "MemBuf.h"
#include "pconn.h"
#include "SquidTime.h"
* New C-like simple comm code. This stuff is a mess and doesn't really buy us anything.
*/
- typedef enum {
- IOCB_NONE,
- IOCB_READ,
- IOCB_WRITE
- } iocb_type;
-
static void commStopHalfClosedMonitor(int fd);
static IOCB commHalfClosedReader;
-static void comm_init_opened(int new_socket, Ip::Address &addr, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI);
+static void comm_init_opened(const Comm::ConnectionPointer &conn, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI);
static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
#if DELAY_POOLS
static void commHandleWriteHelper(void * data);
#endif
- static void commSelectOrQueueWrite(const int fd);
-
- struct comm_io_callback_t {
- iocb_type type;
- int fd;
- AsyncCall::Pointer callback;
- char *buf;
- FREE *freefunc;
- int size;
- int offset;
- comm_err_t errcode;
- int xerrno;
- #if DELAY_POOLS
- unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue
- #endif
-
-
- bool active() const { return callback != NULL; }
- };
-
- struct _comm_fd {
- int fd;
- comm_io_callback_t readcb;
- comm_io_callback_t writecb;
- };
- typedef struct _comm_fd comm_fd_t;
- comm_fd_t *commfd_table;
-
- // TODO: make this a comm_io_callback_t method?
- bool
- commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
- {
- assert(ccb->fd == fd);
- assert(ccb->type == type);
- return ccb->active();
- }
-
- /*
- * Configure comm_io_callback_t for I/O
- *
- * @param fd filedescriptor
- * @param ccb comm io callback
- * @param cb callback
- * @param cbdata callback data (must be cbdata'ed)
- * @param buf buffer, if applicable
- * @param freefunc freefunc, if applicable
- * @param size buffer size
- */
- static void
- commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb,
- AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size)
- {
- assert(!ccb->active());
- assert(ccb->type == type);
- assert(cb != NULL);
- ccb->fd = fd;
- ccb->callback = cb;
- ccb->buf = buf;
- ccb->freefunc = freefunc;
- ccb->size = size;
- ccb->offset = 0;
- }
-
-
- // Schedule the callback call and clear the callback
- static void
- commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
-class ConnectStateData
--{
- debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" <<
- code << ", " << xerrno << ")");
- assert(ccb->active());
- assert(ccb->fd == fd);
- ccb->errcode = code;
- ccb->xerrno = xerrno;
-
- #if DELAY_POOLS
- ccb->quotaQueueReserv = 0;
- #endif
-
- comm_io_callback_t cb = *ccb;
-
- /* We've got a copy; blow away the real one */
- /* XXX duplicate code from commio_cancel_callback! */
- ccb->xerrno = 0;
- ccb->callback = NULL; // cb has it
-
- /* free data */
- if (cb.freefunc) {
- cb.freefunc(cb.buf);
- cb.buf = NULL;
- }
-
- if (cb.callback != NULL) {
- typedef CommIoCbParams Params;
- Params ¶ms = GetCommParams<Params>(cb.callback);
- params.fd = cb.fd;
- params.buf = cb.buf;
- params.size = cb.offset;
- params.flag = cb.errcode;
- params.xerrno = cb.xerrno;
- ScheduleCallHere(cb.callback);
- }
- }
--
-public:
- void *operator new (size_t);
- void operator delete (void *);
- static void Connect (int fd, void *me);
- void connect();
- void callCallback(comm_err_t status, int xerrno);
- void defaults();
--
- /*
- * Cancel the given callback
- *
- * Remember that the data is cbdataRef'ed.
- */
- // TODO: make this a comm_io_callback_t method
- static void
- commio_cancel_callback(int fd, comm_io_callback_t *ccb)
- {
- debugs(5, 3, "commio_cancel_callback: called for FD " << fd);
- assert(ccb->fd == fd);
- assert(ccb->active());
-// defaults given by client
- char *host;
- u_short default_port;
- Ip::Address default_addr;
- // NP: CANNOT store the default addr:port together as it gets set/reset differently.
--
- ccb->xerrno = 0;
- ccb->callback = NULL;
- DnsLookupDetails dns; ///< host lookup details
- Ip::Address S;
- AsyncCall::Pointer callback;
--
- #if DELAY_POOLS
- ccb->quotaQueueReserv = 0;
- #endif
- }
- int fd;
- int tries;
- int addrcount;
- int connstart;
--
- /*
- * Call the given comm callback; assumes the callback is valid.
- *
- * @param ccb io completion callback
- */
- void
- commio_call_callback(comm_io_callback_t *ccb)
- {
- }
-private:
- int commResetFD();
- int commRetryConnect();
- CBDATA_CLASS(ConnectStateData);
-};
--
/* STATIC */
static DescriptorSet *TheHalfClosed = NULL; /// the set of half-closed FDs
static void commSetTcpNoDelay(int);
#endif
static void commSetTcpRcvbuf(int, int);
- static PF commHandleWrite;
-static PF commConnectFree;
-static IPH commConnectDnsHandle;
++/*
typedef enum {
COMM_CB_READ = 1,
COMM_CB_DERIVED
} comm_callback_t;
++*/
static MemAllocator *conn_close_pool = NULL;
fd_debug_t *fdd_table = NULL;
}
void
-comm_read(int fd, char *buf, int size, AsyncCall::Pointer &callback)
+comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback)
{
- debugs(5, 5, "comm_read, queueing read for FD " << fd << "; asynCall " << callback);
+ debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback);
/* Make sure we are open and not closing */
- assert(isOpen(fd));
- assert(!fd_table[fd].closing());
- Comm::IoCallback *ccb = COMMIO_FD_READCB(fd);
+ assert(Comm::IsConnOpen(conn));
+ assert(!fd_table[conn->fd].closing());
- comm_io_callback_t *ccb = COMMIO_FD_READCB(conn->fd);
++ Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd);
// Make sure we are either not reading or just passively monitoring.
// Active/passive conflicts are OK and simply cancel passive monitoring.
if (ccb->active()) {
// if the assertion below fails, we have an active comm_read conflict
- assert(fd_table[fd].halfClosedReader != NULL);
- commStopHalfClosedMonitor(fd);
+ assert(fd_table[conn->fd].halfClosedReader != NULL);
+ commStopHalfClosedMonitor(conn->fd);
assert(!ccb->active());
}
++ ccb->conn = conn;
/* Queue the read */
- commio_set_callback(conn->fd, IOCB_READ, ccb, callback, (char *)buf, NULL, size);
+ ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size);
- commSetSelect(fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
+ commSetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
}
/**
#include "squid.h"
#include "AsyncEngine.h"
#include "base/AsyncCall.h"
- #include "comm/comm_err_t.h"
- #include "comm/forward.h"
- #include "ip/Address.h"
+#include "CommCalls.h"
+ #include "comm_err_t.h"
+ #include "comm/IoCallback.h"
#include "StoreIOBuffer.h"
-#include "Array.h"
-#include "ip/Address.h"
-
-class DnsLookupDetails;
-typedef void CNCB(int fd, const DnsLookupDetails &dns, comm_err_t status, int xerrno, void *data);
-
-typedef void IOCB(int fd, char *, size_t size, comm_err_t flag, int xerrno, void *data);
-
- #define COMMIO_FD_READCB(fd) (&commfd_table[(fd)].readcb)
- #define COMMIO_FD_WRITECB(fd) (&commfd_table[(fd)].writecb)
-
-
/* comm.c */
extern bool comm_iocallbackpending(void); /* inline candidate */
SQUIDCEXTERN void commResetSelect(int);
SQUIDCEXTERN int comm_udp_sendto(int sock, const Ip::Address &to, const void *buf, int buflen);
- extern void comm_write(const Comm::ConnectionPointer &conn, const char *buf, int len, IOCB *callback, void *callback_data, FREE *func);
- extern void comm_write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func = NULL);
- SQUIDCEXTERN void comm_write_mbuf(const Comm::ConnectionPointer &conn, MemBuf *mb, IOCB * handler, void *handler_data);
- extern void comm_write_mbuf(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback);
SQUIDCEXTERN void commCallCloseHandlers(int fd);
SQUIDCEXTERN int commSetTimeout(int fd, int, PF *, void *);
-extern int commSetTimeout(int fd, int, AsyncCall::Pointer &calback);
+extern int commSetTimeout(int fd, int, AsyncCall::Pointer &callback);
+
+/**
+ * Set or clear the timeout for some action on an active connection.
+ * API to replace commSetTimeout() when a Comm::ConnectionPointer is available.
+ */
+extern int commSetConnTimeout(const Comm::ConnectionPointer &conn, int seconds, AsyncCall::Pointer &callback);
+
SQUIDCEXTERN int ignoreErrno(int);
SQUIDCEXTERN void commCloseAllSockets(void);
SQUIDCEXTERN void checkTimeouts(void);
--- /dev/null
- #include "config.h"
+#ifndef SQUID_COMM_CONNACCEPTOR_H
+#define SQUID_COMM_CONNACCEPTOR_H
+
- #include "comm/comm_err_t.h"
++#include "base/AsyncCall.h"
+#include "base/Subscription.h"
+#include "CommCalls.h"
- }; // namespace Comm
++#include "comm_err_t.h"
+#include "comm/forward.h"
+
+#if HAVE_MAP
+#include <map>
+#endif
+
+namespace Comm
+{
+
+class AcceptLimiter;
+
+/**
+ * Listens on a Comm::Connection for new incoming connections and
+ * emits an active Comm::Connection descriptor for the new client.
+ *
+ * Handles all event limiting required to quash inbound connection
+ * floods within the global FD limits of available Squid_MaxFD and
+ * client_ip_max_connections.
+ *
+ * Fills the emitted connection with all connection details able to
+ * be looked up. Currently these are the local/remote IP:port details
+ * and the listening socket transparent-mode flag.
+ */
+class ConnAcceptor : public AsyncJob
+{
+private:
+ virtual void start();
+ virtual bool doneAll() const;
+ virtual void swanSong();
+
+public:
+ ConnAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub);
+ ConnAcceptor(const ConnAcceptor &r); // not implemented.
+
+ /** Subscribe a handler to receive calls back about new connections.
+ * Replaces any existing subscribed handler.
+ */
+ void subscribe(const Subscription::Pointer &aSub);
+
+ /** Remove the currently waiting callback subscription.
+ * Pending calls will remain scheduled.
+ */
+ void unsubscribe(const char *reason);
+
+ /** Try and accept another connection (synchronous).
+ * If one is pending already the subscribed callback handler will be scheduled
+ * to handle it before this method returns.
+ */
+ void acceptNext();
+
+ /// Call the subscribed callback handler with details about a new connection.
+ void notify(comm_err_t flag, const Comm::ConnectionPointer &details);
+
+ /// errno code of the last accept() or listen() action if one occurred.
+ int errcode;
+
+private:
+ friend class AcceptLimiter;
+ int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue.
+ Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events.
+
+ /// conn being listened on for new connections
+ /// Reserved for read-only use.
+ ConnectionPointer conn;
+
+private:
+ /// Method to test if there are enough file descriptors to open a new client connection
+ /// if not the accept() will be postponed
+ static bool okToAccept();
+
+ /// Method callback for whenever an FD is ready to accept a client connection.
+ static void doAccept(int fd, void *data);
+
+ void acceptOne();
+ comm_err_t oldAccept(Comm::ConnectionPointer &details);
+ void setListen();
+
+ CBDATA_CLASS2(ConnAcceptor);
+};
+
++} // namespace Comm
+
+#endif /* SQUID_COMM_CONNACCEPTOR_H */
--- /dev/null
+/*
+ * DEBUG: section 05 Socket Connection Opener
+ */
+
+#include "config.h"
+//#include "base/TextException.h"
+#include "comm/ConnOpener.h"
+#include "comm/Connection.h"
+#include "comm.h"
+#include "fde.h"
+#include "icmp/net_db.h"
+#include "SquidTime.h"
+
+namespace Comm {
+ CBDATA_CLASS_INIT(ConnOpener);
+};
+
+Comm::ConnOpener::ConnOpener(Comm::ConnectionPointer &c, AsyncCall::Pointer &handler, time_t ctimeout) :
+ AsyncJob("Comm::ConnOpener"),
+ host_(NULL),
+ conn_(c),
+ callback_(handler),
+ totalTries_(0),
+ failRetries_(0),
+ connectTimeout_(ctimeout),
+ connectStart_(0)
+{}
+
+Comm::ConnOpener::~ConnOpener()
+{
+ safe_free(host_);
+}
+
+bool
+Comm::ConnOpener::doneAll() const
+{
+ // is the conn_ to be opened still waiting?
+ if (conn_ != NULL) {
+ return false;
+ }
+
+ // is the callback still to be called?
+ if (callback_ != NULL) {
+ return false;
+ }
+
+ return AsyncJob::doneAll();
+}
+
+void
+Comm::ConnOpener::swanSong()
+{
+ // cancel any event watchers
+ // done here to get the "swanSong" mention in cancel debugging.
+ if (calls_.earlyAbort_ != NULL) {
+ calls_.earlyAbort_->cancel("Comm::ConnOpener::swanSong");
+ calls_.earlyAbort_ = NULL;
+ }
+ if (calls_.timeout_ != NULL) {
+ calls_.timeout_->cancel("Comm::ConnOpener::swanSong");
+ calls_.timeout_ = NULL;
+ }
+
+ // rollback what we can from the job state
+ if (conn_ != NULL && conn_->isOpen()) {
+ // drop any handlers now to save a lot of cycles later
+ commSetSelect(conn_->fd, COMM_SELECT_WRITE, NULL, NULL, 0);
+ commSetTimeout(conn_->fd, -1, NULL, NULL);
+ // it never reached fully open, so abort the FD
+ conn_->close();
+ }
+
+ if (callback_ != NULL) {
+ // inform the still-waiting caller we are dying
+ doneConnecting(COMM_ERR_CONNECT, 0);
+ }
+
+ AsyncJob::swanSong();
+}
+
+void
+Comm::ConnOpener::setHost(const char * new_host)
+{
+ // unset and erase if already set.
+ if (host_ != NULL)
+ safe_free(host_);
+
+ // set the new one if given.
+ if (new_host != NULL)
+ host_ = xstrdup(new_host);
+}
+
+const char *
+Comm::ConnOpener::getHost() const
+{
+ return host_;
+}
+
+/**
+ * Connection attempt are completed. One way or the other.
+ * Pass the results back to the external handler.
+ */
+void
+Comm::ConnOpener::doneConnecting(comm_err_t status, int xerrno)
+{
+ if (callback_ != NULL) {
+ typedef CommConnectCbParams Params;
+ Params ¶ms = GetCommParams<Params>(callback_);
+ params.conn = conn_;
+ params.flag = status;
+ params.xerrno = xerrno;
+ ScheduleCallHere(callback_);
+ callback_ = NULL;
+ }
+
+ /* ensure cleared local state, we are done. */
+ conn_ = NULL;
+}
+
+void
+Comm::ConnOpener::start()
+{
+ Must(conn_ != NULL);
+
+ /* get a socket open ready for connecting with */
+ if (!conn_->isOpen()) {
+#if USE_IPV6
+ /* outbound sockets have no need to be protocol agnostic. */
+ if (conn_->remote.IsIPv4()) {
+ conn_->local.SetIPv4();
+ }
+#endif
+ conn_->fd = comm_openex(SOCK_STREAM, IPPROTO_TCP, conn_->local, conn_->flags, conn_->tos, conn_->nfmark, host_);
+ if (!conn_->isOpen()) {
+ doneConnecting(COMM_ERR_CONNECT, 0);
+ return;
+ }
+ }
+
+ typedef CommCbMemFunT<Comm::ConnOpener, CommConnectCbParams> abortDialer;
+ calls_.earlyAbort_ = JobCallback(5, 4, abortDialer, this, Comm::ConnOpener::earlyAbort);
+ comm_add_close_handler(conn_->fd, calls_.earlyAbort_);
+
+ typedef CommCbMemFunT<Comm::ConnOpener, CommTimeoutCbParams> timeoutDialer;
+ calls_.timeout_ = JobCallback(5, 4, timeoutDialer, this, Comm::ConnOpener::timeout);
+ debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_);
+ commSetTimeout(conn_->fd, connectTimeout_, calls_.timeout_);
+
+ connectStart_ = squid_curtime;
+ connect();
+}
+
+void
+Comm::ConnOpener::connected()
+{
+ /*
+ * stats.conn_open is used to account for the number of
+ * connections that we have open to the peer, so we can limit
+ * based on the max-conn option. We need to increment here,
+ * even if the connection may fail.
+ */
+ if (conn_->getPeer())
+ conn_->getPeer()->stats.conn_open++;
+
+ lookupLocalAddress();
+
+ /* TODO: remove these fd_table accesses. But old code still depends on fd_table flags to
+ * indicate the state of a raw fd object being passed around.
+ * Also, legacy code still depends on comm_local_port() with no access to Comm::Connection
+ * when those are done comm_local_port can become one of our member functions to do the below.
+ */
+ fd_table[conn_->fd].flags.open = 1;
+ fd_table[conn_->fd].local_addr = conn_->local;
+}
+
+/** Make an FD connection attempt.
+ * Handles the case(s) when a partially setup connection gets closed early.
+ */
+void
+Comm::ConnOpener::connect()
+{
+ Must(conn_ != NULL);
+
+ totalTries_++;
+
+ switch (comm_connect_addr(conn_->fd, conn_->remote) ) {
+
+ case COMM_INPROGRESS:
+ // check for timeout FIRST.
+ if (squid_curtime - connectStart_ > connectTimeout_) {
+ debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
+ conn_->close();
+ doneConnecting(COMM_TIMEOUT, errno);
+ return;
+ } else {
+ debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS");
+ commSetSelect(conn_->fd, COMM_SELECT_WRITE, Comm::ConnOpener::InProgressConnectRetry, this, 0);
+ }
+ break;
+
+ case COMM_OK:
+ debugs(5, 5, HERE << conn_ << ": COMM_OK - connected");
+
+ connected();
+
+ if (host_ != NULL)
+ ipcacheMarkGoodAddr(host_, conn_->remote);
+ doneConnecting(COMM_OK, 0);
+ break;
+
+ default:
+ debugs(5, 5, HERE << conn_ << ": * - try again");
+ failRetries_++;
+ if (host_ != NULL)
+ ipcacheMarkBadAddr(host_, conn_->remote);
+#if USE_ICMP
+ if (Config.onoff.test_reachability)
+ netdbDeleteAddrNetwork(conn_->remote);
+#endif
+
+ // check for timeout FIRST.
+ if(squid_curtime - connectStart_ > connectTimeout_) {
+ debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
++ if (calls_.earlyAbort_ != NULL) {
++ calls_.earlyAbort_->cancel("Comm::ConnOpener::connect timed out");
++ calls_.earlyAbort_ = NULL;
++ }
+ conn_->close();
+ doneConnecting(COMM_TIMEOUT, errno);
+ } else if (failRetries_ < Config.connect_retries) {
+ eventAdd("Comm::ConnOpener::DelayedConnectRetry", Comm::ConnOpener::DelayedConnectRetry, this, 0.05, 0);
+ } else {
+ // send ERROR back to the upper layer.
+ debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already.");
++ if (calls_.earlyAbort_ != NULL) {
++ calls_.earlyAbort_->cancel("Comm::ConnOpener::connect failed");
++ calls_.earlyAbort_ = NULL;
++ }
+ conn_->close();
+ doneConnecting(COMM_ERR_CONNECT, errno);
+ }
+ }
+}
+
+/**
+ * Lookup local-end address and port of the TCP link just opened.
+ * This ensure the connection local details are set correctly
+ */
+void
+Comm::ConnOpener::lookupLocalAddress()
+{
+ struct addrinfo *addr = NULL;
+ conn_->local.InitAddrInfo(addr);
+
+ if (getsockname(conn_->fd, addr->ai_addr, &(addr->ai_addrlen)) != 0) {
+ debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: " << conn_ << ": " << xstrerror());
+ conn_->local.FreeAddrInfo(addr);
+ return;
+ }
+
+ conn_->local = *addr;
+ conn_->local.FreeAddrInfo(addr);
+ debugs(5, 6, HERE << conn_);
+}
+
+/** Abort connection attempt.
+ * Handles the case(s) when a partially setup connection gets closed early.
+ */
+void
+Comm::ConnOpener::earlyAbort(const CommConnectCbParams &io)
+{
+ debugs(5, 3, HERE << io.conn);
+ doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better?
+}
+
+/**
+ * Handles the case(s) when a partially setup connection gets timed out.
+ * NP: When commSetTimeout accepts generic CommCommonCbParams this can die.
+ */
+void
+Comm::ConnOpener::timeout(const CommTimeoutCbParams &)
+{
+ connect();
+}
+
+/* Legacy Wrapper for the retry event after COMM_INPROGRESS
+ * XXX: As soon as comm commSetSelect() accepts Async calls we can use a ConnOpener::connect call
+ */
+void
+Comm::ConnOpener::InProgressConnectRetry(int fd, void *data)
+{
+ ConnOpener *cs = static_cast<Comm::ConnOpener *>(data);
+ assert(cs);
+
+ // Ew. we are now outside the all AsyncJob protections.
+ // get back inside by scheduling another call...
+ typedef NullaryMemFunT<Comm::ConnOpener> Dialer;
+ AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect);
+ ScheduleCallHere(call);
+}
+
+/* Legacy Wrapper for the retry event with small delay after errors.
+ * XXX: As soon as eventAdd() accepts Async calls we can use a ConnOpener::connect call
+ */
+void
+Comm::ConnOpener::DelayedConnectRetry(void *data)
+{
+ ConnOpener *cs = static_cast<Comm::ConnOpener *>(data);
+ assert(cs);
+
+ // Ew. we are now outside the all AsyncJob protections.
+ // get back inside by scheduling another call...
+ typedef NullaryMemFunT<Comm::ConnOpener> Dialer;
+ AsyncCall::Pointer call = JobCallback(5, 4, Dialer, cs, Comm::ConnOpener::connect);
+ ScheduleCallHere(call);
+}
--- /dev/null
- iocb_table[pos].readcb.fd = pos;
+ #include "config.h"
+ #include "ClientInfo.h"
++#include "comm/Connection.h"
+ #include "comm/IoCallback.h"
+ #include "comm/Write.h"
+ #include "CommCalls.h"
+ #include "fde.h"
+
+ Comm::CbEntry *Comm::iocb_table;
+
+ void
+ Comm::CallbackTableInit()
+ {
+ // XXX: convert this to a std::map<> ?
+ iocb_table = static_cast<CbEntry*>(xcalloc(Squid_MaxFD, sizeof(CbEntry)));
+ for (int pos = 0; pos < Squid_MaxFD; pos++) {
+ iocb_table[pos].fd = pos;
- iocb_table[pos].writecb.fd = pos;
++// iocb_table[pos].readcb.fd = pos;
+ iocb_table[pos].readcb.type = IOCB_READ;
- if (ClientInfo *clientInfo = fd_table[fd].clientInfo) {
++// iocb_table[pos].writecb.fd = pos;
+ iocb_table[pos].writecb.type = IOCB_WRITE;
+ }
+ }
+
+ void
+ Comm::CallbackTableDestruct()
+ {
++ // release any Comm::Connections being held.
++ for (int pos = 0; pos < Squid_MaxFD; pos++) {
++ iocb_table[pos].readcb.conn = NULL;
++ iocb_table[pos].writecb.conn = NULL;
++ }
+ safe_free(iocb_table);
+ }
+
+ /**
+ * Configure Comm::Callback for I/O
+ *
+ * @param fd filedescriptor
+ * @param t IO callback type (read or write)
+ * @param cb callback
+ * @param buf buffer, if applicable
+ * @param func freefunc, if applicable
+ * @param sz buffer size
+ */
+ void
+ Comm::IoCallback::setCallback(Comm::iocb_type t, AsyncCall::Pointer &cb, char *b, FREE *f, int sz)
+ {
+ assert(!active());
+ assert(type == t);
+ assert(cb != NULL);
+
+ callback = cb;
+ buf = b;
+ freefunc = f;
+ size = sz;
+ offset = 0;
+ }
+
+ void
+ Comm::IoCallback::selectOrQueueWrite()
+ {
+ #if DELAY_POOLS
+ // stand in line if there is one
- quotaQueueReserv = clientInfo->quotaEnqueue(fd);
++ if (ClientInfo *clientInfo = fd_table[conn->fd].clientInfo) {
+ if (clientInfo->writeLimitingActive) {
- commSetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0);
++ quotaQueueReserv = clientInfo->quotaEnqueue(conn->fd);
+ clientInfo->kickQuotaQueue();
+ return;
+ }
+ }
+ #endif
+
- debugs(5, 3, HERE << "called for FD " << fd << " (" << code << ", " << xerrno << ")");
++ commSetSelect(conn->fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0);
+ }
+
+ void
+ Comm::IoCallback::cancel(const char *reason)
+ {
+ if (!active())
+ return;
+
+ callback->cancel(reason);
+ callback = NULL;
+ reset();
+ }
+
+ void
+ Comm::IoCallback::reset()
+ {
++ conn = NULL;
+ if (freefunc) {
+ freefunc(buf);
+ buf = NULL;
+ freefunc = NULL;
+ }
+ xerrno = 0;
+
+ #if DELAY_POOLS
+ quotaQueueReserv = 0;
+ #endif
+ }
+
+ // Schedule the callback call and clear the callback
+ void
+ Comm::IoCallback::finish(comm_err_t code, int xerrn)
+ {
- params.fd = fd;
++ debugs(5, 3, HERE << "called for " << conn << " (" << code << ", " << xerrno << ")");
+ assert(active());
+
+ /* free data */
+ if (freefunc) {
+ freefunc(buf);
+ buf = NULL;
+ freefunc = NULL;
+ }
+
+ if (callback != NULL) {
+ typedef CommIoCbParams Params;
+ Params ¶ms = GetCommParams<Params>(callback);
++ if (conn != NULL) params.fd = conn->fd; // for legacy write handlers...
++ params.conn = conn;
+ params.buf = buf;
+ params.size = offset;
+ params.flag = code;
+ params.xerrno = xerrn;
+ ScheduleCallHere(callback);
+ callback = NULL;
+ }
+
+ /* Reset for next round. */
+ reset();
+ }
--- /dev/null
- int fd;
+ #ifndef _SQUID_COMM_IOCALLBACK_H
+ #define _SQUID_COMM_IOCALLBACK_H
+
+ #include "config.h"
+ #include "base/AsyncCall.h"
+ #include "comm_err_t.h"
++#include "comm/forward.h"
+
+ namespace Comm {
+
+ /// Type of IO callbacks the Comm layer deals with.
+ typedef enum {
+ IOCB_NONE,
+ IOCB_READ,
+ IOCB_WRITE
+ } iocb_type;
+
+ /// Details about a particular Comm IO callback event.
+ class IoCallback {
+ public:
+ iocb_type type;
++ Comm::ConnectionPointer conn;
+ AsyncCall::Pointer callback;
+ char *buf;
+ FREE *freefunc;
+ int size;
+ int offset;
+ comm_err_t errcode;
+ int xerrno;
+ #if DELAY_POOLS
+ unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue
+ #endif
+
+ bool active() const { return callback != NULL; }
+ void setCallback(iocb_type type, AsyncCall::Pointer &cb, char *buf, FREE *func, int sz);
+
+ /// called when fd needs to write but may need to wait in line for its quota
+ void selectOrQueueWrite();
+
+ /// Actively cancel the given callback
+ void cancel(const char *reason);
+
+ /// finish the IO operation imediately and schedule the callback with the current state.
+ void finish(comm_err_t code, int xerrn);
+
+ private:
+ void reset();
+ };
+
+ /// Entry nodes for the IO callback table: iocb_table
+ /// Keyed off the FD which the event applies to.
+ class CbEntry {
+ public:
+ int fd;
+ IoCallback readcb;
+ IoCallback writecb;
+ };
+
+ /// Table of scheduled IO events which have yet to be processed ??
+ /// Callbacks which might be scheduled in future are stored in fd_table.
+ extern CbEntry *iocb_table;
+
+ extern void CallbackTableInit();
+ extern void CallbackTableDestruct();
+
+ #define COMMIO_FD_READCB(fd) (&Comm::iocb_table[(fd)].readcb)
+ #define COMMIO_FD_WRITECB(fd) (&Comm::iocb_table[(fd)].writecb)
+
+ } // namespace Comm
+
+ #endif /* _SQUID_COMM_IOCALLBACK_H */
libcomm_la_SOURCES= \
AcceptLimiter.cc \
AcceptLimiter.h \
- ListenStateData.cc \
- ListenStateData.h \
+ ConnAcceptor.cc \
+ ConnAcceptor.h \
\
- comm_err_t.h \
- comm_internal.h \
- forward.h
+ ConnOpener.cc \
+ ConnOpener.h \
+ \
+ Connection.cc \
+ Connection.h \
++ forward.h \
+ IoCallback.cc \
+ IoCallback.h \
+ Write.cc \
+ Write.h \
+ \
+ comm_internal.h
--- /dev/null
-Comm::Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback)
+ #include "config.h"
+ #if DELAY_POOLS
+ #include "ClientInfo.h"
+ #endif
++#include "comm/Connection.h"
+ #include "comm/IoCallback.h"
+ #include "comm/Write.h"
+ #include "fde.h"
+ #include "SquidTime.h"
+ #include "MemBuf.h"
+
+ void
- Comm::Write(fd, mb->buf, mb->size, callback, mb->freeFunc());
++Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback)
+ {
-Comm::Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
++ Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc());
+ }
+
+ void
- debugs(5, 5, HERE << "FD " << fd << ": sz " << size << ": asynCall " << callback);
++Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
+ {
- assert(fd_table[fd].flags.open);
- assert(!fd_table[fd].closing());
- Comm::IoCallback *ccb = COMMIO_FD_WRITECB(fd);
++ debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback);
+
+ /* Make sure we are open, not closing, and not writing */
- fd_table[fd].writeStart = squid_curtime;
++ assert(fd_table[conn->fd].flags.open);
++ assert(!fd_table[conn->fd].closing());
++ Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
+ assert(!ccb->active());
+
- * We have to use the comm iocb_table to map FD numbers to waiting data.
++ fd_table[conn->fd].writeStart = squid_curtime;
++ ccb->conn = conn;
+ /* Queue the write */
+ ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
+ ccb->selectOrQueueWrite();
+ }
+
+ /** Write to FD.
+ * This function is used by the lowest level of IO loop which only has access to FD numbers.
- assert(state->fd == fd);
++ * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections.
+ * Once the write has been concluded we schedule the waiting call with success/fail results.
+ */
+ void
+ Comm::HandleWrite(int fd, void *data)
+ {
+ Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
+ int len = 0;
+ int nleft;
+
- debugs(5, 5, HERE << "FD " << state->fd << ": off " <<
++ assert(state->conn != NULL && state->conn->fd == fd);
+
+ PROF_start(commHandleWrite);
- debugs(5, 5, HERE << "FD " << fd << " writes only " <<
++ debugs(5, 5, HERE << state->conn << ": off " <<
+ (long int) state->offset << ", sz " << (long int) state->size << ".");
+
+ nleft = state->size - state->offset;
+
+ #if DELAY_POOLS
+ ClientInfo * clientInfo=fd_table[fd].clientInfo;
+
+ if (clientInfo && !clientInfo->writeLimitingActive)
+ clientInfo = NULL; // we only care about quota limits here
+
+ if (clientInfo) {
+ assert(clientInfo->selectWaiting);
+ clientInfo->selectWaiting = false;
+
+ assert(clientInfo->hasQueue());
+ assert(clientInfo->quotaPeekFd() == fd);
+ clientInfo->quotaDequeue(); // we will write or requeue below
+
+ if (nleft > 0) {
+ const int quota = clientInfo->quotaForDequed();
+ if (!quota) { // if no write quota left, queue this fd
+ state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+ clientInfo->kickQuotaQueue();
+ PROF_stop(commHandleWrite);
+ return;
+ }
+
+ const int nleft_corrected = min(nleft, quota);
+ if (nleft != nleft_corrected) {
++ debugs(5, 5, HERE << state->conn << " writes only " <<
+ nleft_corrected << " out of " << nleft);
+ nleft = nleft_corrected;
+ }
+
+ }
+ }
+ #endif
+
+ /* actually WRITE data */
+ len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+ debugs(5, 5, HERE << "write() returns " << len);
+
+ #if DELAY_POOLS
+ if (clientInfo) {
+ if (len > 0) {
+ /* we wrote data - drain them from bucket */
+ clientInfo->bucketSize -= len;
+ if (clientInfo->bucketSize < 0.0) {
+ debugs(5,1, HERE << "drained too much"); // should not happen
+ clientInfo->bucketSize = 0;
+ }
+ }
+
+ // even if we wrote nothing, we were served; give others a chance
+ clientInfo->kickQuotaQueue();
+ }
+ #endif
+
+ fd_bytes(fd, len, FD_WRITE);
+ statCounter.syscalls.sock.writes++;
+ // After each successful partial write,
+ // reset fde::writeStart to the current time.
+ fd_table[fd].writeStart = squid_curtime;
+
+ if (len == 0) {
+ /* Note we even call write if nleft == 0 */
+ /* We're done */
+ if (nleft != 0)
+ debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining.");
+
+ state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
+ } else if (len < 0) {
+ /* An error */
+ if (fd_table[fd].flags.socket_eof) {
+ debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
+ state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
+ } else if (ignoreErrno(errno)) {
+ debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
+ state->selectOrQueueWrite();
+ } else {
+ debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
+ state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
+ }
+ } else {
+ /* A successful write, continue */
+ state->offset += len;
+
+ if (state->offset < state->size) {
+ /* Not done, reinstall the write handler and write some more */
+ state->selectOrQueueWrite();
+ } else {
+ state->finish(nleft ? COMM_OK : COMM_ERROR, errno);
+ }
+ }
+
+ PROF_stop(commHandleWrite);
+ }
--- /dev/null
-void Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func);
+ #ifndef _SQUID_COMM_IOWRITE_H
+ #define _SQUID_COMM_IOWRITE_H
+
+ #include "base/AsyncCall.h"
++#include "comm/forward.h"
+
+ namespace Comm {
+
+ /**
+ * Queue a write. callback is scheduled when the write
+ * completes, on error, or on file descriptor close.
+ *
+ * free_func is used to free the passed buffer when the write has completed.
+ */
-void Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback);
++void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func);
+
+ /**
+ * Queue a write. callback is scheduled when the write
+ * completes, on error, or on file descriptor close.
+ */
-void WriteCancel(int fd, const char *reason);
++void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback);
+
+ /// Cancel the write pending on FD. No action if none pending.
++void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason);
+
+ // callback handler to process an FD which is available for writing.
+ extern PF HandleWrite;
+
+ } // namespace Comm
+
+ #endif /* _SQUID_COMM_IOWRITE_H */
*
*/
-#include "config.h"
#include "squid.h"
-#include "event.h"
-#include "SquidTime.h"
-#include "Store.h"
-#include "comm.h"
+#include "base/InstanceId.h"
+#include "comm/Connection.h"
+#include "comm/ConnOpener.h"
+ #include "comm/Write.h"
+#include "comm.h"
+#include "event.h"
#include "fde.h"
#include "ip/tools.h"
#include "MemBuf.h"
vc->busy = 1;
- commSetTimeout(vc->fd, Config.Timeout.idns_query, NULL, NULL);
+ commSetTimeout(vc->conn->fd, Config.Timeout.idns_query, NULL, NULL);
- comm_write_mbuf(vc->conn, mb, idnsSentQueryVC, vc);
+ AsyncCall::Pointer call = commCbCall(78, 5, "idnsSentQueryVC",
+ CommIoCbPtrFun(&idnsSentQueryVC, vc));
-
- Comm::Write(vc->fd, mb, call);
++ Comm::Write(vc->conn, mb, call);
delete mb;
}
*
*/
#include "config.h"
-
+#include "auth/UserRequest.h"
+#include "comm/Connection.h"
+ #include "comm/Write.h"
+#include "err_detail_type.h"
#include "errorpage.h"
-#include "auth/UserRequest.h"
-#include "SquidTime.h"
-#include "Store.h"
+#include "fde.h"
#include "html_quote.h"
#include "HttpReply.h"
#include "HttpRequest.h"
err->flags.flag_cbdata = 1;
rep = err->BuildHttpReply();
-
MemBuf *mb = rep->pack();
- comm_write_mbuf(conn, mb, errorSendComplete, err);
+ AsyncCall::Pointer call = commCbCall(78, 5, "errorSendComplete",
+ CommIoCbPtrFun(&errorSendComplete, err));
- Comm::Write(fd, mb, call);
++ Comm::Write(conn, mb, call);
delete mb;
-
delete rep;
}
#include "squid.h"
#include "comm.h"
-#include "comm/ListenStateData.h"
+#include "comm/ConnOpener.h"
+#include "comm/ConnAcceptor.h"
+ #include "comm/Write.h"
#include "compat/strtoll.h"
-#include "ConnectionDetail.h"
#include "errorpage.h"
#include "fde.h"
#include "forward.h"
}
typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
- AsyncCall::Pointer call = JobCallback(9, 5,
- Dialer, this, FtpStateData::ftpWriteCommandCallback);
- Comm::Write(ctrl.fd, ctrl.last_command, strlen(ctrl.last_command), call, NULL);
+ AsyncCall::Pointer call = JobCallback(9, 5, Dialer, this, FtpStateData::ftpWriteCommandCallback);
- comm_write(ctrl.conn, ctrl.last_command, strlen(ctrl.last_command), call);
++ Comm::Write(ctrl.conn, ctrl.last_command, strlen(ctrl.last_command), call, NULL);
scheduleReadControlReply(0);
}
snprintf(buf, 4096, "%s\r\n", gopherState->request);
}
- debugs(10, 5, "gopherSendRequest: FD " << fd);
+ debugs(10, 5, HERE << gopherState->serverConn);
- comm_write(gopherState->serverConn, buf, strlen(buf), gopherSendComplete, gopherState, NULL);
+ AsyncCall::Pointer call = commCbCall(5,5, "gopherSendComplete",
+ CommIoCbPtrFun(gopherSendComplete, gopherState));
- Comm::Write(fd, buf, strlen(buf), call, NULL);
++ Comm::Write(gopherState->serverConn, buf, strlen(buf), call, NULL);
if (EBIT_TEST(gopherState->entry->flags, ENTRY_CACHABLE))
gopherState->entry->setPublicKey(); /* Make it public */
*/
#include "squid.h"
+#include "comm.h"
+#include "comm/Connection.h"
+ #include "comm/Write.h"
#include "helper.h"
+#include "MemBuf.h"
#include "SquidMath.h"
#include "SquidTime.h"
#include "Store.h"
cbdataFree(srv);
}
- int wfd = srv->wfd;
- srv->wfd = -1;
- if (srv->rfd == wfd)
- srv->rfd = -1;
+ /// Calls back with a pointer to the buffer with the helper output
+ static void helperReturnBuffer(int request_number, helper_server * srv, helper * hlp, char * msg, char * msg_end)
+ {
+ helper_request *r = srv->requests[request_number];
+ if (r) {
+ HLPCB *callback = r->callback;
+
+ srv->requests[request_number] = NULL;
+
+ r->callback = NULL;
+
+ void *cbdata = NULL;
+ if (cbdataReferenceValidDone(r->data, &cbdata))
+ callback(cbdata, msg);
+
+ srv->stats.pending--;
+
+ hlp->stats.replies++;
+
+ srv->answer_time = current_time;
+
+ srv->dispatch_time = r->dispatch_time;
+
+ hlp->stats.avg_svc_time =
+ Math::intAverage(hlp->stats.avg_svc_time,
+ tvSubMsec(r->dispatch_time, current_time),
+ hlp->stats.replies, REDIRECT_AV_FACTOR);
+
+ helperRequestFree(r);
+ } else {
+ debugs(84, 1, "helperHandleRead: unexpected reply on channel " <<
+ request_number << " from " << hlp->id_name << " #" << srv->index + 1 <<
+ " '" << srv->rbuf << "'");
+ }
+ srv->roffset -= (msg_end - srv->rbuf);
+ memmove(srv->rbuf, msg_end, srv->roffset + 1);
+
+ if (!srv->flags.shutdown) {
+ helperKickQueue(hlp);
+ } else if (!srv->flags.closing && !srv->stats.pending) {
- comm_close(wfd);
+ srv->flags.closing=1;
++ srv->writePipe->close();
+ return;
+ }
+ }
static void
-helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
char *t = NULL;
helper_server *srv = (helper_server *)data;
srv->writebuf = srv->wqueue;
srv->wqueue = new MemBuf;
srv->flags.writing = 1;
- comm_write(srv->writePipe,
- srv->writebuf->content(),
- srv->writebuf->contentSize(),
- helperDispatchWriteDone, /* Handler */
- srv, NULL); /* Handler-data, freefunc */
+ AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
+ CommIoCbPtrFun(helperDispatchWriteDone, srv));
- Comm::Write(srv->wfd, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
++ Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
}
}
srv->writebuf = srv->wqueue;
srv->wqueue = new MemBuf;
srv->flags.writing = 1;
- comm_write(srv->writePipe,
- srv->writebuf->content(),
- srv->writebuf->contentSize(),
- helperDispatchWriteDone, /* Handler */
- srv, NULL); /* Handler-data, free func */
+ AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
+ CommIoCbPtrFun(helperDispatchWriteDone, srv));
- Comm::Write(srv->wfd, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
++ Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
}
debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
srv->flags.reserved = 1;
srv->request = r;
srv->dispatch_time = current_time;
- comm_write(srv->writePipe,
- r->buf,
- strlen(r->buf),
- helperStatefulDispatchWriteDone, /* Handler */
- hlp, NULL); /* Handler-data, free func */
+ AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
+ CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
- Comm::Write(srv->wfd, r->buf, strlen(r->buf), call, NULL);
++ Comm::Write(srv->writePipe, r->buf, strlen(r->buf), call, NULL);
debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
hlp->id_name << " #" << srv->index + 1 << ", " <<
(int) strlen(r->buf) << " bytes");
#include "base/AsyncJobCalls.h"
#include "base/TextException.h"
#include "base64.h"
+#include "comm/Connection.h"
+ #include "comm/Write.h"
#if DELAY_POOLS
#include "DelayPools.h"
#endif
mb.init();
request->peer_host=_peer?_peer->host:NULL;
buildRequestPrefix(request, orig_request, entry, &mb);
- debugs(11, 6, "httpSendRequest: FD " << fd << ":\n" << mb.buf);
- Comm::Write(fd, &mb, requestSender);
+ debugs(11, 6, HERE << serverConnection << ":\n" << mb.buf);
- comm_write_mbuf(serverConnection, &mb, requestSender);
++ Comm::Write(serverConnection, &mb, requestSender);
+
return true;
}
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
requestSender = JobCallback(11,5,
Dialer, this, HttpStateData::wroteLast);
- comm_write(serverConnection, "\r\n", 2, requestSender);
- Comm::Write(fd, "\r\n", 2, requestSender, NULL);
++ Comm::Write(serverConnection, "\r\n", 2, requestSender, NULL);
return true;
#else
return false;
flags.sentLastChunk = true;
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
- requestSender = JobCallback(11,5,
- Dialer, this, HttpStateData::wroteLast);
- Comm::Write(fd, "0\r\n\r\n", 5, requestSender, NULL);
+ requestSender = JobCallback(11,5, Dialer, this, HttpStateData::wroteLast);
- comm_write(serverConnection, "0\r\n\r\n", 5, requestSender);
++ Comm::Write(serverConnection, "0\r\n\r\n", 5, requestSender, NULL);
return true;
}
#if USE_IDENT
#include "comm.h"
+#include "comm/Connection.h"
+#include "comm/ConnOpener.h"
+#include "CommCalls.h"
+ #include "comm/Write.h"
#include "ident/Config.h"
#include "ident/Ident.h"
#include "MemBuf.h"
MemBuf mb;
mb.init();
mb.Printf("%d, %d\r\n",
- state->my_peer.GetPort(),
- state->me.GetPort());
-
+ conn->remote.GetPort(),
+ conn->local.GetPort());
- comm_write_mbuf(conn, &mb, NULL, state);
+ AsyncCall::Pointer nil;
- Comm::Write(fd, &mb, nil);
- comm_read(fd, state->buf, BUFSIZ, Ident::ReadReply, state);
- commSetTimeout(fd, Ident::TheConfig.timeout, Ident::Timeout, state);
++ Comm::Write(conn, &mb, nil);
+ comm_read(conn, state->buf, BUFSIZ, Ident::ReadReply, state);
+ commSetTimeout(conn->fd, Ident::TheConfig.timeout, Ident::Timeout, state);
}
void
#ifndef SQUID_QOSCONFIG_H
#define SQUID_QOSCONFIG_H
- #include "config.h"
#include "hier_code.h"
+#include "ip/forward.h"
#if HAVE_LIBNETFILTER_CONNTRACK_LIBNETFILTER_CONNTRACK_H
#include <libnetfilter_conntrack/libnetfilter_conntrack.h>
#ifndef SQUID_IPC_START_LISTENING_H
#define SQUID_IPC_START_LISTENING_H
- #include "config.h"
+#include "base/AsyncCall.h"
+#include "base/Subscription.h"
+#include "comm/forward.h"
#include "ip/forward.h"
#include "ipc/FdNotes.h"
-#include "base/AsyncCall.h"
#if HAVE_IOSFWD
#include <iosfwd>
* DEBUG: section 54 Interprocess Communication
*
*/
-
-
#include "config.h"
+#include "base/TextException.h"
#include "comm.h"
#include "CommCalls.h"
-#include "base/TextException.h"
+#include "comm/Connection.h"
+ #include "comm/Write.h"
#include "ipc/UdsOp.h"
typedef CommCbMemFunT<UdsSender, CommIoCbParams> Dialer;
AsyncCall::Pointer writeHandler = JobCallback(54, 5,
Dialer, this, UdsSender::wrote);
- comm_write(conn(), message.raw(), message.size(), writeHandler);
- Comm::Write(fd(), message.raw(), message.size(), writeHandler, NULL);
++ Comm::Write(conn(), message.raw(), message.size(), writeHandler, NULL);
writing = true;
}
#if USE_LOADABLE_MODULES
#include "LoadableModules.h"
#endif
+#include "Mem.h"
+#include "MemPool.h"
+#include "pconn.h"
+#include "PeerSelectState.h"
+#include "SquidTime.h"
+#include "Store.h"
+#include "StoreFileSystem.h"
+#include "SwapDir.h"
+ #if USE_SSL_CRTD
+ #include "ssl/helper.h"
+ #include "ssl/certificate_db.h"
+ #endif
+
+ #if USE_SSL
+ #include "ssl/context_storage.h"
+ #endif
+
+ #if ICAP_CLIENT
+ #include "adaptation/icap/Config.h"
+ #endif
+ #if USE_ECAP
+ #include "adaptation/ecap/Config.h"
+ #endif
+ #if USE_ADAPTATION
+ #include "adaptation/Config.h"
+ #endif
+ #if USE_SQUID_ESI
+ #include "esi/Module.h"
+ #endif
+ #include "fs/Module.h"
+
#if HAVE_PATHS_H
#include <paths.h>
#endif
/// called when the client socket gets closed by some external force
void
- Mgr::Forwarder::noteCommClosed(const CommCloseCbParams& params)
+ Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &io)
{
debugs(16, 5, HERE);
- Must(fd == io.fd);
- fd = -1;
+ Must(!Comm::IsConnOpen(clientConnection));
mustStop("commClosed");
}
#include "config.h"
#include "base/TextException.h"
+#include "comm.h"
+ #include "comm/Write.h"
#include "CommCalls.h"
+#include "comm/Connection.h"
#include "HttpReply.h"
#include "ipc/Coordinator.h"
#include "mgr/ActionWriter.h"
std::auto_ptr<MemBuf> replyBuf(reply->pack());
writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
- comm_write_mbuf(clientConnection, replyBuf.get(), writer);
- Comm::Write(fd, replyBuf.get(), writer);
++ Comm::Write(clientConnection, replyBuf.get(), writer);
}
/// called when we wrote the response header
#include "config.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
#include "CommCalls.h"
+ #include "comm/Write.h"
#include "ipc/FdNotes.h"
#include "mgr/StoreToCommWriter.h"
#include "StoreClient.h"
AsyncCall::Pointer writer =
asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
MyDialer(this, &StoreToCommWriter::noteCommWrote));
- comm_write(clientConnection, ioBuf.data, ioBuf.length, writer);
- Comm::Write(fd, ioBuf.data, ioBuf.length, writer, NULL);
++ Comm::Write(clientConnection, ioBuf.data, ioBuf.length, writer, NULL);
}
void
*/
#include "squid.h"
-#include "errorpage.h"
-#include "HttpRequest.h"
-#include "fde.h"
+#include "acl/FilledChecklist.h"
+#include "Array.h"
#include "comm.h"
+#include "comm/Connection.h"
+#include "comm/ConnOpener.h"
+ #include "comm/Write.h"
+#include "client_side.h"
#include "client_side_request.h"
-#include "acl/FilledChecklist.h"
#if DELAY_POOLS
#include "DelayId.h"
#endif
void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno);
};
- static const char *const conn_established = "HTTP/1.0 200 Connection established\r\n\r\n";
-#define fd_closed(fd) (fd == -1 || fd_table[fd].closing())
-
+ static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n";
static CNCB tunnelConnectDone;
static ERCB tunnelErrorComplete;
if (len < 0 || errcode)
from.error (xerrno);
- else if (len == 0 || fd_closed(to.fd())) {
- comm_close(from.fd());
- /* Only close the remote end if we've finished queueing data to it */
+ else if (len == 0 || !Comm::IsConnOpen(to.conn)) {
+ from.conn->close();
- if (from.len == 0 && !fd_closed(to.fd()) ) {
- comm_close(to.fd());
+ /* Only close the remote end if we've finished queueing data to it */
+ if (from.len == 0 && Comm::IsConnOpen(to.conn) ) {
+ to.conn->close();
}
- } else if (cbdataReferenceValid(this))
- comm_write(to.conn, from.buf, len, completion, this, NULL);
+ } else if (cbdataReferenceValid(this)) {
+ AsyncCall::Pointer call = commCbCall(5,5, "SomeTunnelWriteHandler",
+ CommIoCbPtrFun(completion, this));
- Comm::Write(to.fd(), from.buf, len, call, NULL);
++ Comm::Write(to.conn, from.buf, len, call, NULL);
+ }
cbdataInternalUnlock(this); /* ??? */
}
TunnelStateData::copyRead(Connection &from, IOCB *completion)
{
assert(from.len == 0);
- comm_read(from.fd(), from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), completion, this);
-}
-
-static void
-tunnelConnectTimeout(int fd, void *data)
-{
- TunnelStateData *tunnelState = (TunnelStateData *)data;
- HttpRequest *request = tunnelState->request;
- ErrorState *err = NULL;
-
- if (tunnelState->servers) {
- if (tunnelState->servers->_peer)
- hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code,
- tunnelState->servers->_peer->host);
- else if (Config.onoff.log_ip_on_direct)
- hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code,
- fd_table[tunnelState->server.fd()].ipaddr);
- else
- hierarchyNote(&tunnelState->request->hier, tunnelState->servers->code,
- tunnelState->host);
- } else
- debugs(26, 1, "tunnelConnectTimeout(): tunnelState->servers is NULL");
-
- err = errorCon(ERR_CONNECT_FAIL, HTTP_SERVICE_UNAVAILABLE, request);
-
- *tunnelState->status_ptr = HTTP_SERVICE_UNAVAILABLE;
-
- err->xerrno = ETIMEDOUT;
-
- err->port = tunnelState->port;
-
- err->callback = tunnelErrorComplete;
-
- err->callback_data = tunnelState;
-
- errorSend(tunnelState->client.fd(), err);
- comm_close(fd);
+ comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), completion, this);
}
++/**
++ * All the pieces we need to write to client and/or server connection
++ * Have been written. Start the blind pump.
++ */
static void
-tunnelConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
+tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
}
}
-/*
- * handle the write completion from a proxy request to an upstream proxy
- */
static void
-tunnelProxyConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
-{
- tunnelConnectedWriteDone(fd, buf, size, flag, xerrno, data);
-}
-
-static void
-tunnelConnected(int fd, void *data)
+tunnelConnected(const Comm::ConnectionPointer &server, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
- debugs(26, 3, "tunnelConnected: FD " << fd << " tunnelState=" << tunnelState);
+ debugs(26, 3, HERE << server << ", tunnelState=" << tunnelState);
*tunnelState->status_ptr = HTTP_OK;
- comm_write(tunnelState->client.conn, conn_established, strlen(conn_established),
- tunnelConnectedWriteDone, tunnelState, NULL);
+ AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone",
+ CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState));
- Comm::Write(tunnelState->client.fd(), conn_established, strlen(conn_established), call, NULL);
++ Comm::Write(tunnelState->client.conn, conn_established, strlen(conn_established), call, NULL);
}
static void
}
static void
- tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, void *data)
-tunnelProxyConnected(int fd, void *data)
++tunnelRelayConnectRequest(const Comm::ConnectionPointer &srv, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
HttpHeader hdr_out(hoRequest);
Packer p;
http_state_flags flags;
- debugs(26, 3, HERE << server << ", tunnelState=" << tunnelState);
- debugs(26, 3, "tunnelProxyConnected: FD " << fd << " tunnelState=" << tunnelState);
++ debugs(26, 3, HERE << srv << ", tunnelState=" << tunnelState);
memset(&flags, '\0', sizeof(flags));
flags.proxying = tunnelState->request->flags.proxying;
MemBuf mb;
packerClean(&p);
mb.append("\r\n", 2);
- comm_write_mbuf(server, &mb, tunnelConnectedWriteDone, tunnelState);
- commSetTimeout(server->fd, Config.Timeout.read, tunnelTimeout, tunnelState);
- AsyncCall::Pointer call = commCbCall(5,5, "tunnelProxyConnectedWriteDone",
- CommIoCbPtrFun(tunnelProxyConnectedWriteDone, tunnelState));
-
- Comm::Write(tunnelState->server.fd(), &mb, call);
- commSetTimeout(tunnelState->server.fd(), Config.Timeout.read, tunnelTimeout, tunnelState);
++ AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone",
++ CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState));
++ Comm::Write(srv, &mb, call);
++ commSetTimeout(srv->fd, Config.Timeout.read, tunnelTimeout, tunnelState);
}
static void
String str_print=p->request->urlpath.substr(1,p->request->urlpath.size());
snprintf(buf, l, SQUIDSTRINGPH"\r\n", SQUIDSTRINGPRINT(str_print));
- comm_write(fwd->serverConnection(), buf, strlen(buf), whoisWriteComplete, p, NULL);
+ AsyncCall::Pointer call = commCbCall(5,5, "whoisWriteComplete",
+ CommIoCbPtrFun(whoisWriteComplete, p));
+
- Comm::Write(fd, buf, strlen(buf), call, NULL);
- comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p);
- commSetTimeout(fd, Config.Timeout.read, whoisTimeout, p);
++ Comm::Write(fwd->serverConnection(), buf, strlen(buf), call, NULL);
+ comm_read(fwd->serverConnection(), p->buf, BUFSIZ, whoisReadReply, p);
+ commSetTimeout(fwd->serverConnection()->fd, Config.Timeout.read, whoisTimeout, p);
}
/* PRIVATE */