CommAcceptCbPtrFun(IOACB *aHandler, const CommAcceptCbParams &aParams);
void dial();
- // yuck. But we can't store the PtrFun dialers without it.
-// virtual bool canDial(AsyncCall &c) { return true; }
-
virtual void print(std::ostream &os) const;
public:
#include "protos.h"
#include "SquidTime.h"
-// TODO remove.
-Comm::ListenStateData::ListenStateData(int aFd, AsyncCall::Pointer &call, bool accept_many) :
- fd(aFd),
- errcode(0),
- isLimited(0),
- theCallback(call),
- mayAcceptMore(accept_many)
-{
- assert(aFd >= 0);
- debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
- assert(isOpen(aFd));
- setListen();
- commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
-// TODO remove.
-Comm::ListenStateData::ListenStateData(Comm::ConnectionPointer &conn, AsyncCall::Pointer &call, bool accept_many, const char *note) :
- errcode(0),
- isLimited(0),
- theCallback(call),
- mayAcceptMore(accept_many)
-{
- /* open the conn if its not already open */
- if (!IsConnOpen(conn)) {
- conn->fd = comm_open(SOCK_STREAM,
- IPPROTO_TCP,
- conn->local,
- conn->flags,
- note);
- debugs(9, 3, HERE << "Unconnected data socket created on FD " << conn->fd );
-
- if (!conn->isOpen()) {
- debugs(5, DBG_CRITICAL, HERE << "comm_open failed");
- errcode = -1;
- return;
- }
- }
-
- assert(IsConnOpen(conn));
- fd = conn->fd;
- debugs(5, 5, HERE << "FD " << conn->fd << " AsyncCall: " << call);
- setListen();
- if (errcode == 0)
- commSetSelect(conn->fd, COMM_SELECT_READ, doAccept, this, 0);
-}
-
Comm::ListenStateData::ListenStateData(int aFd, bool accept_many) :
fd(aFd),
errcode(0),
callLevel(NULL),
callName(NULL),
callDialer(NULL),
+ theCallback(NULL),
mayAcceptMore(accept_many)
{
assert(aFd >= 0);
callLevel(NULL),
callName(NULL),
callDialer(NULL),
+ theCallback(NULL),
mayAcceptMore(accept_many)
{
/* open the conn if its not already open */
Comm::ListenStateData::~ListenStateData()
{
+ unsubscribe();
comm_close(fd);
fd = -1;
- delete callDialer;;
}
void
debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << name);
// if this is the first subscription. start listening on the socket.
- if (callDialer == NULL)
+ if (callDialer == NULL && theCallback == NULL)
setListen();
+ // remove old subscription. if any.
+ unsubscribe();
+
// store the subscribed handler details.
callSection = section;
callLevel = level;
commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
}
+void
+Comm::ListenStateData::subscribe(const AsyncCall::Pointer &call)
+{
+ debugs(5, 5, HERE << "FD " << fd << " AsyncCall: " << call);
+
+ // remove old subscription. if any.
+ unsubscribe();
+
+ // store new callback subscription
+ theCallback = call;
+
+ // start listening on the socket.
+ if (theCallback != NULL) {
+ setListen();
+ // if no error so far start accepting connections.
+ if (errcode == 0)
+ commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
+ }
+}
+
void
Comm::ListenStateData::unsubscribe()
{
safe_free(callName);
delete callDialer;
callDialer = NULL;
+ theCallback = NULL;
}
/**
params.flag = flag;
params.xerrno = errcode;
ScheduleCallHere(theCallback);
- if (!mayAcceptMore)
- theCallback = NULL;
+ // only permit the call to be scheduled once.
+ mayAcceptMore = false;
+ theCallback = NULL;
}
}
{
public:
-// old remove ASAP when subscribe is working.
- ListenStateData(int fd, AsyncCall::Pointer &call, bool accept_many); // Legacy
- ListenStateData(Comm::ConnectionPointer &conn, AsyncCall::Pointer &call, bool accept_many, const char *note);
-
ListenStateData(int fd, bool accept_many); // Legacy verion that uses new subscribe API.
ListenStateData(Comm::ConnectionPointer &conn, bool accept_many, const char *note);
ListenStateData(const ListenStateData &r); // not implemented.
~ListenStateData();
- // legacy. removing ASAP when below version works.
- void subscribe(AsyncCall::Pointer &call) { theCallback = call; };
-
/** Subscribe a handler to receive calls back about new connections.
* Replaces any existing subscribed handler.
*/
void subscribe(int level, int section, const char *name, CommAcceptCbPtrFun *dialer);
-// void subscribe(int level, int section, const char *name, CommAcceptMemFun *dialer);
+
+ /** Subscribe a handler to receive calls back about new connections.
+ * Replaces any existing subscribed handler.
+ * Due to not being able to re-use calls, only permits one to be received.
+ */
+ void subscribe(const AsyncCall::Pointer &call);
/** Remove the currently waiting callback subscription.
* Pending calls will remain scheduled.
*/
void unsubscribe();
- /** Try and accept another connection.
- * If any are pending it will be passed asynchronously to the subscribed callback.
+ /** Try and accept another connection (synchronous).
+ * If one is pending already the subscribed callback handler will be scheduled
+ * to handle it before this method returns.
*/
void acceptNext();
FtpStateData::dataClosed(const CommCloseCbParams &io)
{
if (data.listener) {
+ data.listener->unsubscribe();
delete data.listener;
data.listener = NULL;
+ data.listen_conn = NULL;
data.conn = NULL;
}
data.clear();
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
- ftpState->data.listener = new Comm::ListenStateData(conn, acceptCall, false, ftpState->entry->url());
+ ftpState->data.listener = new Comm::ListenStateData(conn, false, ftpState->entry->url());
+ ftpState->data.listener->subscribe(acceptCall);
if (!ftpState->data.listener || ftpState->data.listener->errcode != 0) {
conn->close();
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
+ data.listener = new Comm::ListenStateData(data.listen_conn, false, data.host);
data.listener->subscribe(acceptCall);
- data.listener->acceptNext();
return;
}
}
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(this, &FtpStateData::ftpAcceptDataConnection));
- data.listener = new Comm::ListenStateData(data.conn, acceptCall, false, data.host);
+ data.listener = new Comm::ListenStateData(data.conn, false, data.host);
+ data.listener->subscribe(acceptCall);
} else {
debugs(9, DBG_IMPORTANT, HERE << "Unexpected reply code "<< std::setfill('0') << std::setw(3) << code);
ftpFail(this);
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
- ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, acceptCall, false, ftpState->data.host);
+ ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, false, ftpState->data.host);
+ ftpState->data.listener->subscribe(acceptCall);
return;
} else if (!ftpState->flags.tried_nlst && code > 300) {
ftpSendNlst(ftpState);
typedef CommCbMemFunT<FtpStateData, CommAcceptCbParams> acceptDialer;
AsyncCall::Pointer acceptCall = asyncCall(11, 5, "FtpStateData::ftpAcceptDataConnection",
acceptDialer(ftpState, &FtpStateData::ftpAcceptDataConnection));
- ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, acceptCall, false, ftpState->data.host);
+ ftpState->data.listener = new Comm::ListenStateData(ftpState->data.conn, false, ftpState->data.host);
+ ftpState->data.listener->subscribe(acceptCall);
} else if (code >= 300) {
if (!ftpState->flags.try_slash_hack) {
/* Try this as a directory missing trailing slash... */
{
// channels with active listeners will be closed when the listener handler dies.
if (listener) {
+ listener->unsubscribe();
delete listener;
listener = NULL;
comm_remove_close_handler(conn->fd, closer);