#include "base/TextException.h"
#include "CommCalls.h"
#include "comm/AcceptLimiter.h"
-#include "comm/Connection.h"
#include "comm/comm_internal.h"
+#include "comm/Connection.h"
#include "comm/ConnAcceptor.h"
#include "fde.h"
#include "protos.h"
CBDATA_CLASS_INIT(ConnAcceptor);
};
-Comm::ConnAcceptor::ConnAcceptor(int aFd, bool accept_many) :
- AsyncJob("Legacy_Comm::ConnAcceptor"),
- errcode(0),
- isLimited(0),
- callSection(NULL),
- callLevel(NULL),
- callName(NULL),
- callDialer(NULL),
- theCallback(NULL),
- mayAcceptMore(accept_many)
-{
- assert(aFd >= 0);
- assert(isOpen(aFd));
- conn = new Connection;
- conn->fd = aFd;
- // TODO: figure out what the new FD local address is/was/should be.
-}
-
-Comm::ConnAcceptor::ConnAcceptor(Comm::ConnectionPointer &newConn, bool accept_many, const char *note) :
+Comm::ConnAcceptor::ConnAcceptor(const Comm::ConnectionPointer &newConn, const char *note, const Subscription::Pointer &aSub) :
AsyncJob("Comm::ConnAcceptor"),
errcode(0),
isLimited(0),
- callSection(NULL),
- callLevel(NULL),
- callName(NULL),
- callDialer(NULL),
- theCallback(NULL),
- mayAcceptMore(accept_many)
+ theCallSub(aSub),
+ conn(newConn)
{
+ assert(newConn != NULL);
+
/* open the conn if its not already open */
- if (!IsConnOpen(newConn)) {
- newConn->fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn->local, conn->flags, note);
- debugs(9, 3, HERE << "Unconnected data socket created on FD " << newConn->fd );
+ if (!IsConnOpen(conn)) {
+ conn->fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn->local, conn->flags, note);
+ errcode = errno;
- if (!newConn->isOpen()) {
- debugs(5, DBG_CRITICAL, HERE << "comm_open failed");
- errcode = -1;
+ if (!conn->isOpen()) {
+ debugs(5, DBG_CRITICAL, HERE << "comm_open failed: " << conn << " error: " << errcode);
+ conn = NULL;
return;
}
+ debugs(9, 3, HERE << "Unconnected data socket created on " << conn);
}
-
assert(IsConnOpen(newConn));
- conn = newConn;
-}
-
-Comm::ConnAcceptor::~ConnAcceptor()
-{
- swanSong();
}
void
-Comm::ConnAcceptor::subscribe(int section, int level, const char *name, CommAcceptCbPtrFun *dialer)
+Comm::ConnAcceptor::subscribe(const Subscription::Pointer &aSub)
{
- debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << name);
-
- // if this is the first subscription. start listening on the socket.
- if (callDialer == NULL && theCallback == NULL)
- setListen();
-
- // remove old subscription. if any.
- unsubscribe();
-
- // store the subscribed handler details.
- callSection = section;
- callLevel = level;
- safe_free(callName);
- callName = xstrdup(name);
- callDialer = dialer;
-}
-
-void
-Comm::ConnAcceptor::subscribe(const AsyncCall::Pointer &call)
-{
- debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << call);
-
- // remove old subscription. if any.
- unsubscribe();
-
- // store new callback subscription
- theCallback = call;
+ debugs(5, 5, HERE << conn << " AsyncCall Subscription: " << aSub);
+ unsubscribe("subscription change");
+ theCallSub = aSub;
}
void
-Comm::ConnAcceptor::unsubscribe()
+Comm::ConnAcceptor::unsubscribe(const char *reason)
{
- safe_free(callName);
- delete callDialer;
- callDialer = NULL;
- theCallback = NULL;
+ debugs(5, 5, HERE << conn << " AsyncCall Subscription " << theCallSub << " removed: " << reason);
+ theCallSub = NULL;
}
void
Comm::ConnAcceptor::start()
{
- debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << callName);
+ debugs(5, 5, HERE << conn << " AsyncCall: " << theCallSub);
Must(IsConnOpen(conn));
bool
Comm::ConnAcceptor::doneAll() const
{
+ // stio when FD is closed
if (!IsConnOpen(conn)) {
- debugs(5,5, HERE << "Done? maybe. FD is closed." << (conn==NULL?"conn=NULL":"") << ", FD " << (conn!=NULL?conn->fd:-999));
return AsyncJob::doneAll();
}
- if (callDialer == NULL && theCallback == NULL) {
- debugs(5,5, HERE << "Done? maybe: handlers are gone.");
+ // stop when handlers are gone
+ if (theCallSub == NULL) {
return AsyncJob::doneAll();
}
+ // open FD with handlers...keep accepting.
return false;
}
Comm::ConnAcceptor::swanSong()
{
debugs(5,5, HERE);
- unsubscribe();
+ unsubscribe("swanSong");
conn = NULL;
+ AcceptLimiter::Instance().removeDead(this);
AsyncJob::swanSong();
}
{
errcode = 0; // reset local errno copy.
if (listen(conn->fd, Squid_MaxFD >> 2) < 0) {
- debugs(50, 0, HERE << "listen(FD " << conn->fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+ debugs(50, DBG_CRITICAL, "ERROR: listen(" << conn << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
errcode = errno;
return;
}
#ifdef SO_ACCEPTFILTER
struct accept_filter_arg afa;
bzero(&afa, sizeof(afa));
- debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << conn->fd);
+ debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on " << conn);
xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
if (setsockopt(conn->fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
- debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+ debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
#elif defined(TCP_DEFER_ACCEPT)
int seconds = 30;
if (strncmp(Config.accept_filter, "data=", 5) == 0)
seconds = atoi(Config.accept_filter + 5);
if (setsockopt(conn->fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
- debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+ debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
#else
- debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+ debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS");
#endif
}
}
void
Comm::ConnAcceptor::doAccept(int fd, void *data)
{
- debugs(5, 2, HERE << "New connection on FD " << fd);
+ try {
+ debugs(5, 2, HERE << "New connection on FD " << fd);
- Must(isOpen(fd));
- ConnAcceptor *afd = static_cast<ConnAcceptor*>(data);
+ Must(isOpen(fd));
+ ConnAcceptor *afd = static_cast<ConnAcceptor*>(data);
- if (!okToAccept()) {
- AcceptLimiter::Instance().defer(afd);
- } else {
- afd->acceptNext();
+ if (!okToAccept()) {
+ AcceptLimiter::Instance().defer(afd);
+ } else {
+ afd->acceptNext();
+ }
+ commSetSelect(fd, COMM_SELECT_READ, Comm::ConnAcceptor::doAccept, afd, 0);
+
+ } catch(TextException &e) {
+ fatalf("FATAL: error while accepting new client connection: %s\n", e.message);
+ } catch(...) {
+ fatal("FATAL: error while accepting new client connection: [unkown]\n");
}
- commSetSelect(fd, COMM_SELECT_READ, Comm::ConnAcceptor::doAccept, afd, 0);
}
bool
*/
/* Accept a new connection */
- Connection *newConnDetails = new Connection();
- int newfd = oldAccept(*newConnDetails);
+ ConnectionPointer newConnDetails = new Connection();
+ comm_err_t status = oldAccept(newConnDetails);
/* Check for errors */
- if (newfd < 0) {
+ if (!newConnDetails->isOpen()) {
- if (newfd == COMM_NOMESSAGE) {
+ if (status == COMM_NOMESSAGE) {
/* register interest again */
- debugs(5, 5, HERE << "try later: FD " << conn->fd << " handler: " << callName);
+ debugs(5, 5, HERE << "try later: " << conn << " handler: " << theCallSub);
commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
return;
}
// A non-recoverable error; notify the caller */
- debugs(5, 5, HERE << "non-recoverable error: FD " << conn->fd << " handler: " << callName);
- notify(-1, COMM_ERROR, newConnDetails);
- mayAcceptMore = false;
+ debugs(5, 5, HERE << "non-recoverable error: " << conn << " handler: " << theCallSub);
+ notify(status, newConnDetails);
+ mustStop("Listener socket closed");
return;
}
- debugs(5, 5, HERE << "accepted: FD " << conn->fd <<
- " newfd: " << newfd << " from: " << newConnDetails->remote <<
- " handler: " << callName);
- notify(newfd, COMM_OK, newConnDetails);
+ debugs(5, 5, HERE << "Listener: " << conn <<
+ " accepted new connection " << newConnDetails <<
+ " handler: " << theCallSub);
+ notify(status, newConnDetails);
}
void
Comm::ConnAcceptor::acceptNext()
{
Must(IsConnOpen(conn));
- debugs(5, 2, HERE << "connection on FD " << conn->fd);
+ debugs(5, 2, HERE << "connection on " << conn);
acceptOne();
}
+// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback().
void
-Comm::ConnAcceptor::notify(int newfd, comm_err_t flag, const Comm::ConnectionPointer &newConnDetails)
+Comm::ConnAcceptor::notify(comm_err_t flag, const Comm::ConnectionPointer &newConnDetails)
{
// listener socket handlers just abandon the port with COMM_ERR_CLOSING
// it should only happen when this object is deleted...
return;
}
- if (callDialer != NULL) {
- AsyncCall::Pointer call = commCbCall(callSection, callLevel, callName, *callDialer);
- typedef CommAcceptCbParams Params;
- Params ¶ms = GetCommParams<Params>(call);
+ if (theCallSub != NULL) {
+ AsyncCall::Pointer call = theCallSub->callback();
+ CommAcceptCbParams ¶ms = GetCommParams<CommAcceptCbParams>(call);
params.fd = conn->fd;
- params.nfd = newfd;
- params.details = newConnDetails;
+ params.conn = newConnDetails;
params.flag = flag;
params.xerrno = errcode;
ScheduleCallHere(call);
}
- else if (theCallback != NULL) {
- typedef CommAcceptCbParams Params;
- Params ¶ms = GetCommParams<Params>(theCallback);
- params.fd = conn->fd;
- params.nfd = newfd;
- params.details = newConnDetails;
- params.flag = flag;
- params.xerrno = errcode;
- ScheduleCallHere(theCallback);
- // only permit the call to be scheduled once.
- mayAcceptMore = false;
- theCallback = NULL;
- }
}
/**
* accept() and process
- * Wait for an incoming connection on FD.
+ * Wait for an incoming connection on our listener socket.
+ *
+ * \retval COMM_OK success. details parameter filled.
+ * \retval COMM_NOMESSAGE attempted accept() but nothing useful came in.
+ * \retval COMM_ERROR an outright failure occured.
+ * Or if this client has too many connections already.
*/
-int
-Comm::ConnAcceptor::oldAccept(Comm::Connection &details)
+comm_err_t
+Comm::ConnAcceptor::oldAccept(Comm::ConnectionPointer &details)
{
PROF_start(comm_accept);
statCounter.syscalls.sock.accepts++;
int sock;
struct addrinfo *gai = NULL;
- details.local.InitAddrInfo(gai);
+ details->local.InitAddrInfo(gai);
errcode = 0; // reset local errno copy.
if ((sock = accept(conn->fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
errcode = errno; // store last accept errno locally.
- details.local.FreeAddrInfo(gai);
+ details->local.FreeAddrInfo(gai);
PROF_stop(comm_accept);
if (ignoreErrno(errno)) {
- debugs(50, 5, HERE << "FD " << conn->fd << ": " << xstrerror());
+ debugs(50, 5, HERE << conn << ": " << xstrerror());
return COMM_NOMESSAGE;
} else if (ENFILE == errno || EMFILE == errno) {
- debugs(50, 3, HERE << "FD " << conn->fd << ": " << xstrerror());
+ debugs(50, 3, HERE << conn << ": " << xstrerror());
return COMM_ERROR;
} else {
- debugs(50, 1, HERE << "FD " << conn->fd << ": " << xstrerror());
+ debugs(50, 1, HERE << conn << ": " << xstrerror());
return COMM_ERROR;
}
}
Must(sock >= 0);
- details.fd = sock;
- details.remote = *gai;
+ details->fd = sock;
+ details->remote = *gai;
if ( Config.client_ip_max_connections >= 0) {
- if (clientdbEstablished(details.remote, 0) > Config.client_ip_max_connections) {
- debugs(50, DBG_IMPORTANT, "WARNING: " << details.remote << " attempting more than " << Config.client_ip_max_connections << " connections.");
- details.local.FreeAddrInfo(gai);
+ if (clientdbEstablished(details->remote, 0) > Config.client_ip_max_connections) {
+ debugs(50, DBG_IMPORTANT, "WARNING: " << details->remote << " attempting more than " << Config.client_ip_max_connections << " connections.");
+ details->local.FreeAddrInfo(gai);
return COMM_ERROR;
}
}
// lookup the local-end details of this new connection
- details.local.InitAddrInfo(gai);
- details.local.SetEmpty();
+ details->local.InitAddrInfo(gai);
+ details->local.SetEmpty();
getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
- details.local = *gai;
- details.local.FreeAddrInfo(gai);
+ details->local = *gai;
+ details->local.FreeAddrInfo(gai);
/* fdstat update */
- // XXX : these are not all HTTP requests. use a note about type and ip:port details.
+ // XXX : these are not all HTTP requests. use a note about type and ip:port details->
// so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port"
fd_open(sock, FD_SOCKET, "HTTP Request");
fdd_table[sock].close_line = 0;
fde *F = &fd_table[sock];
- details.remote.NtoA(F->ipaddr,MAX_IPSTRLEN);
- F->remote_port = details.remote.GetPort();
- F->local_addr = details.local;
- F->sock_family = details.local.IsIPv6()?AF_INET6:AF_INET;
+ details->remote.NtoA(F->ipaddr,MAX_IPSTRLEN);
+ F->remote_port = details->remote.GetPort();
+ F->local_addr = details->local;
+ F->sock_family = details->local.IsIPv6()?AF_INET6:AF_INET;
// set socket flags
commSetCloseOnExec(sock);
F->flags.transparent = fd_table[conn->fd].flags.transparent;
PROF_stop(comm_accept);
- return sock;
+ return COMM_OK;
}
#define SQUID_COMM_CONNACCEPTOR_H
#include "config.h"
+#include "base/Subscription.h"
#include "CommCalls.h"
#include "comm/comm_err_t.h"
#include "comm/forward.h"
namespace Comm
{
+class AcceptLimiter;
+
+/**
+ * Listens on a Comm::Connection for new incoming connections and
+ * emits an active Comm::Connection descriptor for the new client.
+ *
+ * Handles all event limiting required to quash inbound connection
+ * floods within the global FD limits of available Squid_MaxFD and
+ * client_ip_max_connections.
+ *
+ * Fills the emitted connection with all connection details able to
+ * be looked up. Currently these are the local/remote IP:port details
+ * and the listening socket transparent-mode flag.
+ */
class ConnAcceptor : public AsyncJob
{
private:
- void start();
- bool doneAll() const;
- void swanSong();
+ virtual void start();
+ virtual bool doneAll() const;
+ virtual void swanSong();
public:
- ConnAcceptor(int fd, bool accept_many); // Legacy verion that uses new subscribe API.
- ConnAcceptor(Comm::ConnectionPointer &conn, bool accept_many, const char *note);
+ ConnAcceptor(const Comm::ConnectionPointer &conn, const char *note, const Subscription::Pointer &aSub);
ConnAcceptor(const ConnAcceptor &r); // not implemented.
- ~ConnAcceptor();
/** Subscribe a handler to receive calls back about new connections.
* Replaces any existing subscribed handler.
*/
- void subscribe(int level, int section, const char *name, CommAcceptCbPtrFun *dialer);
-
- /** Subscribe a handler to receive calls back about new connections.
- * Replaces any existing subscribed handler.
- * Due to not being able to re-use calls, only permits one to be received.
- */
- void subscribe(const AsyncCall::Pointer &call);
+ void subscribe(const Subscription::Pointer &aSub);
/** Remove the currently waiting callback subscription.
* Pending calls will remain scheduled.
*/
- void unsubscribe();
+ void unsubscribe(const char *reason);
/** Try and accept another connection (synchronous).
* If one is pending already the subscribed callback handler will be scheduled
void acceptNext();
/// Call the subscribed callback handler with details about a new connection.
- void notify(int newfd, comm_err_t flag, const Comm::ConnectionPointer &details);
-
- /// conn being listened on for new connections
- /// Reserved for read-only use.
- ConnectionPointer conn;
+ void notify(comm_err_t flag, const Comm::ConnectionPointer &details);
/// errno code of the last accept() or listen() action if one occurred.
int errcode;
- /// whether this socket is delayed and on the AcceptLimiter queue.
- /// Reserved for read-only use outside of AcceptLimiter
- int32_t isLimited;
-
private:
- int callSection; ///< debug section for subscribed callback.
- int callLevel; ///< debug level for subscribed callback.
- char *callName; ///< Name for the subscribed callback.
- CommAcceptCbPtrFun *callDialer; ///< dialer to make the subscribed callback
+ friend class AcceptLimiter;
+ int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue.
+ Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events.
- AsyncCall::Pointer theCallback; // TODO remove legacy pointer. Store dialer of members instead.
+ /// conn being listened on for new connections
+ /// Reserved for read-only use.
+ ConnectionPointer conn;
private:
/// Method to test if there are enough file descriptors to open a new client connection
static void doAccept(int fd, void *data);
void acceptOne();
- int oldAccept(Comm::Connection &details);
-
- bool mayAcceptMore;
-
+ comm_err_t oldAccept(Comm::ConnectionPointer &details);
void setListen();
CBDATA_CLASS2(ConnAcceptor);
typedef CommCbMemFunT<Comm::ConnOpener, CommTimeoutCbParams> Dialer;
calls_.timeout_ = asyncCall(5, 4, "Comm::ConnOpener::timeout",
Dialer(this, &Comm::ConnOpener::timeout));
- debugs(5, 3, HERE << "FD " << conn_->fd << " timeout " << connectTimeout_);
+ debugs(5, 3, HERE << conn_ << " timeout " << connectTimeout_);
commSetTimeout(conn_->fd, connectTimeout_, calls_.timeout_);
}
case COMM_INPROGRESS:
// check for timeout FIRST.
if(squid_curtime - connStart_ > connectTimeout_) {
- debugs(5, 5, HERE << "FD " << conn_->fd << ": * - ERR took too long already.");
+ debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
doneConnecting(COMM_TIMEOUT, errno);
return;
} else {
- debugs(5, 5, HERE << "FD " << conn_->fd << ": COMM_INPROGRESS");
+ debugs(5, 5, HERE << conn_ << ": COMM_INPROGRESS");
commSetSelect(conn_->fd, COMM_SELECT_WRITE, Comm::ConnOpener::ConnectRetry, this, 0);
}
break;
case COMM_OK:
- debugs(5, 5, HERE << "FD " << conn_->fd << ": COMM_OK - connected");
+ debugs(5, 5, HERE << conn_ << ": COMM_OK - connected");
/*
* stats.conn_open is used to account for the number of
break;
default:
- debugs(5, 5, HERE "FD " << conn_->fd << ": * - try again");
+ debugs(5, 5, HERE << conn_ << ": * - try again");
failRetries_++;
if (host_ != NULL)
ipcacheMarkBadAddr(host_, conn_->remote);
// check for timeout FIRST.
if(squid_curtime - connStart_ > connectTimeout_) {
- debugs(5, 5, HERE << "FD " << conn_->fd << ": * - ERR took too long already.");
+ debugs(5, 5, HERE << conn_ << ": * - ERR took too long already.");
doneConnecting(COMM_TIMEOUT, errno);
} else if (failRetries_ < Config.connect_retries) {
ScheduleCallHere(calls_.connect_);
} else {
// send ERROR back to the upper layer.
- debugs(5, 5, HERE << "FD " << conn_->fd << ": * - ERR tried too many times already.");
+ debugs(5, 5, HERE << conn_ << ": * - ERR tried too many times already.");
doneConnecting(COMM_ERR_CONNECT, errno);
}
}
conn_->local.InitAddrInfo(addr);
if (getsockname(conn_->fd, addr->ai_addr, &(addr->ai_addrlen)) != 0) {
- debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: FD " << conn_->fd << ": " << xstrerror());
+ debugs(50, DBG_IMPORTANT, "ERROR: Failed to retrieve TCP/UDP details for socket: " << conn_ << ": " << xstrerror());
conn_->local.FreeAddrInfo(addr);
return;
}
conn_->local = *addr;
conn_->local.FreeAddrInfo(addr);
- debugs(5, 6, HERE << "FD " << conn_->fd << ": conn.local=" << conn_->local);
+ debugs(5, 6, HERE << conn_);
}
/** Abort connection attempt.
void
Comm::ConnOpener::earlyAbort(const CommConnectCbParams &io)
{
- debugs(5, 3, HERE << "FD " << io.conn->fd);
+ debugs(5, 3, HERE << io.conn);
doneConnecting(COMM_ERR_CLOSING, io.xerrno); // NP: is closing or shutdown better?
}