clientdbEstablished(details->remote, 1);
incoming_sockets_accepted++;
+
+ // TODO: remove this when details conn is passed around properly.
+ details->fd = -1; // ConnStateData has assumed control of the FD now.
}
#if USE_SSL
clientdbEstablished(details->remote, 1);
incoming_sockets_accepted++;
+
+ // TODO: remove this when details conn is passed around properly.
+ details->fd = -1; // ConnStateData has assumed control of the FD now.
}
bool
Must(s);
- AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)",
- CommAcceptCbPtrFun(httpAccept, s));
-
- s->listener = new Comm::ListenStateData(fd, call, true);
+ s->listener = new Comm::ListenStateData(fd, true);
+ s->listener->subscribe(5,5, "httpAccept", new CommAcceptCbPtrFun(httpAccept, s));
debugs(1, 1, "Accepting " <<
(s->intercepted ? " intercepted" : "") <<
Must(s);
- AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)",
- CommAcceptCbPtrFun(httpsAccept, s));
-
- s->listener = new Comm::ListenStateData(fd, call, true);
+ s->listener = new Comm::ListenStateData(fd, true);
+ s->listener->subscribe(5,5, "httpsAccept", new CommAcceptCbPtrFun(httpsAccept, s));
debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << ".");
#include "protos.h"
#include "SquidTime.h"
-/**
- * New-style listen and accept routines
- *
- * setListen simply registers our interest in an FD for listening.
- * The constructor takes a callback to call when an FD has been
- * accept()ed some time later.
- */
-void
-Comm::ListenStateData::setListen()
-{
- errcode = 0; // reset local errno copy.
- if (listen(fd, Squid_MaxFD >> 2) < 0) {
- debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
- errcode = errno;
- return;
- }
-
- if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
-#ifdef SO_ACCEPTFILTER
- struct accept_filter_arg afa;
- bzero(&afa, sizeof(afa));
- debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
- xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
- if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
- debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
-#elif defined(TCP_DEFER_ACCEPT)
- int seconds = 30;
- if (strncmp(Config.accept_filter, "data=", 5) == 0)
- seconds = atoi(Config.accept_filter + 5);
- if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
- debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
-#else
- debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
-#endif
- }
-}
-
+// TODO remove.
Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
fd(aFd),
errcode(0),
commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
}
+// TODO remove.
Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, AsyncCall::Pointer &call, bool accept_many, const char *note) :
errcode(0),
isLimited(0),
commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
}
+Comm::ListenStateData::ListenStateData(int aFd, bool accept_many) :
+ fd(aFd),
+ errcode(0),
+ isLimited(0),
+ callSection(NULL),
+ callLevel(NULL),
+ callName(NULL),
+ callDialer(NULL),
+ mayAcceptMore(accept_many)
+{
+ assert(aFd >= 0);
+ assert(isOpen(aFd));
+}
+
+Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note) :
+ errcode(0),
+ isLimited(0),
+ callSection(NULL),
+ callLevel(NULL),
+ callName(NULL),
+ callDialer(NULL),
+ mayAcceptMore(accept_many)
+{
+ /* open the conn if its not already open */
+ if (!IsConnOpen(conn)) {
+ conn->fd = comm_open(SOCK_STREAM,
+ IPPROTO_TCP,
+ conn->local,
+ conn->flags,
+ note);
+ debugs(9, 3, HERE << "Unconnected data socket created on FD " << conn->fd );
+
+ if (!conn->isOpen()) {
+ debugs(5, DBG_CRITICAL, HERE << "comm_open failed");
+ errcode = -1;
+ return;
+ }
+ }
+
+ assert(IsConnOpen(conn));
+ fd = conn->fd;
+}
+
Comm::ListenStateData::~ListenStateData()
{
comm_close(fd);
fd = -1;
+ delete callDialer;;
+}
+
+void
+Comm::ListenStateData::subscribe(int section, int level, const char *name, CommAcceptCbPtrFun *dialer)
+{
+ debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << name);
+
+ // if this is the first subscription. start listening on the socket.
+ if (callDialer == NULL)
+ setListen();
+
+ // store the subscribed handler details.
+ callSection = section;
+ callLevel = level;
+ safe_free(callName);
+ callName = xstrdup(name);
+ callDialer = dialer;
+
+ // if no error so far start accepting connections.
+ if (errcode == 0)
+ commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+void
+Comm::ListenStateData::unsubscribe()
+{
+ safe_free(callName);
+ delete callDialer;
+ callDialer = NULL;
+}
+
+/**
+ * New-style listen and accept routines
+ *
+ * setListen simply registers our interest in an FD for listening.
+ * The constructor takes a callback to call when an FD has been
+ * accept()ed some time later.
+ */
+void
+Comm::ListenStateData::setListen()
+{
+ errcode = 0; // reset local errno copy.
+ if (listen(fd, Squid_MaxFD >> 2) < 0) {
+ debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+ errcode = errno;
+ return;
+ }
+
+ if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
+#ifdef SO_ACCEPTFILTER
+ struct accept_filter_arg afa;
+ bzero(&afa, sizeof(afa));
+ debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
+ xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
+ if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
+ debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+#elif defined(TCP_DEFER_ACCEPT)
+ int seconds = 30;
+ if (strncmp(Config.accept_filter, "data=", 5) == 0)
+ seconds = atoi(Config.accept_filter + 5);
+ if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
+ debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+#else
+ debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+#endif
+ }
}
/**
if (newfd == COMM_NOMESSAGE) {
/* register interest again */
- debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
+ debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << callName);
commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
return;
}
// A non-recoverable error; notify the caller */
- debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
+ debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << callName);
notify(-1, COMM_ERROR, connDetails);
mayAcceptMore = false;
return;
debugs(5, 5, HERE << "accepted: FD " << fd <<
" newfd: " << newfd << " from: " << connDetails->remote <<
- " handler: " << theCallback);
+ " handler: " << callName);
notify(newfd, COMM_OK, connDetails);
}
return;
}
- if (theCallback != NULL) {
+ if (callDialer != NULL) {
+ AsyncCall::Pointer call = commCbCall(callSection, callLevel, callName, *callDialer);
+ typedef CommAcceptCbParams Params;
+ Params ¶ms = GetCommParams<Params>(call);
+ params.fd = fd;
+ params.nfd = newfd;
+ params.details = connDetails;
+ params.flag = flag;
+ params.xerrno = errcode;
+ ScheduleCallHere(call);
+ if (!mayAcceptMore)
+ unsubscribe();
+ }
+ else if (theCallback != NULL) {
typedef CommAcceptCbParams Params;
Params ¶ms = GetCommParams<Params>(theCallback);
params.fd = fd;
#define SQUID_LISTENERSTATEDATA_H
#include "config.h"
-#include "base/AsyncCall.h"
+#include "CommCalls.h"
#include "comm/comm_err_t.h"
#include "comm/forward.h"
{
public:
+// old remove ASAP when subscribe is working.
ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); // Legacy
ListenStateData(Comm::ConnectionPointer &conn, AsyncCall::Pointer &call, bool accept_many, const char *note);
+
+ ListenStateData(int fd, bool accept_many); // Legacy verion that uses new subscribe API.
+ ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note);
ListenStateData(const ListenStateData &r); // not implemented.
~ListenStateData();
+ // legacy. removing ASAP when below version works.
void subscribe(AsyncCall::Pointer &call) { theCallback = call; };
+
+ /** Subscribe a handler to receive calls back about new connections.
+ * Replaces any existing subscribed handler.
+ */
+ void subscribe(int level, int section, const char *name, CommAcceptCbPtrFun *dialer);
+// void subscribe(int level, int section, const char *name, CommAcceptMemFun *dialer);
+
+ /** Remove the currently waiting callback subscription.
+ * Pending calls will remain scheduled.
+ */
+ void unsubscribe();
+
+ /** Try and accept another connection.
+ * If any are pending it will be passed asynchronously to the subscribed callback.
+ */
void acceptNext();
+
+ /// Call the subscribed callback handler with details about a new connection.
void notify(int newfd, comm_err_t flag, const Comm::ConnectionPointer &details);
+ /// socket being listened on for new connections
int fd;
/// errno code of the last accept() or listen() action if one occurred.
/// whether this socket is delayed and on the AcceptLimiter queue.
int32_t isLimited;
+private:
+ int callSection; ///< debug section for subscribed callback.
+ int callLevel; ///< debug level for subscribed callback.
+ char *callName; ///< Name for the subscribed callback.
+ CommAcceptCbPtrFun *callDialer; ///< dialer to make the subscribed callback
+
private:
/// Method to test if there are enough file descriptors to open a new client connection
/// if not the accept() will be postponed