{
public:
typedef CommAcceptCbParams Params;
+ typedef RefCount<CommAcceptCbPtrFun> Pointer;
CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams);
void dial();
class CommCbFunPtrCallT: public AsyncCall
{
public:
+ typedef RefCount<CommCbFunPtrCallT<Dialer> > Pointer;
typedef typename Dialer::Params Params;
inline CommCbFunPtrCallT(int debugSection, int debugLevel,
const char *callName, const Dialer &aDialer);
+ inline CommCbFunPtrCallT(const CommCbFunPtrCallT &o) :
+ AsyncCall(o.debugSection, o.debugLevel, o.name),
+ dialer(o.dialer)
+ {}
+
+ ~CommCbFunPtrCallT() {}
+
virtual CallDialer* getDialer() { return &dialer; }
public:
protected:
inline virtual bool canFire();
inline virtual void fire();
+
+private:
+ CommCbFunPtrCallT & operator=(const CommCbFunPtrCallT &); // not defined. not permitted.
};
// Conveninece wrapper: It is often easier to call a templated function than
*/
#include "squid.h"
+#include "comm.h"
#include "ProtoPort.h"
#if HAVE_LIMITS
#include <limits>
#endif
-http_port_list::http_port_list(const char *aProtocol)
+http_port_list::http_port_list(const char *aProtocol) :
+ listenFd(-1)
#if USE_SSL
- :
- http(*this), dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
+ , http(*this)
+ , dynamicCertMemCacheSize(std::numeric_limits<size_t>::max())
#endif
{
protocol = xstrdup(aProtocol);
http_port_list::~http_port_list()
{
- delete listener;
+ if (listenFd >= 0) {
+ comm_close(listenFd);
+ listenFd = -1;
+ }
safe_free(name);
safe_free(defaultsite);
#ifndef SQUID_PROTO_PORT_H
#define SQUID_PROTO_PORT_H
-//#include "typedefs.h"
#include "cbdata.h"
-#include "comm/ListenStateData.h"
#if USE_SSL
#include "ssl/gadgets.h"
} tcp_keepalive;
/**
- * The FD listening socket handler.
- * If not NULL we are actively listening for client requests.
- * delete to close the socket.
+ * The FD listening socket.
+ * If >= 0 we are actively listening for client requests.
+ * use comm_close(listenFd) to stop.
*/
- Comm::ListenStateData *listener;
+ int listenFd;
#if USE_SSL
// XXX: temporary hack to ease move of SSL options to http_port
private:
const char *isCanceled; // set to the cancelation reason by cancel()
+
+ // not implemented to prevent nil calls from being passed around and unknowingly scheduled, for now.
+ AsyncCall();
+ AsyncCall(const AsyncCall &);
};
inline
const Dialer &aDialer): AsyncCall(aDebugSection, aDebugLevel, aName),
dialer(aDialer) {}
+ AsyncCallT(const AsyncCallT<Dialer> &o):
+ AsyncCall(o.debugSection, o.debugLevel, o.name),
+ dialer(o.dialer) {}
+
+ ~AsyncCallT() {}
+
CallDialer *getDialer() { return &dialer; }
protected:
virtual void fire() { dialer.dial(*this); }
Dialer dialer;
+
+private:
+ AsyncCallT & operator=(const AsyncCallT &); // not defined. call assignments not permitted.
};
template <class Dialer>
public:
/// Must be passed an object. nil pointers are not permitted.
explicit CallSubscription(const RefCount<Call_> &aCall) : call(aCall) { assert(aCall != NULL); }
- virtual AsyncCall::Pointer callback() const { return new Call_(call); }
+ virtual AsyncCall::Pointer callback() const { return new Call_(*call); }
private:
const RefCount<Call_> call; ///< gets copied to create callback calls
#include "ClientRequestContext.h"
#include "clientStream.h"
#include "comm.h"
-#include "comm/Write.h"
-#include "comm/ListenStateData.h"
+#include "CommCalls.h"
#include "comm/Loops.h"
+#include "comm/Write.h"
+#include "comm/TcpAcceptor.h"
#include "ConnectionDetail.h"
#include "eui/Config.h"
#include "fde.h"
#include "ident/Config.h"
#include "ident/Ident.h"
#include "ip/Intercept.h"
+#include "ipc/FdNotes.h"
#include "ipc/StartListening.h"
#include "MemBuf.h"
#include "MemObject.h"
#define comm_close comm_lingering_close
#endif
-/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call
+/// dials clientListenerConnectionOpened call
class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
{
public:
- typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg);
- ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg):
- handler(aHandler), portCfg(aPortCfg) {}
+ typedef void (*Handler)(int fd, int flags, int errNo, http_port_list *portCfg, const Ipc::FdNoteId note, const Subscription::Pointer &sub);
+ ListeningStartedDialer(Handler aHandler, int openFlags, http_port_list *aPortCfg, const Ipc::FdNoteId note, const Subscription::Pointer &aSub):
+ handler(aHandler), portCfg(aPortCfg), portTypeNote(note), commOpenListenerFlags(openFlags), sub(aSub) {}
virtual void print(std::ostream &os) const {
startPrint(os) <<
}
virtual bool canDial(AsyncCall &) const { return true; }
- virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); }
+ virtual void dial(AsyncCall &) { (handler)(fd, commOpenListenerFlags, errNo, portCfg, portTypeNote, sub); }
public:
Handler handler;
private:
- http_port_list *portCfg; ///< from Config.Sockaddr.http
+ http_port_list *portCfg; ///< from Config.Sockaddr.http
+ Ipc::FdNoteId portTypeNote; ///< Type of IPC socket being opened
+ int commOpenListenerFlags; ///< flags used by comm_open_listener
+ Subscription::Pointer sub; ///< The handler to be subscribed for this connetion listener
};
-
-static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s);
-#if USE_SSL
-static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s);
-#endif
+static void clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub);
/* our socket-related context */
/** Handle a new connection on HTTP socket. */
void
-httpAccept(int sock, int newfd, ConnectionDetail *details,
- comm_err_t flag, int xerrno, void *data)
+httpAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
{
http_port_list *s = (http_port_list *)data;
ConnStateData *connState = NULL;
if (flag != COMM_OK) {
- debugs(33, 1, "httpAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
+ // Its possible the call was still queued when the client disconnected
+ debugs(33, 2, "httpAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
return;
}
/** handle a new HTTPS connection */
static void
-httpsAccept(int sock, int newfd, ConnectionDetail *details,
- comm_err_t flag, int xerrno, void *data)
+httpsAccept(int, int newfd, ConnectionDetail *details, comm_err_t flag, int xerrno, void *data)
{
https_port_list *s = (https_port_list *)data;
SSL_CTX *sslContext = s->staticSslContext.get();
if (flag != COMM_OK) {
- errno = xerrno;
- debugs(33, 1, "httpsAccept: FD " << sock << ": accept failure: " << xstrerr(xerrno));
+ // Its possible the call was still queued when the client disconnected
+ debugs(33, 2, "httpsAccept: FD " << s->listenFd << ": accept failure: " << xstrerr(xerrno));
return;
}
/// check FD after clientHttp[s]ConnectionOpened, adjust HttpSockets as needed
static bool
-OpenedHttpSocket(int fd, const char *msgIfFail)
+OpenedHttpSocket(int fd, const Ipc::FdNoteId portType)
{
if (fd < 0) {
Must(NHttpSockets > 0); // we tried to open some
Must(HttpSockets[NHttpSockets] < 0); // no extra fds received
if (!NHttpSockets) // we could not open any listen sockets at all
- fatal(msgIfFail);
+ fatalf("Unable to open %s",FdNote(portType));
return false;
}
const int openFlags = COMM_NONBLOCKING |
(s->spoof_client_ip ? COMM_TRANSPARENT : 0);
- AsyncCall::Pointer callback = asyncCall(33,2,
- "clientHttpConnectionOpened",
- ListeningStartedDialer(&clientHttpConnectionOpened, s));
- Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags,
- Ipc::fdnHttpSocket, callback);
+ // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
+ typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+ RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s));
+ Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+
+ AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened",
+ ListeningStartedDialer(&clientListenerConnectionOpened, openFlags, s, Ipc::fdnHttpSocket, sub));
+ Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpSocket, listenCall);
- HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened
+ HttpSockets[NHttpSockets++] = -1; // set in clientListenerConnectionOpened
}
#if USE_SSL
/// process clientHttpConnectionsOpen result
static void
-clientHttpConnectionOpened(int fd, int, http_port_list *s)
+clientListenerConnectionOpened(int fd, int flags, int errNo, http_port_list *s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub)
{
- if (!OpenedHttpSocket(fd, "Cannot open HTTP Port"))
+ s->listenFd = fd;
+ if (!OpenedHttpSocket(s->listenFd, portTypeNote))
return;
Must(s);
+ Must(s->listenFd >= 0);
- AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpAccept)",
- CommAcceptCbPtrFun(httpAccept, s));
+ // TCP: setup a job to handle accept() with subscribed handler
+ AsyncJob::Start(new Comm::TcpAcceptor(s->listenFd, s->s, flags, FdNote(portTypeNote), sub));
- s->listener = new Comm::ListenStateData(fd, call, true);
-
- debugs(1, 1, "Accepting " <<
+ debugs(1, 1, "Accepting" <<
(s->intercepted ? " intercepted" : "") <<
(s->spoof_client_ip ? " spoofing" : "") <<
(s->sslBump ? " bumpy" : "") <<
(s->accel ? " accelerated" : "")
- << " HTTP connections at " << s->s
- << ", FD " << fd << "." );
+ << FdNote(portTypeNote) << " connections at "
+ << " FD " << s->listenFd << " on " << s->s);
- Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
+ Must(AddOpenedHttpSocket(s->listenFd)); // otherwise, we have received a fd we did not ask for
}
#if USE_SSL
continue;
}
- AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened",
- ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http));
-
- Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING,
- Ipc::fdnHttpsSocket, call);
-
- HttpSockets[NHttpSockets++] = -1;
- }
-}
-
-/// process clientHttpsConnectionsOpen result
-static void
-clientHttpsConnectionOpened(int fd, int, http_port_list *s)
-{
- if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port"))
- return;
-
- Must(s);
+ const int openFlags = COMM_NONBLOCKING |
+ (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
- AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler(httpsAccept)",
- CommAcceptCbPtrFun(httpsAccept, s));
+ // setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS
+ typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+ RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s));
+ Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
- s->listener = new Comm::ListenStateData(fd, call, true);
+ AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened",
+ ListeningStartedDialer(&clientListenerConnectionOpened, openFlags,
+ &s->http, Ipc::fdnHttpsSocket, sub));
- debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << ".");
+ Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags, Ipc::fdnHttpsSocket, listenCall);
- Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
+ HttpSockets[NHttpSockets++] = -1;
+ }
}
-
#endif
void
clientHttpConnectionsClose(void)
{
for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) {
- if (s->listener) {
- debugs(1, 1, "FD " << s->listener->fd << " Closing HTTP connection");
- delete s->listener;
- s->listener = NULL;
+ if (s->listenFd >= 0) {
+ debugs(1, 1, "FD " << s->listenFd << " Closing HTTP connection");
+ comm_close(s->listenFd);
+ s->listenFd = -1;
}
}
#if USE_SSL
for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) {
- if (s->listener) {
- debugs(1, 1, "FD " << s->listener->fd << " Closing HTTPS connection");
- delete s->listener;
- s->listener = NULL;
+ if (s->listenFd >= 0) {
+ debugs(1, 1, "FD " << s->listenFd << " Closing HTTPS connection");
+ comm_close(s->listenFd);
+ s->listenFd = -1;
}
}
#endif
#include "comm/AcceptLimiter.h"
#include "comm/comm_internal.h"
#include "comm/IoCallback.h"
-#include "comm/Write.h"
-#include "comm/ListenStateData.h"
#include "comm/Loops.h"
+#include "comm/Write.h"
+#include "comm/TcpAcceptor.h"
#include "CommIO.h"
#include "CommRead.h"
#include "ConnectionDetail.h"
bool
isOpen(const int fd)
{
- return fd_table[fd].flags.open != 0;
+ return fd >= 0 && fd_table[fd].flags.open != 0;
}
/**
#include "config.h"
#include "comm/AcceptLimiter.h"
-#include "comm/ListenStateData.h"
+#include "comm/TcpAcceptor.h"
#include "fde.h"
Comm::AcceptLimiter Comm::AcceptLimiter::Instance_;
}
void
-Comm::AcceptLimiter::defer(Comm::ListenStateData *afd)
+Comm::AcceptLimiter::defer(Comm::TcpAcceptor *afd)
{
afd->isLimited++;
debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
deferred.push_back(afd);
}
+void
+Comm::AcceptLimiter::removeDead(const Comm::TcpAcceptor *afd)
+{
+ for (unsigned int i = 0; i < deferred.size() && afd->isLimited > 0; i++) {
+ if (deferred[i] == afd) {
+ deferred[i]->isLimited--;
+ deferred[i] = NULL; // fast. kick() will skip empty entries later.
+ debugs(5, 5, HERE << "FD " << afd->fd << " x" << afd->isLimited);
+ }
+ }
+}
+
void
Comm::AcceptLimiter::kick()
{
+ // TODO: this could be optimized further with an iterator to search
+ // looking for first non-NULL, followed by dumping the first N
+ // with only one shift()/pop_front operation
+
debugs(5, 5, HERE << " size=" << deferred.size());
- if (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
- debugs(5, 5, HERE << " doing one.");
+ while (deferred.size() > 0 && fdNFree() >= RESERVED_FD) {
/* NP: shift() is equivalent to pop_front(). Giving us a FIFO queue. */
- ListenStateData *temp = deferred.shift();
- temp->isLimited--;
- temp->acceptNext();
+ TcpAcceptor *temp = deferred.shift();
+ if (temp != NULL) {
+ debugs(5, 5, HERE << " doing one.");
+ temp->isLimited--;
+ temp->acceptNext();
+ break;
+ }
}
}
namespace Comm
{
-class ListenStateData;
+class TcpAcceptor;
/**
* FIFO Queue holding listener socket handlers which have been activated
static AcceptLimiter &Instance();
/** delay accepting a new client connection. */
- void defer(Comm::ListenStateData *afd);
+ void defer(Comm::TcpAcceptor *afd);
+
+ /** remove all records of an acceptor. Only to be called by the ConnAcceptor::swanSong() */
+ void removeDead(const Comm::TcpAcceptor *afd);
/** try to accept and begin processing any delayed client connections. */
void kick();
static AcceptLimiter Instance_;
/** FIFO queue */
- Vector<Comm::ListenStateData*> deferred;
+ Vector<Comm::TcpAcceptor*> deferred;
};
}; // namepace Comm
+++ /dev/null
-#ifndef SQUID_LISTENERSTATEDATA_H
-#define SQUID_LISTENERSTATEDATA_H
-
-#include "base/AsyncCall.h"
-#include "comm.h"
-#if HAVE_MAP
-#include <map>
-#endif
-
-class ConnectionDetail;
-
-namespace Comm
-{
-
-class ListenStateData
-{
-
-public:
- ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many);
- ListenStateData(const ListenStateData &r); // not implemented.
- ~ListenStateData();
-
- void subscribe(AsyncCall::Pointer &call);
- void acceptNext();
- void notify(int newfd, comm_err_t flag, const ConnectionDetail &details);
-
- int fd;
-
- /// errno code of the last accept() or listen() action if one occurred.
- int errcode;
-
- /// whether this socket is delayed and on the AcceptLimiter queue.
- int32_t isLimited;
-
-private:
- /// Method to test if there are enough file escriptors to open a new client connection
- /// if not the accept() will be postponed
- static bool okToAccept();
-
- /// Method callback for whenever an FD is ready to accept a client connection.
- static void doAccept(int fd, void *data);
-
- void acceptOne();
- int oldAccept(ConnectionDetail &details);
-
- AsyncCall::Pointer theCallback;
- bool mayAcceptMore;
-
- void setListen();
-};
-
-} // namespace Comm
-
-#endif /* SQUID_LISTENERSTATEDATA_H */
libcomm_la_SOURCES= \
AcceptLimiter.cc \
AcceptLimiter.h \
- ListenStateData.cc \
- ListenStateData.h \
Loops.h \
ModDevPoll.cc \
ModEpoll.cc \
ModPoll.cc \
ModSelect.cc \
ModSelectWin32.cc \
+ TcpAcceptor.cc \
+ TcpAcceptor.h \
\
IoCallback.cc \
IoCallback.h \
*/
#include "squid.h"
+#include "base/TextException.h"
#include "CommCalls.h"
#include "comm/AcceptLimiter.h"
#include "comm/comm_internal.h"
-#include "comm/ListenStateData.h"
#include "comm/Loops.h"
+#include "comm/TcpAcceptor.h"
#include "ConnectionDetail.h"
#include "fde.h"
#include "protos.h"
#include "SquidTime.h"
+namespace Comm {
+ CBDATA_CLASS_INIT(TcpAcceptor);
+};
+
+Comm::TcpAcceptor::TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags,
+ const char *note, const Subscription::Pointer &aSub) :
+ AsyncJob("Comm::TcpAcceptor"),
+ errcode(0),
+ fd(listenFd),
+ isLimited(0),
+ theCallSub(aSub),
+ local_addr(laddr)
+{}
+
+void
+Comm::TcpAcceptor::subscribe(const Subscription::Pointer &aSub)
+{
+ debugs(5, 5, HERE << status() << " AsyncCall Subscription: " << aSub);
+ unsubscribe("subscription change");
+ theCallSub = aSub;
+}
+
+void
+Comm::TcpAcceptor::unsubscribe(const char *reason)
+{
+ debugs(5, 5, HERE << status() << " AsyncCall Subscription " << theCallSub << " removed: " << reason);
+ theCallSub = NULL;
+}
+
+void
+Comm::TcpAcceptor::start()
+{
+ debugs(5, 5, HERE << status() << " AsyncCall Subscription: " << theCallSub);
+
+ Must(isOpen(fd));
+
+ setListen();
+
+ // if no error so far start accepting connections.
+ if (errcode == 0)
+ SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+}
+
+bool
+Comm::TcpAcceptor::doneAll() const
+{
+ // stop when FD is closed
+ if (!isOpen(fd)) {
+ return AsyncJob::doneAll();
+ }
+
+ // stop when handlers are gone
+ if (theCallSub == NULL) {
+ return AsyncJob::doneAll();
+ }
+
+ // open FD with handlers...keep accepting.
+ return false;
+}
+
+void
+Comm::TcpAcceptor::swanSong()
+{
+ debugs(5,5, HERE);
+ unsubscribe("swanSong");
+ fd = -1;
+ AcceptLimiter::Instance().removeDead(this);
+ AsyncJob::swanSong();
+}
+
+const char *
+Comm::TcpAcceptor::status() const
+{
+ static char ipbuf[MAX_IPSTRLEN] = {'\0'};
+ if (ipbuf[0] == '\0')
+ local_addr.ToHostname(ipbuf, MAX_IPSTRLEN);
+
+ static MemBuf buf;
+ buf.reset();
+ buf.Printf(" FD %d, %s",fd, ipbuf);
+
+ const char *jobStatus = AsyncJob::status();
+ buf.append(jobStatus, strlen(jobStatus));
+
+ return buf.content();
+}
+
/**
* New-style listen and accept routines
*
* accept()ed some time later.
*/
void
-Comm::ListenStateData::setListen()
+Comm::TcpAcceptor::setListen()
{
errcode = 0; // reset local errno copy.
if (listen(fd, Squid_MaxFD >> 2) < 0) {
- debugs(50, 0, HERE << "listen(FD " << fd << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
+ debugs(50, DBG_CRITICAL, "ERROR: listen(" << status() << ", " << (Squid_MaxFD >> 2) << "): " << xstrerror());
errcode = errno;
return;
}
debugs(5, DBG_IMPORTANT, "Installing accept filter '" << Config.accept_filter << "' on FD " << fd);
xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
if (setsockopt(fd, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa)) < 0)
- debugs(5, DBG_CRITICAL, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
+ debugs(5, DBG_CRITICAL, "WARNING: SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
#elif defined(TCP_DEFER_ACCEPT)
int seconds = 30;
if (strncmp(Config.accept_filter, "data=", 5) == 0)
seconds = atoi(Config.accept_filter + 5);
if (setsockopt(fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds)) < 0)
- debugs(5, DBG_CRITICAL, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
+ debugs(5, DBG_CRITICAL, "WARNING: TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
#else
- debugs(5, DBG_CRITICAL, "accept_filter not supported on your OS");
+ debugs(5, DBG_CRITICAL, "WARNING: accept_filter not supported on your OS");
#endif
}
}
-Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
- fd(aFd),
- theCallback(call),
- mayAcceptMore(accept_many)
-{
- assert(aFd >= 0);
- debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
- assert(isOpen(aFd));
- setListen();
- SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
-Comm::ListenStateData::~ListenStateData()
-{
- comm_close(fd);
- fd = -1;
-}
-
/**
* This private callback is called whenever a filedescriptor is ready
* to dupe itself and fob off an accept()ed connection
* done later when enough sockets become available.
*/
void
-Comm::ListenStateData::doAccept(int fd, void *data)
+Comm::TcpAcceptor::doAccept(int fd, void *data)
{
- debugs(5, 2, HERE << "New connection on FD " << fd);
+ try {
+ debugs(5, 2, HERE << "New connection on FD " << fd);
- assert(isOpen(fd));
- ListenStateData *afd = static_cast<ListenStateData*>(data);
+ Must(isOpen(fd));
+ TcpAcceptor *afd = static_cast<TcpAcceptor*>(data);
+
+ if (!okToAccept()) {
+ AcceptLimiter::Instance().defer(afd);
+ } else {
+ afd->acceptNext();
+ }
+ SetSelect(fd, COMM_SELECT_READ, Comm::TcpAcceptor::doAccept, afd, 0);
- if (!okToAccept()) {
- AcceptLimiter::Instance().defer(afd);
- } else {
- afd->acceptNext();
+ } catch(const std::exception &e) {
+ fatalf("FATAL: error while accepting new client connection: %s\n", e.what());
+ } catch(...) {
+ fatal("FATAL: error while accepting new client connection: [unkown]\n");
}
- SetSelect(fd, COMM_SELECT_READ, Comm::ListenStateData::doAccept, afd, 0);
}
bool
-Comm::ListenStateData::okToAccept()
+Comm::TcpAcceptor::okToAccept()
{
static time_t last_warn = 0;
}
void
-Comm::ListenStateData::acceptOne()
+Comm::TcpAcceptor::acceptOne()
{
/*
* We don't worry about running low on FDs here. Instead,
*/
/* Accept a new connection */
- ConnectionDetail connDetails;
- int newfd = oldAccept(connDetails);
+ ConnectionDetail newConnDetails;
+ int newFd = -1;
+ const comm_err_t flag = oldAccept(newConnDetails, &newFd);
/* Check for errors */
- if (newfd < 0) {
+ if (!isOpen(newFd)) {
- if (newfd == COMM_NOMESSAGE) {
+ if (flag == COMM_NOMESSAGE) {
/* register interest again */
- debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
+ debugs(5, 5, HERE << "try later: FD " << fd << " handler Subscription: " << theCallSub);
SetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
return;
}
// A non-recoverable error; notify the caller */
- debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
- notify(-1, COMM_ERROR, connDetails);
- mayAcceptMore = false;
+ debugs(5, 5, HERE << "non-recoverable error:" << status() << " handler Subscription: " << theCallSub);
+ notify(flag, newConnDetails, newFd);
+ mustStop("Listener socket closed");
return;
}
- debugs(5, 5, HERE << "accepted: FD " << fd <<
- " newfd: " << newfd << " from: " << connDetails.peer <<
- " handler: " << theCallback);
- notify(newfd, COMM_OK, connDetails);
+ debugs(5, 5, HERE << "Listener: FD " << fd <<
+ " accepted new connection from " << newConnDetails.peer <<
+ " handler Subscription: " << theCallSub);
+ notify(flag, newConnDetails, newFd);
}
void
-Comm::ListenStateData::acceptNext()
+Comm::TcpAcceptor::acceptNext()
{
- assert(isOpen(fd));
+ Must(isOpen(fd));
debugs(5, 2, HERE << "connection on FD " << fd);
acceptOne();
}
+// XXX: obsolete comment?
+// NP: can't be a const function because syncWithComm() side effects hit theCallSub->callback().
void
-Comm::ListenStateData::notify(int newfd, comm_err_t flag, const ConnectionDetail &connDetails)
+Comm::TcpAcceptor::notify(const comm_err_t flag, const ConnectionDetail &connDetails, int newFd) const
{
// listener socket handlers just abandon the port with COMM_ERR_CLOSING
// it should only happen when this object is deleted...
return;
}
- if (theCallback != NULL) {
- typedef CommAcceptCbParams Params;
- Params ¶ms = GetCommParams<Params>(theCallback);
+ if (theCallSub != NULL) {
+ AsyncCall::Pointer call = theCallSub->callback();
+ CommAcceptCbParams ¶ms = GetCommParams<CommAcceptCbParams>(call);
params.fd = fd;
- params.nfd = newfd;
+ params.nfd = newFd;
params.details = connDetails;
params.flag = flag;
params.xerrno = errcode;
- ScheduleCallHere(theCallback);
- if (!mayAcceptMore)
- theCallback = NULL;
+ ScheduleCallHere(call);
}
}
/**
* accept() and process
- * Wait for an incoming connection on FD.
+ * Wait for an incoming connection on our listener socket.
+ *
+ * \retval COMM_OK success. details parameter filled.
+ * \retval COMM_NOMESSAGE attempted accept() but nothing useful came in.
+ * \retval COMM_ERROR an outright failure occured.
+ * Or if this client has too many connections already.
*/
-int
-Comm::ListenStateData::oldAccept(ConnectionDetail &details)
+comm_err_t
+Comm::TcpAcceptor::oldAccept(ConnectionDetail &details, int *newFd)
{
PROF_start(comm_accept);
statCounter.syscalls.sock.accepts++;
PROF_stop(comm_accept);
if (ignoreErrno(errno)) {
- debugs(50, 5, HERE << "FD " << fd << ": " << xstrerror());
+ debugs(50, 5, HERE << status() << ": " << xstrerror());
return COMM_NOMESSAGE;
} else if (ENFILE == errno || EMFILE == errno) {
- debugs(50, 3, HERE << "FD " << fd << ": " << xstrerror());
+ debugs(50, 3, HERE << status() << ": " << xstrerror());
return COMM_ERROR;
} else {
- debugs(50, 1, HERE << "FD " << fd << ": " << xstrerror());
+ debugs(50, 1, HERE << status() << ": " << xstrerror());
return COMM_ERROR;
}
}
+ Must(sock >= 0);
+ *newFd = sock;
details.peer = *gai;
if ( Config.client_ip_max_connections >= 0) {
}
}
+ // lookup the local-end details of this new connection
details.me.InitAddrInfo(gai);
-
details.me.SetEmpty();
getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
details.me = *gai;
-
- commSetCloseOnExec(sock);
+ details.me.FreeAddrInfo(gai);
/* fdstat update */
+ // XXX : these are not all HTTP requests. use a note about type and ip:port details->
+ // so we end up with a uniform "(HTTP|FTP-data|HTTPS|...) remote-ip:remote-port"
fd_open(sock, FD_SOCKET, "HTTP Request");
fdd_table[sock].close_file = NULL;
fde *F = &fd_table[sock];
details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
F->remote_port = details.peer.GetPort();
- F->local_addr.SetPort(details.me.GetPort());
+ F->local_addr = details.me;
F->sock_family = details.me.IsIPv6()?AF_INET6:AF_INET;
- details.me.FreeAddrInfo(gai);
+ // set socket flags
+ commSetCloseOnExec(sock);
commSetNonBlocking(sock);
/* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
F->flags.transparent = fd_table[fd].flags.transparent;
PROF_stop(comm_accept);
- return sock;
+ return COMM_OK;
}
--- /dev/null
+#ifndef SQUID_COMM_TCPACCEPTOR_H
+#define SQUID_COMM_TCPACCEPTOR_H
+
+#include "base/AsyncCall.h"
+#include "base/Subscription.h"
+#include "CommCalls.h"
+#include "comm_err_t.h"
+#include "comm/TcpAcceptor.h"
+#include "ip/Address.h"
+
+#if HAVE_MAP
+#include <map>
+#endif
+
+namespace Comm
+{
+
+class AcceptLimiter;
+
+/**
+ * Listens on an FD for new incoming connections and
+ * emits an active FD descriptor for the new client.
+ *
+ * Handles all event limiting required to quash inbound connection
+ * floods within the global FD limits of available Squid_MaxFD and
+ * client_ip_max_connections.
+ *
+ * Fills the emitted connection with all connection details able to
+ * be looked up. Currently these are the local/remote IP:port details
+ * and the listening socket transparent-mode flag.
+ */
+class TcpAcceptor : public AsyncJob
+{
+private:
+ virtual void start();
+ virtual bool doneAll() const;
+ virtual void swanSong();
+ virtual const char *status() const;
+
+ TcpAcceptor(const TcpAcceptor &); // not implemented.
+
+public:
+ TcpAcceptor(const int listenFd, const Ip::Address &laddr, int flags,
+ const char *note, const Subscription::Pointer &aSub);
+
+ /** Subscribe a handler to receive calls back about new connections.
+ * Unsubscribes any existing subscribed handler.
+ */
+ void subscribe(const Subscription::Pointer &aSub);
+
+ /** Remove the currently waiting callback subscription.
+ * Already scheduled callbacks remain scheduled.
+ */
+ void unsubscribe(const char *reason);
+
+ /** Try and accept another connection (synchronous).
+ * If one is pending already the subscribed callback handler will be scheduled
+ * to handle it before this method returns.
+ */
+ void acceptNext();
+
+ /// Call the subscribed callback handler with details about a new connection.
+ void notify(const comm_err_t flags, const ConnectionDetail &newConnDetails, const int newFd) const;
+
+ /// errno code of the last accept() or listen() action if one occurred.
+ int errcode;
+
+ /// conn being listened on for new connections
+ /// Reserved for read-only use.
+ // NP: public only until we can hide it behind connection handles
+ int fd;
+
+protected:
+ friend class AcceptLimiter;
+ int32_t isLimited; ///< whether this socket is delayed and on the AcceptLimiter queue.
+
+private:
+ Subscription::Pointer theCallSub; ///< used to generate AsyncCalls handling our events.
+
+ /// IP Address and port being listened on
+ Ip::Address local_addr;
+
+ /// Method to test if there are enough file descriptors to open a new client connection
+ /// if not the accept() will be postponed
+ static bool okToAccept();
+
+ /// Method callback for whenever an FD is ready to accept a client connection.
+ static void doAccept(int fd, void *data);
+
+ void acceptOne();
+ comm_err_t oldAccept(ConnectionDetail &newConnDetails, int *fd);
+ void setListen();
+
+ CBDATA_CLASS2(TcpAcceptor);
+};
+
+} // namespace Comm
+
+#endif /* SQUID_COMM_TCPACCEPTOR_H */
#include "squid.h"
#include "comm.h"
+#include "CommCalls.h"
+#include "comm/TcpAcceptor.h"
#include "comm/Write.h"
-#include "comm/ListenStateData.h"
#include "compat/strtoll.h"
#include "ConnectionDetail.h"
#include "errorpage.h"
void clear(); /// just resets fd and close handler. does not close active connections.
- int fd; /// channel descriptor; \todo: remove because the closer has it
+ int fd; /// channel descriptor
- /** Current listening socket handler. delete on shutdown or abort.
- * FTP stores a copy of the FD in the field fd above.
- * Use close() to properly close the channel.
- */
- Comm::ListenStateData *listener;
+ Ip::Address local; ///< The local IP address:port this channel is using
+
+ int flags; ///< socket flags used when opening.
private:
AsyncCall::Pointer closer; /// Comm close handler callback
void completedListing(void);
void dataComplete();
void dataRead(const CommIoCbParams &io);
+
+ /// ignore timeout on CTRL channel. set read timeout on DATA channel.
+ void switchTimeoutToDataChannel();
+ /// create a data channel acceptor and start listening.
+ void listenForDataChannel(const int fd, const char *note);
+
int checkAuth(const HttpHeader * req_hdr);
void checkUrlpath();
void buildTitleUrl();
void
FtpStateData::ctrlClosed(const CommCloseCbParams &io)
{
+ debugs(9, 4, HERE);
ctrl.clear();
deleteThis("FtpStateData::ctrlClosed");
}
void
FtpStateData::dataClosed(const CommCloseCbParams &io)
{
- if (data.listener) {
- delete data.listener;
- data.listener = NULL;
- data.fd = -1;
+ debugs(9, 4, HERE);
+ if (data.fd >= 0) {
+ comm_close(data.fd);
+ // NP clear() does the: data.fd = -1;
}
data.clear();
failed(ERR_FTP_FAILURE, 0);
debugs(9, 9, HERE << ": OUT: login='" << login << "', escaped=" << escaped << ", user=" << user << ", password=" << password);
}
+void
+FtpStateData::switchTimeoutToDataChannel()
+{
+ commSetTimeout(ctrl.fd, -1, NULL, NULL);
+
+ typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
+ AsyncCall::Pointer timeoutCall = JobCallback(9, 5, TimeoutDialer, this, FtpStateData::ftpTimeout);
+ commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
+}
+
+void
+FtpStateData::listenForDataChannel(const int fd, const char *note)
+{
+ assert(data.fd < 0);
+
+ typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> AcceptDialer;
+ typedef AsyncCallT<AcceptDialer> AcceptCall;
+ RefCount<AcceptCall> call = static_cast<AcceptCall*>(JobCallback(11, 5, AcceptDialer, this, FtpStateData::ftpAcceptDataConnection));
+ Subscription::Pointer sub = new CallSubscription<AcceptCall>(call);
+
+ /* open the conn if its not already open */
+ int newFd = fd;
+ if (newFd < 0) {
+ newFd = comm_open_listener(SOCK_STREAM, IPPROTO_TCP, data.local, data.flags, note);
+ if (newFd < 0) {
+ debugs(5, DBG_CRITICAL, HERE << "comm_open_listener failed:" << data.local << " error: " << errno);
+ return;
+ }
+ debugs(9, 3, HERE << "Unconnected data socket created on FD " << newFd << ", " << data.local);
+ }
+
+ assert(newFd >= 0);
+ Comm::TcpAcceptor *tmp = new Comm::TcpAcceptor(newFd, data.local, data.flags, note, sub);
+ AsyncJob::Start(tmp);
+
+ // Ensure we have a copy of the FD opened for listening and a close handler on it.
+ data.opened(newFd, dataCloser());
+ switchTimeoutToDataChannel();
+}
+
void
FtpStateData::ftpTimeout(const CommTimeoutCbParams &io)
{
usable = end - sbuf;
- debugs(9, 3, HERE << "usable = " << usable);
+ debugs(9, 3, HERE << "usable = " << usable << " of " << len << " bytes.");
if (usable == 0) {
- debugs(9, 3, HERE << "didn't find end for " << entry->url() );
+ if (buf[0] == '\0' && len == 1) {
+ debugs(9, 3, HERE << "NIL ends data from " << entry->url() << " transfer problem?");
+ data.readBuf->consume(len);
+ } else {
+ debugs(9, 3, HERE << "didn't find end for " << entry->url());
+ debugs(9, 3, HERE << "buffer remains (" << len << " bytes) '" << rfc1738_do_escape(buf,0) << "'");
+ }
xfree(sbuf);
return;
}
* status code after the data command. FtpStateData was being
* deleted in the middle of dataRead().
*/
- scheduleReadControlReply(0);
+ /* AYJ: 2011-01-13: Bug 2581.
+ * 226 status is possibly waiting in the ctrl buffer.
+ * The connection will hang if we DONT send buffered_ok.
+ * This happens on all transfers which can be completly sent by the
+ * server before the 150 started status message is read in by Squid.
+ * ie all transfers of about one packet hang.
+ */
+ scheduleReadControlReply(1);
}
void
* establish one on the control socket.
*/
- if (data.fd > -1) {
+ if (data.fd >= 0) {
AsyncCall::Pointer nullCall = NULL;
commSetTimeout(data.fd, -1, nullCall);
}
static int
ftpOpenListenSocket(FtpStateData * ftpState, int fallback)
{
- int fd;
- Ip::Address addr;
struct addrinfo *AI = NULL;
- int on = 1;
int x = 0;
/// Close old data channels, if any. We may open a new one below.
- ftpState->data.close();
+ if ((ftpState->data.flags & COMM_REUSEADDR))
+ // NP: in fact it points to the control channel. just clear it.
+ ftpState->data.clear();
+ else
+ ftpState->data.close();
/*
* Set up a listen socket on the same local address as the
* control connection.
*/
-
- addr.InitAddrInfo(AI);
-
+ ftpState->data.local.InitAddrInfo(AI);
x = getsockname(ftpState->ctrl.fd, AI->ai_addr, &AI->ai_addrlen);
-
- addr = *AI;
-
- addr.FreeAddrInfo(AI);
+ ftpState->data.local = *AI;
+ ftpState->data.local.FreeAddrInfo(AI);
if (x) {
debugs(9, DBG_CRITICAL, HERE << "getsockname(" << ftpState->ctrl.fd << ",..): " << xstrerror());
* used for both control and data.
*/
if (fallback) {
+ int on = 1;
setsockopt(ftpState->ctrl.fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on));
+ ftpState->ctrl.flags |= COMM_REUSEADDR;
+ ftpState->data.flags |= COMM_REUSEADDR;
} else {
/* if not running in fallback mode a new port needs to be retrieved */
- addr.SetPort(0);
- }
-
- fd = comm_open(SOCK_STREAM,
- IPPROTO_TCP,
- addr,
- COMM_NONBLOCKING | (fallback ? COMM_REUSEADDR : 0),
- ftpState->entry->url());
- debugs(9, 3, HERE << "Unconnected data socket created on FD " << fd );
-
- if (fd < 0) {
- debugs(9, DBG_CRITICAL, HERE << "comm_open failed");
- return -1;
- }
-
- typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = JobCallback(11, 5,
- acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
- ftpState->data.listener = new Comm::ListenStateData(fd, acceptCall, false);
-
- if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) {
- comm_close(fd);
- return -1;
+ ftpState->data.local.SetPort(0);
+ ftpState->data.flags = COMM_NONBLOCKING;
}
- ftpState->data.opened(fd, ftpState->dataCloser());
- ftpState->data.port = comm_local_port(fd);
- ftpState->data.host = NULL;
- return fd;
+ ftpState->listenForDataChannel((fallback?ftpState->ctrl.fd:-1), ftpState->entry->url());
+ return ftpState->data.fd;
}
/// \ingroup ServerProtocolFTPInternal
debugs(9, 3, HERE);
ftpState->flags.pasv_supported = 0;
fd = ftpOpenListenSocket(ftpState, 0);
+ debugs(9, 3, "Listening for FTP data connection with FD " << fd);
Ip::Address::InitAddrInfo(AI);
*/
void FtpStateData::ftpAcceptDataConnection(const CommAcceptCbParams &io)
{
- char ntoapeer[MAX_IPSTRLEN];
- debugs(9, 3, "ftpAcceptDataConnection");
-
- // one connection accepted. the handler has stopped listening. drop our local pointer to it.
- data.listener = NULL;
+ debugs(9, 3, HERE);
if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
abortTransaction("entry aborted when accepting data conn");
return;
}
+ if (io.flag != COMM_OK) {
+ data.close();
+ debugs(9, DBG_IMPORTANT, "FTP AcceptDataConnection: FD " << io.fd << ": " << xstrerr(io.xerrno));
+ /** \todo Need to send error message on control channel*/
+ ftpFail(this);
+ return;
+ }
+
+ /* data listening conn is no longer even open. abort. */
+ if (data.fd <= 0 || fd_table[data.fd].flags.open == 0) {
+ data.clear(); // ensure that it's cleared and not just closed.
+ return;
+ }
+
/** \par
* When squid.conf ftp_sanitycheck is enabled, check the new connection is actually being
* made by the remote client which is connected to the FTP control socket.
+ * Or the one which we were told to listen for by control channel messages (may differ under NAT).
* This prevents third-party hacks, but also third-party load balancing handshakes.
*/
if (Config.Ftp.sanitycheck) {
+ char ntoapeer[MAX_IPSTRLEN];
io.details.peer.NtoA(ntoapeer,MAX_IPSTRLEN);
- if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0) {
+ if (strcmp(fd_table[ctrl.fd].ipaddr, ntoapeer) != 0 &&
+ strcmp(fd_table[data.fd].ipaddr, ntoapeer) != 0) {
debugs(9, DBG_IMPORTANT,
"FTP data connection from unexpected server (" <<
io.details.peer << "), expecting " <<
- fd_table[ctrl.fd].ipaddr);
+ fd_table[ctrl.fd].ipaddr << " or " << fd_table[data.fd].ipaddr);
- /* close the bad soures connection down ASAP. */
+ /* close the bad sources connection down ASAP. */
comm_close(io.nfd);
- /* we are ony accepting once, so need to re-open the listener socket. */
- typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = JobCallback(11, 5,
- acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
- data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
+ /* drop the bad connection (io) by ignoring the attempt. */
return;
}
}
- if (io.flag != COMM_OK) {
- debugs(9, DBG_IMPORTANT, "ftpHandleDataAccept: FD " << io.nfd << ": " << xstrerr(io.xerrno));
- /** \todo XXX Need to set error message */
- ftpFail(this);
- return;
- }
-
/**\par
- * Replace the Listen socket with the accepted data socket */
+ * Replace the Listening socket with the accepted data socket */
data.close();
data.opened(io.nfd, dataCloser());
data.port = io.details.peer.GetPort();
- io.details.peer.NtoA(data.host,SQUIDHOSTNAMELEN);
+ data.host = xstrdup(fd_table[io.nfd].ipaddr);
debugs(9, 3, "ftpAcceptDataConnection: Connected data socket on " <<
"FD " << io.nfd << " to " << io.details.peer << " FD table says: " <<
"ctrl-peer= " << fd_table[ctrl.fd].ipaddr << ", " <<
"data-peer= " << fd_table[data.fd].ipaddr);
+ assert(haveControlChannel("ftpAcceptDataConnection"));
+ assert(ctrl.message == NULL);
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ctrl.fd, -1, nullCall);
-
- typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
- TimeoutDialer, this, FtpStateData::ftpTimeout);
- commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
-
- /*\todo XXX We should have a flag to track connect state...
- * host NULL -> not connected, port == local port
- * host set -> connected, port == remote port
- */
- /* Restart state (SENT_NLST/LIST/RETR) */
- FTP_SM_FUNCS[state] (this);
+ // Ctrl channel operations will determine what happens to this data connection
}
/// \ingroup ServerProtocolFTPInternal
return;
}
- /*\par
- * When client status is 125, or 150 without a hostname, Begin data transfer. */
+ /* When client status is 125, or 150 without a hostname, Begin data transfer. */
debugs(9, 3, HERE << "starting data transfer");
+ switchTimeoutToDataChannel();
sendMoreRequestBody();
- /** \par
- * Cancel the timeout on the Control socket and
- * establish one on the data socket.
- */
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ctrl.fd, -1, nullCall);
-
- typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
- TimeoutDialer, this, FtpStateData::ftpTimeout);
-
- commSetTimeout(data.fd, Config.Timeout.read, timeoutCall);
-
state = WRITING_DATA;
debugs(9, 3, HERE << "writing data channel");
} else if (code == 150) {
/*\par
- * When client code is 150 with a hostname, Accept data channel. */
+ * When client code is 150 without a hostname, Accept data channel. */
debugs(9, 3, "ftpReadStor: accepting data channel");
- typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = JobCallback(11, 5,
- acceptDialer, this, FtpStateData::ftpAcceptDataConnection);
-
- data.listener = new Comm::ListenStateData(data.fd, acceptCall, false);
+ listenForDataChannel(data.fd, data.host);
} else {
debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
ftpFail(this);
if (code == 125 || (code == 150 && ftpState->data.host)) {
/* Begin data transfer */
- /* XXX what about Config.Timeout.read? */
+ debugs(9, 3, HERE << "begin data transfer from " << ftpState->data.host << " (" << ftpState->data.local << ")");
+ ftpState->switchTimeoutToDataChannel();
ftpState->maybeReadVirginBody();
ftpState->state = READING_DATA;
- /*
- * Cancel the timeout on the Control socket and establish one
- * on the data socket
- */
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
return;
} else if (code == 150) {
/* Accept data channel */
- typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = JobCallback(11, 5,
- acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
-
- ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
- /*
- * Cancel the timeout on the Control socket and establish one
- * on the data socket
- */
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
-
- typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
- TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
- commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
+ debugs(9, 3, HERE << "accept data channel from " << ftpState->data.host << " (" << ftpState->data.local << ")");
+ ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host);
return;
} else if (!ftpState->flags.tried_nlst && code > 300) {
ftpSendNlst(ftpState);
if (code == 125 || (code == 150 && ftpState->data.host)) {
/* Begin data transfer */
debugs(9, 3, HERE << "reading data channel");
- /* XXX what about Config.Timeout.read? */
+ ftpState->switchTimeoutToDataChannel();
ftpState->maybeReadVirginBody();
ftpState->state = READING_DATA;
- /*
- * Cancel the timeout on the Control socket and establish one
- * on the data socket
- */
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
} else if (code == 150) {
/* Accept data channel */
- typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
- AsyncCall::Pointer acceptCall = JobCallback(11, 5,
- acceptDialer, ftpState, FtpStateData::ftpAcceptDataConnection);
- ftpState->data.listener = new Comm::ListenStateData(ftpState->data.fd, acceptCall, false);
- /*
- * Cancel the timeout on the Control socket and establish one
- * on the data socket
- */
- AsyncCall::Pointer nullCall = NULL;
- commSetTimeout(ftpState->ctrl.fd, -1, nullCall);
-
- typedef CommCbMemFunT<FtpStateData, CommTimeoutCbParams> TimeoutDialer;
- AsyncCall::Pointer timeoutCall = JobCallback(9, 5,
- TimeoutDialer, ftpState,FtpStateData::ftpTimeout);
- commSetTimeout(ftpState->data.fd, Config.Timeout.read, timeoutCall);
+ ftpState->listenForDataChannel(ftpState->data.fd, ftpState->data.host);
} else if (code >= 300) {
if (!ftpState->flags.try_slash_hack) {
/* Try this as a directory missing trailing slash... */
fd = aFd;
closer = aCloser;
comm_add_close_handler(fd, closer);
+
+ // grab the local IP address:port details for this connection
+ struct addrinfo *AI = NULL;
+ local.InitAddrInfo(AI);
+ getsockname(aFd, AI->ai_addr, &AI->ai_addrlen);
+ local = *AI;
+ local.FreeAddrInfo(AI);
}
/// planned close: removes the close handler and calls comm_close
FtpChannel::close()
{
// channels with active listeners will be closed when the listener handler dies.
- if (listener) {
- delete listener;
- listener = NULL;
- comm_remove_close_handler(fd, closer);
- closer = NULL;
- fd = -1;
- } else if (fd >= 0) {
- comm_remove_close_handler(fd, closer);
- closer = NULL;
+ if (fd >= 0) {
+ if (closer != NULL) {
+ comm_remove_close_handler(fd, closer);
+ closer = NULL;
+ }
comm_close(fd); // we do not expect to be called back
fd = -1;
}