*/
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
reader = JobCallback(93, 3, Dialer, this, Adaptation::Icap::Xaction::noteCommRead);
- comm_read(connection->fd, commBuf, readBuf.spaceSize(), reader);
+ comm_read(connection, commBuf, readBuf.spaceSize(), reader);
updateTimeout();
}
typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest);
- comm_read(clientConn->fd, in.addressToReadInto(), getAvailableBufferLength(), reader);
+ comm_read(clientConn, in.addressToReadInto(), getAvailableBufferLength(), reader);
}
* completes, on error, or on file descriptor close.
*/
void
-comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data)
+comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, IOCB *handler, void *handler_data)
{
AsyncCall::Pointer call = commCbCall(5,4, "SomeCommReadHandler",
CommIoCbPtrFun(handler, handler_data));
- comm_read(fd, buf, size, call);
+ comm_read(conn, buf, size, call);
}
void
-comm_read(int fd, char *buf, int size, AsyncCall::Pointer &callback)
+comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback)
{
- debugs(5, 5, "comm_read, queueing read for FD " << fd << "; asynCall " << callback);
+ debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback);
/* Make sure we are open and not closing */
- assert(isOpen(fd));
- assert(!fd_table[fd].closing());
- comm_io_callback_t *ccb = COMMIO_FD_READCB(fd);
+ assert(Comm::IsConnOpen(conn));
+ assert(!fd_table[conn->fd].closing());
+ comm_io_callback_t *ccb = COMMIO_FD_READCB(conn->fd);
// Make sure we are either not reading or just passively monitoring.
// Active/passive conflicts are OK and simply cancel passive monitoring.
if (ccb->active()) {
// if the assertion below fails, we have an active comm_read conflict
- assert(fd_table[fd].halfClosedReader != NULL);
- commStopHalfClosedMonitor(fd);
+ assert(fd_table[conn->fd].halfClosedReader != NULL);
+ commStopHalfClosedMonitor(conn->fd);
assert(!ccb->active());
}
/* Queue the read */
- commio_set_callback(fd, IOCB_READ, ccb, callback, (char *)buf, NULL, size);
- commSetSelect(fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
+ commio_set_callback(conn->fd, IOCB_READ, ccb, callback, (char *)buf, NULL, size);
+ commSetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
}
/**
typedef DescriptorSet::const_iterator DSCI;
const DSCI end = TheHalfClosed->end();
for (DSCI i = TheHalfClosed->begin(); i != end; ++i) {
- const int fd = *i;
- if (!fd_table[fd].halfClosedReader) { // not reading already
+ Comm::ConnectionPointer c = new Comm::Connection; // XXX: temporary. make HalfClosed a list of these.
+ c->fd = *i;
+ if (!fd_table[c->fd].halfClosedReader) { // not reading already
AsyncCall::Pointer call = commCbCall(5,4, "commHalfClosedReader",
CommIoCbPtrFun(&commHalfClosedReader, NULL));
- comm_read(fd, NULL, 0, call);
- fd_table[fd].halfClosedReader = call;
- }
+ comm_read(c, NULL, 0, call);
+ fd_table[c->fd].halfClosedReader = call;
+ } else
+ c->fd = -1; // XXX: temporary. prevent c replacement erase closing listed FD
}
WillCheckHalfClosed = false; // as far as we know
* Set or clear the timeout for some action on an active connection.
* API to replace commSetTimeout() when a Comm::ConnectionPointer is available.
*/
-extern int commSetConnTimeout(const Comm::ConnectionPointer &conn, int seconds, AsyncCall::Pointer &calback);
+extern int commSetConnTimeout(const Comm::ConnectionPointer &conn, int seconds, AsyncCall::Pointer &callback);
SQUIDCEXTERN int ignoreErrno(int);
SQUIDCEXTERN void commCloseAllSockets(void);
extern int comm_has_pending_read_callback(int fd);
extern bool comm_monitors_read(int fd);
-extern void comm_read(int fd, char *buf, int len, IOCB *handler, void *data);
-extern void comm_read(int fd, char *buf, int len, AsyncCall::Pointer &callback);
+extern void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, IOCB *handler, void *data);
+extern void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback);
extern void comm_read_cancel(int fd, IOCB *callback, void *data);
extern void comm_read_cancel(int fd, AsyncCall::Pointer &callback);
extern int comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from);
vc->conn = conn;
comm_add_close_handler(conn->fd, idnsVCClosed, vc);
- comm_read(conn->fd, (char *)&vc->msglen, 2 , idnsReadVCHeader, vc);
+ comm_read(conn, (char *)&vc->msglen, 2 , idnsReadVCHeader, vc);
vc->busy = 0;
idnsDoSendQueryVC(vc);
}
vc->msg->size += len; // XXX should not access -> size directly
if (vc->msg->contentSize() < vc->msglen) {
- comm_read(conn->fd, buf + len, vc->msglen - vc->msg->contentSize(), idnsReadVC, vc);
+ comm_read(conn, buf + len, vc->msglen - vc->msg->contentSize(), idnsReadVC, vc);
return;
}
idnsGrokReply(vc->msg->buf, vc->msg->contentSize());
vc->msg->clean();
- comm_read(conn->fd, (char *)&vc->msglen, 2 , idnsReadVCHeader, vc);
+ comm_read(conn, (char *)&vc->msglen, 2 , idnsReadVCHeader, vc);
}
static void
assert(vc->read_msglen <= 2);
if (vc->read_msglen < 2) {
- comm_read(conn->fd, buf + len, 2 - vc->read_msglen, idnsReadVCHeader, vc);
+ comm_read(conn, buf + len, 2 - vc->read_msglen, idnsReadVCHeader, vc);
return;
}
vc->msglen = ntohs(vc->msglen);
vc->msg->init(vc->msglen, vc->msglen);
- comm_read(conn->fd, vc->msg->buf, vc->msglen, idnsReadVC, vc);
+ comm_read(conn, vc->msg->buf, vc->msglen, idnsReadVC, vc);
}
/*
typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
AsyncCall::Pointer reader = JobCallback(9, 5, Dialer, this, FtpStateData::ftpReadControlReply);
- comm_read(ctrl.conn->fd, ctrl.buf + ctrl.offset, ctrl.size - ctrl.offset, reader);
+ comm_read(ctrl.conn, ctrl.buf + ctrl.offset, ctrl.size - ctrl.offset, reader);
}
}
}
if (do_next_read)
- comm_read(conn->fd, buf, read_sz, gopherReadReply, gopherState);
+ comm_read(conn, buf, read_sz, gopherReadReply, gopherState);
return;
}
comm_add_close_handler(rfd, helperServerFree, srv);
- comm_read(srv->readPipe->fd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
+ comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
}
hlp->last_restart = squid_curtime;
comm_add_close_handler(rfd, helperStatefulServerFree, srv);
- comm_read(srv->readPipe->fd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
+ comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
}
hlp->last_restart = squid_curtime;
}
if (Comm::IsConnOpen(srv->readPipe))
- comm_read(srv->readPipe->fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
+ comm_read(srv->readPipe, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
}
static void
}
if (Comm::IsConnOpen(srv->readPipe))
- comm_read(srv->readPipe->fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
+ comm_read(srv->readPipe, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
helperStatefulHandleRead, srv);
}
conn->remote.GetPort(),
conn->local.GetPort());
comm_write_mbuf(conn, &mb, NULL, state);
- comm_read(conn->fd, state->buf, BUFSIZ, Ident::ReadReply, state);
+ comm_read(conn, state->buf, BUFSIZ, Ident::ReadReply, state);
commSetTimeout(conn->fd, Ident::TheConfig.timeout, Ident::Timeout, state);
}
typedef CommCbMemFunT<Port, CommIoCbParams> Dialer;
AsyncCall::Pointer readHandler = JobCallback(54, 6,
Dialer, this, Port::noteRead);
- comm_read(conn()->fd, buf.raw(), buf.size(), readHandler);
+ comm_read(conn(), buf.raw(), buf.size(), readHandler);
}
bool Ipc::Port::doneAll() const
}
fds[nfds++] = fd;
- comm_read(fd, fakeReadBuf, sizeof(fakeReadBuf), IdleConnList::read, this);
- commSetTimeout(fd, Config.Timeout.pconn, IdleConnList::timeout, this);
+ Comm::ConnectionPointer temp = new Comm::Connection; // XXX: transition. until pconn's converted to store Comm::Connection
+ temp->fd = fd; // assume control of the fd.
+ comm_read(temp, fakeReadBuf, sizeof(fakeReadBuf), IdleConnList::read, this);
+ commSetTimeout(temp->fd, Config.Timeout.pconn, IdleConnList::timeout, this);
}
/*
*/
#include "squid.h"
+#include "CacheManager.h"
+#include "comm/Connection.h"
#include "event.h"
+#if DELAY_POOLS
+#include "DelayPools.h"
+#endif
#include "fde.h"
-#include "Store.h"
-#include "CacheManager.h"
-#include "StoreClient.h"
-#include "stmem.h"
#include "HttpReply.h"
#include "HttpRequest.h"
-#include "MemObject.h"
#include "mem_node.h"
-#include "StoreMeta.h"
-#include "SwapDir.h"
-#if DELAY_POOLS
-#include "DelayPools.h"
-#endif
-#include "Stack.h"
+#include "MemObject.h"
#include "SquidTime.h"
+#include "Stack.h"
+#include "stmem.h"
+#include "Store.h"
+#include "StoreClient.h"
+#include "StoreMeta.h"
#include "swap_log_op.h"
+#include "SwapDir.h"
static STMCB storeWriteComplete;
}
- comm_read(fd, buf, amountToRead, callback);
+ Comm::ConnectionPointer temp = new Comm::Connection; // XXX: transition. until conn passed in.
+ temp->fd = fd;
+ comm_read(temp, buf, amountToRead, callback);
}
size_t
-StoreEntry::bytesWanted (Range<size_t> const aRange) const
+StoreEntry::bytesWanted(Range<size_t> const aRange) const
{
assert (aRange.size());
}
void
-comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data)
+comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, IOCB *handler, void *handler_data)
{
fatal ("Not implemented");
}
void
-comm_read(int, char*, int, AsyncCall::Pointer &callback)
+comm_read(const Comm::ConnectionPointer &conn, char*, int, AsyncCall::Pointer &callback)
{
fatal ("Not implemented");
}
TunnelStateData::copyRead(Connection &from, IOCB *completion)
{
assert(from.len == 0);
- comm_read(from.conn->fd, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), completion, this);
+ comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), completion, this);
}
static void
snprintf(buf, l, SQUIDSTRINGPH"\r\n", SQUIDSTRINGPRINT(str_print));
comm_write(fwd->serverConnection(), buf, strlen(buf), whoisWriteComplete, p, NULL);
- comm_read(fwd->serverConnection()->fd, p->buf, BUFSIZ, whoisReadReply, p);
+ comm_read(fwd->serverConnection(), p->buf, BUFSIZ, whoisReadReply, p);
commSetTimeout(fwd->serverConnection()->fd, Config.Timeout.read, whoisTimeout, p);
}
}
if (do_next_read)
- comm_read(conn->fd, aBuffer, BUFSIZ, whoisReadReply, this);
+ comm_read(conn, aBuffer, BUFSIZ, whoisReadReply, this);
}
static void