*/
#include "squid.h"
+#include "base/TextException.h"
#include "CommCalls.h"
#include "comm/AcceptLimiter.h"
#include "comm/Connection.h"
#include "comm/comm_internal.h"
-#include "comm/ListenStateData.h"
+#include "comm/ConnAcceptor.h"
#include "fde.h"
#include "protos.h"
#include "SquidTime.h"
-Comm::ListenStateData::ListenStateData(int aFd, bool accept_many) :
- fd(aFd),
+namespace Comm {
+ CBDATA_CLASS_INIT(ConnAcceptor);
+};
+
+Comm::ConnAcceptor::ConnAcceptor(int aFd, bool accept_many) :
+ AsyncJob("Legacy_Comm::ConnAcceptor"),
errcode(0),
isLimited(0),
callSection(NULL),
{
assert(aFd >= 0);
assert(isOpen(aFd));
+ conn = new Connection;
+ conn->fd = aFd;
+ // TODO: figure out what the new FD local address is/was/should be.
}
-Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note) :
+Comm::ConnAcceptor::ConnAcceptor(Comm::ConnectionPointer &newConn, bool accept_many, const char *note) :
+ AsyncJob("Comm::ConnAcceptor"),
errcode(0),
isLimited(0),
callSection(NULL),
mayAcceptMore(accept_many)
{
/* open the conn if its not already open */
- if (!IsConnOpen(conn)) {
- conn->fd = comm_open(SOCK_STREAM,
- IPPROTO_TCP,
- conn->local,
- conn->flags,
- note);
- debugs(9, 3, HERE << "Unconnected data socket created on FD " << conn->fd );
-
- if (!conn->isOpen()) {
+ if (!IsConnOpen(newConn)) {
+ newConn->fd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn->local, conn->flags, note);
+ debugs(9, 3, HERE << "Unconnected data socket created on FD " << newConn->fd );
+
+ if (!newConn->isOpen()) {
debugs(5, DBG_CRITICAL, HERE << "comm_open failed");
errcode = -1;
return;
}
}
- assert(IsConnOpen(conn));
- fd = conn->fd;
+ assert(IsConnOpen(newConn));
+ conn = newConn;
}
-Comm::ListenStateData::~ListenStateData()
+Comm::ConnAcceptor::~ConnAcceptor()
{
- unsubscribe();
- comm_close(fd);
- fd = -1;
+ swanSong();
}
void
-Comm::ListenStateData::subscribe(int section, int level, const char *name, CommAcceptCbPtrFun *dialer)
+Comm::ConnAcceptor::subscribe(int section, int level, const char *name, CommAcceptCbPtrFun *dialer)
{
- debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << name);
+ debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << name);
// if this is the first subscription. start listening on the socket.
if (callDialer == NULL && theCallback == NULL)
safe_free(callName);
callName = xstrdup(name);
callDialer = dialer;
-
- // if no error so far start accepting connections.
- if (errcode == 0)
- commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
}
void
-Comm::ListenStateData::subscribe(const AsyncCall::Pointer &call)
+Comm::ConnAcceptor::subscribe(const AsyncCall::Pointer &call)
{
- debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
+ debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << call);
// remove old subscription. if any.
unsubscribe();
// store new callback subscription
theCallback = call;
-
- // start listening on the socket.
- if (theCallback != NULL) {
- setListen();
- // if no error so far start accepting connections.
- if (errcode == 0)
- commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
- }
}
void
-Comm::ListenStateData::unsubscribe()
+Comm::ConnAcceptor::unsubscribe()
{
safe_free(callName);
delete callDialer;
theCallback = NULL;
}
+void
+Comm::ConnAcceptor::start()
+{
+ debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << callName);
+
+ Must(IsConnOpen(conn));
+
+ setListen();
+
+ // if no error so far start accepting connections.
+ if (errcode == 0)
+ commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+bool
+Comm::ConnAcceptor::doneAll() const
+{
+ if (!IsConnOpen(conn)) {
+ debugs(5,5, HERE << "Done? maybe. FD is closed." << (conn==NULL?"conn=NULL":"") << ", FD " << (conn!=NULL?conn->fd:-999));
+ return AsyncJob::doneAll();
+ }
+
+ if (callDialer == NULL && theCallback == NULL) {
+ debugs(5,5, HERE << "Done? maybe: handlers are gone.");
+ return AsyncJob::doneAll();
+ }
+
+ return false;
+}
+
+void
+Comm::ConnAcceptor::swanSong()
+{
+ debugs(5,5, HERE);
+ unsubscribe();
+ conn = NULL;
+ AsyncJob::swanSong();
+}
+
/**
* New-style listen and accept routines
*
* accept()ed some time later.
*/
void
-Comm::ListenStateData::setListen()
+Comm::ConnAcceptor::setListen()
{
errcode = 0; // reset local errno copy.
- if (listen(fd, Squid_MaxFD >> 2) < 0) {
- debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+ if (listen(conn->fd, Squid_MaxFD >> 2) < 0) {
+ debugs(50, 0, HERE << "listen(FD " << conn->fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
errcode = errno;
return;
}
#ifdef SO_ACCEPTFILTER
struct accept_filter_arg afa;
bzero(&afa, sizeof(afa));
- debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
+ debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << conn->fd);
xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
- if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
+ if (setsockopt(conn->fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
#elif defined(TCP_DEFER_ACCEPT)
int seconds = 30;
if (strncmp(Config.accept_filter, "data=", 5) == 0)
seconds = atoi(Config.accept_filter + 5);
- if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
+ if (setsockopt(conn->fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
#else
debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
* done later when enough sockets become available.
*/
void
-Comm::ListenStateData::doAccept(int fd, void *data)
+Comm::ConnAcceptor::doAccept(int fd, void *data)
{
debugs(5, 2, HERE << "New connection on FD " << fd);
- assert(isOpen(fd));
- ListenStateData *afd = static_cast<ListenStateData*>(data);
+ Must(isOpen(fd));
+ ConnAcceptor *afd = static_cast<ConnAcceptor*>(data);
if (!okToAccept()) {
AcceptLimiter::Instance().defer(afd);
} else {
afd->acceptNext();
}
- commSetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
+ commSetSelect(fd, COMM_SELECT_READ, Comm::ConnAcceptor::doAccept, afd, 0);
}
bool
-Comm::ListenStateData::okToAccept()
+Comm::ConnAcceptor::okToAccept()
{
static time_t last_warn = 0;
}
void
-Comm::ListenStateData::acceptOne()
+Comm::ConnAcceptor::acceptOne()
{
/*
* We don't worry about running low on FDs here. Instead,
*/
/* Accept a new connection */
- Connection *connDetails = new Connection();
- int newfd = oldAccept(*connDetails);
+ Connection *newConnDetails = new Connection();
+ int newfd = oldAccept(*newConnDetails);
/* Check for errors */
if (newfd < 0) {
if (newfd == COMM_NOMESSAGE) {
/* register interest again */
- debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << callName);
- commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+ debugs(5, 5, HERE << "try later: FD " << conn->fd << " handler: " << callName);
+ commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
return;
}
// A non-recoverable error; notify the caller */
- debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << callName);
- notify(-1, COMM_ERROR, connDetails);
+ debugs(5, 5, HERE << "non-recoverable error: FD " << conn->fd << " handler: " << callName);
+ notify(-1, COMM_ERROR, newConnDetails);
mayAcceptMore = false;
return;
}
- debugs(5, 5, HERE << "accepted: FD " << fd <<
- " newfd: " << newfd << " from: " << connDetails->remote <<
+ debugs(5, 5, HERE << "accepted: FD " << conn->fd <<
+ " newfd: " << newfd << " from: " << newConnDetails->remote <<
" handler: " << callName);
- notify(newfd, COMM_OK, connDetails);
+ notify(newfd, COMM_OK, newConnDetails);
}
void
-Comm::ListenStateData::acceptNext()
+Comm::ConnAcceptor::acceptNext()
{
- assert(isOpen(fd));
- debugs(5, 2, HERE << "connection on FD " << fd);
+ Must(IsConnOpen(conn));
+ debugs(5, 2, HERE << "connection on FD " << conn->fd);
acceptOne();
}
void
-Comm::ListenStateData::notify(int newfd, comm_err_t flag, const Comm::ConnectionPointer &connDetails)
+Comm::ConnAcceptor::notify(int newfd, comm_err_t flag, const Comm::ConnectionPointer &newConnDetails)
{
// listener socket handlers just abandon the port with COMM_ERR_CLOSING
// it should only happen when this object is deleted...
AsyncCall::Pointer call = commCbCall(callSection, callLevel, callName, *callDialer);
typedef CommAcceptCbParams Params;
Params ¶ms = GetCommParams<Params>(call);
- params.fd = fd;
+ params.fd = conn->fd;
params.nfd = newfd;
- params.details = connDetails;
+ params.details = newConnDetails;
params.flag = flag;
params.xerrno = errcode;
ScheduleCallHere(call);
- if (!mayAcceptMore)
- unsubscribe();
}
else if (theCallback != NULL) {
typedef CommAcceptCbParams Params;
Params ¶ms = GetCommParams<Params>(theCallback);
- params.fd = fd;
+ params.fd = conn->fd;
params.nfd = newfd;
- params.details = connDetails;
+ params.details = newConnDetails;
params.flag = flag;
params.xerrno = errcode;
ScheduleCallHere(theCallback);
* Wait for an incoming connection on FD.
*/
int
-Comm::ListenStateData::oldAccept(Comm::Connection &details)
+Comm::ConnAcceptor::oldAccept(Comm::Connection &details)
{
PROF_start(comm_accept);
statCounter.syscalls.sock.accepts++;
details.local.InitAddrInfo(gai);
errcode = 0; // reset local errno copy.
- if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
+ if ((sock = accept(conn->fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
errcode = errno; // store last accept errno locally.
details.local.FreeAddrInfo(gai);
PROF_stop(comm_accept);
if (ignoreErrno(errno)) {
- debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror());
+ debugs(50, 5, HERE << "FD " << conn->fd << ": " << xstrerror());
return COMM_NOMESSAGE;
} else if (ENFILE == errno || EMFILE == errno) {
- debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror());
+ debugs(50, 3, HERE << "FD " << conn->fd << ": " << xstrerror());
return COMM_ERROR;
} else {
- debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror());
+ debugs(50, 1, HERE << "FD " << conn->fd << ": " << xstrerror());
return COMM_ERROR;
}
}
- assert(sock >= 0);
+ Must(sock >= 0);
details.fd = sock;
details.remote = *gai;
commSetNonBlocking(sock);
/* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
- F->flags.transparent = fd_table[fd].flags.transparent;
+ F->flags.transparent = fd_table[conn->fd].flags.transparent;
PROF_stop(comm_accept);
return sock;
#include "squid.h"
#include "comm.h"
#include "comm/ConnOpener.h"
-#include "comm/ListenStateData.h"
+#include "comm/ConnAcceptor.h"
#include "compat/strtoll.h"
#include "errorpage.h"
#include "fde.h"
* FTP stores a copy of the FD in the channel descriptor.
* Use close() to properly close the channel.
*/
- Comm::ListenStateData *listener;
+ Comm::ConnAcceptor *listener;
/** A temporary handle to the connection being listened on.
* Data channel listeners die before handing the result conn over. If the conn is
FtpStateData::dataClosed(const CommCloseCbParams &io)
{
if (data.listener) {
- data.listener->unsubscribe();
- delete data.listener;
+ data.listener->unsubscribe(); // listener job will self destruct.
data.listener = NULL;
data.listen_conn = NULL;
data.conn = NULL;
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
- ftpState->data.listener = new Comm::ListenStateData(conn, false, ftpState->entry->url());
+ ftpState->data.listener = new Comm::ConnAcceptor(conn, false, ftpState->entry->url());
ftpState->data.listener->subscribe(acceptCall);
- if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) {
+ if (ftpState->data.listener->errcode != 0) {
conn->close();
} else {
if (!fallback)
conn->local.SetPort(comm_local_port(conn->fd));
ftpState->data.host = NULL;
+ AsyncJob::AsyncStart(ftpState->data.listener);
}
ftpState->data.listen_conn = conn;
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
- data.listener = new Comm::ListenStateData(data.listen_conn, false, data.host);
+ data.listener = new Comm::ConnAcceptor(data.listen_conn, false, data.host);
data.listener->subscribe(acceptCall);
+ AsyncJob::AsyncStart(data.listener);
return;
}
}
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
- data.listener = new Comm::ListenStateData(data.conn, false, data.host);
+ data.listener = new Comm::ConnAcceptor(data.conn, false, data.host);
data.listener->subscribe(acceptCall);
+ AsyncJob::AsyncStart(data.listener);
} else {
debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
ftpFail(this);
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
- ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, false, ftpState->data.host);
+ ftpState->data.listener = new Comm::ConnAcceptor(ftpState->data.conn, false, ftpState->data.host);
ftpState->data.listener->subscribe(acceptCall);
+ AsyncJob::AsyncStart(ftpState->data.listener);
return;
} else if (!ftpState->flags.tried_nlst && code > 300) {
ftpSendNlst(ftpState);
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
- ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, false, ftpState->data.host);
+ ftpState->data.listener = new Comm::ConnAcceptor(ftpState->data.conn, false, ftpState->data.host);
ftpState->data.listener->subscribe(acceptCall);
+ AsyncJob::AsyncStart(ftpState->data.listener);
} else if (code >= 300) {
if (!ftpState->flags.try_slash_hack) {
/* Try this as a directory missing trailing slash... */
{
// channels with active listeners will be closed when the listener handler dies.
if (listener) {
- listener->unsubscribe();
- delete listener;
+ listener->unsubscribe(); /// listener job will self-destruct.
listener = NULL;
comm_remove_close_handler(conn->fd, closer);
closer = NULL;