* creates namespace Comm.
* The comm_write() functions are moved into that scope as Comm::Write()
and only accept AsyncCall now. Old wrapper functions are removed.
* commio_* functions are all moved to methods of a new Comm::IoCallback
object. Which represents either a read or a write callback event
waiting to happen. Old wrapper functions have been removed.
* The fdc_table of pending read and write callbacks has been moved into
the Comm scope with (the name iocb_table) and should be considered private.
For now the COMMIO_*_CB() macros are retained to produce a pointer to
a callback object in this table.
* libcomm-listener.la has been renamed to libcomm.la
libsquid_la_SOURCES = \
comm.cc \
comm.h \
+ comm_err_t.h \
CommCalls.cc \
CommCalls.h \
DescriptorSet.cc \
squid_LDADD = \
$(COMMON_LIBS) \
- comm/libcomm-listener.la \
+ comm/libcomm.la \
eui/libeui.la \
icmp/libicmp.la icmp/libicmp-core.la \
log/liblog.la \
wordlist.cc
nodist_tests_testCacheManager_SOURCES = \
$(BUILT_SOURCES)
-# comm.cc only requires comm/libcomm-listener.la until fdc_table is dead.
+# comm.cc only requires comm/libcomm.la until fdc_table is dead.
tests_testCacheManager_LDADD = \
$(COMMON_LIBS) \
- comm/libcomm-listener.la \
+ comm/libcomm.la \
icmp/libicmp.la icmp/libicmp-core.la \
log/liblog.la \
$(REPL_OBJS) \
tests_testEvent_LDADD = \
$(COMMON_LIBS) \
icmp/libicmp.la icmp/libicmp-core.la \
- comm/libcomm-listener.la \
+ comm/libcomm.la \
log/liblog.la \
$(REPL_OBJS) \
${ADAPTATION_LIBS} \
tests_testEventLoop_LDADD = \
$(COMMON_LIBS) \
icmp/libicmp.la icmp/libicmp-core.la \
- comm/libcomm-listener.la \
+ comm/libcomm.la \
log/liblog.la \
$(REPL_OBJS) \
${ADAPTATION_LIBS} \
tests_test_http_range_LDADD = \
$(COMMON_LIBS) \
icmp/libicmp.la icmp/libicmp-core.la \
- comm/libcomm-listener.la \
+ comm/libcomm.la \
log/liblog.la \
$(REPL_OBJS) \
${ADAPTATION_LIBS} \
tests_testHttpRequest_LDADD = \
$(COMMON_LIBS) \
icmp/libicmp.la icmp/libicmp-core.la \
- comm/libcomm-listener.la \
+ comm/libcomm.la \
log/liblog.la \
$(REPL_OBJS) \
${ADAPTATION_LIBS} \
tests_testURL_LDADD = \
$(COMMON_LIBS) \
icmp/libicmp.la icmp/libicmp-core.la \
- comm/libcomm-listener.la \
+ comm/libcomm.la \
log/liblog.la \
$(REGEXLIB) \
$(REPL_OBJS) \
* Rationale:
* ----------
*
- * Here is how one would comm_write an object without MemBuffer:
+ * Here is how one would Comm::Write an object without MemBuffer:
*
* {
* -- allocate:
* ...
*
* -- write
- * comm_write(buf, free, ...);
+ * Comm::Write(buf, free, ...);
* }
*
* The whole "packing" idea is quite messy: We are given a buffer of fixed
* ...
*
* -- write
- * comm_write_mbuf(fd, buf, handler, data);
+ * Comm::Write(fd, buf, callback);
*
* -- *iff* you did not give the buffer away, free it yourself
* -- buf.clean();
* Comm.c lacks commAppend[Printf] because comm does not handle its own
* buffers (no mem_obj equivalent for comm.c).
*
- * Thus, if one wants to be able to store _and_ comm_write an object, s/he
+ * Thus, if one wants to be able to store _and_ Comm::Write an object, s/he
* has to implement two almost identical functions.
*
* Packer
* Packer has its own append and printf routines that "know" where to send
* incoming data. In case of store interface, Packer sends data to
* storeAppend. Otherwise, Packer uses a MemBuf that can be flushed later to
- * comm_write.
+ * Comm::Write.
*
* Thus, one can write just one function that will either "pack" things for
- * comm_write or "append" things to store, depending on actual packer
+ * Comm::Write or "append" things to store, depending on actual packer
* supplied.
*
* It is amazing how much work a tiny object can save. :)
#include "squid.h"
#include "base/TextException.h"
+#include "comm/Write.h"
#include "Server.h"
#include "Store.h"
#include "fde.h" /* for fd_table[fd].closing */
typedef CommCbMemFunT<ServerStateData, CommIoCbParams> Dialer;
requestSender = JobCallback(93,3,
Dialer, this, ServerStateData::sentRequestBody);
- comm_write_mbuf(fd, &buf, requestSender);
+ Comm::Write(fd, &buf, requestSender);
} else {
debugs(9,3, HERE << "will wait for more request body bytes or eof");
requestSender = NULL;
protected:
BodyPipe::Pointer requestBodySource; /**< to consume request body */
- AsyncCall::Pointer requestSender; /**< set if we are expecting comm_write to call us back */
+ AsyncCall::Pointer requestSender; /**< set if we are expecting Comm::Write to call us back */
#if USE_ADAPTATION
BodyPipe::Pointer virginBodyDestination; /**< to provide virgin response body */
#include "squid.h"
#include "comm.h"
+#include "comm/Write.h"
#include "CommCalls.h"
#include "HttpMsg.h"
#include "adaptation/icap/Xaction.h"
writer = JobCallback(93,3,
Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
- comm_write_mbuf(connection, &buf, writer);
+ Comm::Write(connection, &buf, writer);
updateTimeout();
}
#include "ClientRequestContext.h"
#include "clientStream.h"
#include "comm.h"
+#include "comm/Write.h"
#include "comm/ListenStateData.h"
#include "base/TextException.h"
#include "ConnectionDetail.h"
AsyncCall::Pointer call = commCbCall(33, 5, "ClientSocketContext::wroteControlMsg",
CommIoCbPtrFun(&WroteControlMsg, this));
- comm_write_mbuf(fd(), mb, call);
+ Comm::Write(fd(), mb, call);
delete mb;
}
noteSentBodyBytes (length);
AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete",
CommIoCbPtrFun(clientWriteBodyComplete, this));
- comm_write(fd(), bodyData.data, length, call );
+ Comm::Write(fd(), bodyData.data, length, call, NULL);
return;
}
/* write */
AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
CommIoCbPtrFun(clientWriteComplete, this));
- comm_write_mbuf(fd(), &mb, call);
+ Comm::Write(fd(), &mb, call);
} else
writeComplete(fd(), NULL, 0, COMM_OK);
}
debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete");
AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
CommIoCbPtrFun(clientWriteComplete, this));
- comm_write_mbuf(fd(), mb, call);
+ Comm::Write(fd(), mb, call);
delete mb;
}
#include "client_side_reply.h"
#include "client_side_request.h"
#include "ClientRequestContext.h"
+#include "comm/Write.h"
#include "compat/inet_pton.h"
#include "fde.h"
#include "HttpReply.h"
// TODO: Unify with tunnel.cc and add a Server(?) header
static const char *const conn_established =
"HTTP/1.1 200 Connection established\r\n\r\n";
- comm_write(fd, conn_established, strlen(conn_established),
- &SslBumpEstablish, this, NULL);
+ AsyncCall::Pointer call = commCbCall(85, 5, "ClientSocketContext::sslBumpEstablish",
+ CommIoCbPtrFun(&SslBumpEstablish, this));
+ Comm::Write(fd, conn_established, strlen(conn_established), call, NULL);
}
#endif
#include "fde.h"
#include "comm/AcceptLimiter.h"
#include "comm/comm_internal.h"
+#include "comm/IoCallback.h"
+#include "comm/Write.h"
#include "comm/ListenStateData.h"
#include "CommIO.h"
#include "CommRead.h"
* New C-like simple comm code. This stuff is a mess and doesn't really buy us anything.
*/
-typedef enum {
- IOCB_NONE,
- IOCB_READ,
- IOCB_WRITE
-} iocb_type;
-
static void commStopHalfClosedMonitor(int fd);
static IOCB commHalfClosedReader;
static void comm_init_opened(int new_socket, Ip::Address &addr, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI);
static void commHandleWriteHelper(void * data);
#endif
-static void commSelectOrQueueWrite(const int fd);
-
-struct comm_io_callback_t {
- iocb_type type;
- int fd;
- AsyncCall::Pointer callback;
- char *buf;
- FREE *freefunc;
- int size;
- int offset;
- comm_err_t errcode;
- int xerrno;
-#if DELAY_POOLS
- unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue
-#endif
-
-
- bool active() const { return callback != NULL; }
-};
-
-struct _comm_fd {
- int fd;
- comm_io_callback_t readcb;
- comm_io_callback_t writecb;
-};
-typedef struct _comm_fd comm_fd_t;
-comm_fd_t *commfd_table;
-
-// TODO: make this a comm_io_callback_t method?
-bool
-commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
-{
- assert(ccb->fd == fd);
- assert(ccb->type == type);
- return ccb->active();
-}
-
-/*
- * Configure comm_io_callback_t for I/O
- *
- * @param fd filedescriptor
- * @param ccb comm io callback
- * @param cb callback
- * @param cbdata callback data (must be cbdata'ed)
- * @param buf buffer, if applicable
- * @param freefunc freefunc, if applicable
- * @param size buffer size
- */
-static void
-commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb,
- AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size)
-{
- assert(!ccb->active());
- assert(ccb->type == type);
- assert(cb != NULL);
- ccb->fd = fd;
- ccb->callback = cb;
- ccb->buf = buf;
- ccb->freefunc = freefunc;
- ccb->size = size;
- ccb->offset = 0;
-}
-
-
-// Schedule the callback call and clear the callback
-static void
-commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
-{
- debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" <<
- code << ", " << xerrno << ")");
- assert(ccb->active());
- assert(ccb->fd == fd);
- ccb->errcode = code;
- ccb->xerrno = xerrno;
-
-#if DELAY_POOLS
- ccb->quotaQueueReserv = 0;
-#endif
-
- comm_io_callback_t cb = *ccb;
-
- /* We've got a copy; blow away the real one */
- /* XXX duplicate code from commio_cancel_callback! */
- ccb->xerrno = 0;
- ccb->callback = NULL; // cb has it
-
- /* free data */
- if (cb.freefunc) {
- cb.freefunc(cb.buf);
- cb.buf = NULL;
- }
-
- if (cb.callback != NULL) {
- typedef CommIoCbParams Params;
- Params ¶ms = GetCommParams<Params>(cb.callback);
- params.fd = cb.fd;
- params.buf = cb.buf;
- params.size = cb.offset;
- params.flag = cb.errcode;
- params.xerrno = cb.xerrno;
- ScheduleCallHere(cb.callback);
- }
-}
-
-
-/*
- * Cancel the given callback
- *
- * Remember that the data is cbdataRef'ed.
- */
-// TODO: make this a comm_io_callback_t method
-static void
-commio_cancel_callback(int fd, comm_io_callback_t *ccb)
-{
- debugs(5, 3, "commio_cancel_callback: called for FD " << fd);
- assert(ccb->fd == fd);
- assert(ccb->active());
-
- ccb->xerrno = 0;
- ccb->callback = NULL;
-
-#if DELAY_POOLS
- ccb->quotaQueueReserv = 0;
-#endif
-}
-
-/*
- * Call the given comm callback; assumes the callback is valid.
- *
- * @param ccb io completion callback
- */
-void
-commio_call_callback(comm_io_callback_t *ccb)
-{
-}
-
class ConnectStateData
{
#endif
static void commSetTcpRcvbuf(int, int);
static PF commConnectFree;
-static PF commHandleWrite;
static IPH commConnectDnsHandle;
typedef enum {
void
commHandleRead(int fd, void *data)
{
- comm_io_callback_t *ccb = (comm_io_callback_t *) data;
+ Comm::IoCallback *ccb = (Comm::IoCallback *) data;
assert(data == COMMIO_FD_READCB(fd));
- assert(commio_has_callback(fd, IOCB_READ, ccb));
+ assert(ccb->active());
/* Attempt a read */
statCounter.syscalls.sock.reads++;
errno = 0;
if (retval < 0 && !ignoreErrno(errno)) {
debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
ccb->offset = 0;
- commio_finish_callback(fd, ccb, COMM_ERROR, errno);
+ ccb->finish(COMM_ERROR, errno);
return;
};
if (retval >= 0) {
fd_bytes(fd, retval, FD_READ);
ccb->offset = retval;
- commio_finish_callback(fd, ccb, COMM_OK, errno);
+ ccb->finish(COMM_OK, errno);
return;
}
/* Make sure we are open and not closing */
assert(isOpen(fd));
assert(!fd_table[fd].closing());
- comm_io_callback_t *ccb = COMMIO_FD_READCB(fd);
+ Comm::IoCallback *ccb = COMMIO_FD_READCB(fd);
// Make sure we are either not reading or just passively monitoring.
// Active/passive conflicts are OK and simply cancel passive monitoring.
}
/* Queue the read */
- commio_set_callback(fd, IOCB_READ, ccb, callback, (char *)buf, NULL, size);
+ ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size);
commSetSelect(fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
}
{
assert(isOpen(fd));
// Being active is usually the same as monitoring because we always
- // start monitoring the FD when we configure comm_io_callback_t for I/O
- // and we usually configure comm_io_callback_t for I/O when we starting
- // monitoring a FD for reading. TODO: replace with commio_has_callback
+ // start monitoring the FD when we configure Comm::IoCallback for I/O
+ // and we usually configure Comm::IoCallback for I/O when we starting
+ // monitoring a FD for reading.
return COMMIO_FD_READCB(fd)->active();
}
return;
}
- comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
+ Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
// TODO: is "active" == "monitors FD"?
if (!cb->active()) {
debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
assert(params.data == data);
/* Delete the callback */
- commio_cancel_callback(fd, cb);
+ cb->cancel("old comm_read_cancel");
/* And the IO event */
commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
return;
}
- comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
+ Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
if (!cb->active()) {
debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
assert(call == callback);
/* Delete the callback */
- commio_cancel_callback(fd, cb);
+ cb->cancel("comm_read_cancel");
/* And the IO event */
commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
commSetTimeout(fd, -1, NULL, NULL);
// notify read/write handlers after canceling select reservations, if any
- if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
+ if (COMMIO_FD_WRITECB(fd)->active()) {
commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
- commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno);
+ COMMIO_FD_WRITECB(fd)->finish(COMM_ERR_CLOSING, errno);
}
- if (commio_has_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd))) {
+ if (COMMIO_FD_READCB(fd)->active()) {
commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
- commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
+ COMMIO_FD_READCB(fd)->finish(COMM_ERR_CLOSING, errno);
}
#if DELAY_POOLS
/* make sure the accept() socket FIFO delay queue exists */
Comm::AcceptLimiter::Instance();
- commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
- for (int pos = 0; pos < Squid_MaxFD; pos++) {
- commfd_table[pos].fd = pos;
- commfd_table[pos].readcb.fd = pos;
- commfd_table[pos].readcb.type = IOCB_READ;
- commfd_table[pos].writecb.fd = pos;
- commfd_table[pos].writecb.type = IOCB_WRITE;
- }
+ // make sure the IO pending callback table exists
+ Comm::CallbackTableInit();
/* XXX account fd_table */
/* Keep a few file descriptors free so that we don't run out of FD's
safe_free(fd_table);
safe_free(fdd_table);
- safe_free(commfd_table);
+ Comm::CallbackTableDestruct();
}
#if DELAY_POOLS
// called when the queue is done waiting for the client bucket to fill
-static void
+void
commHandleWriteHelper(void * data)
{
CommQuotaQueue *queue = static_cast<CommQuotaQueue*>(data);
do {
// check that the head descriptor is still relevant
const int head = clientInfo->quotaPeekFd();
- comm_io_callback_t *ccb = COMMIO_FD_WRITECB(head);
+ Comm::IoCallback *ccb = COMMIO_FD_WRITECB(head);
if (fd_table[head].clientInfo == clientInfo &&
clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv &&
!fd_table[head].closing()) {
// wait for the head descriptor to become ready for writing
- commSetSelect(head, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
+ commSetSelect(head, COMM_SELECT_WRITE, Comm::HandleWrite, ccb, 0);
clientInfo->selectWaiting = true;
return;
}
fds.pop_front();
++outs;
}
-
-#endif
-
-/* Write to FD. */
-static void
-commHandleWrite(int fd, void *data)
-{
- comm_io_callback_t *state = (comm_io_callback_t *)data;
- int len = 0;
- int nleft;
-
- assert(state == COMMIO_FD_WRITECB(fd));
-
- PROF_start(commHandleWrite);
- debugs(5, 5, "commHandleWrite: FD " << fd << ": off " <<
- (long int) state->offset << ", sz " << (long int) state->size << ".");
-
- nleft = state->size - state->offset;
-
-#if DELAY_POOLS
- ClientInfo * clientInfo=fd_table[fd].clientInfo;
-
- if (clientInfo && !clientInfo->writeLimitingActive)
- clientInfo = NULL; // we only care about quota limits here
-
- if (clientInfo) {
- assert(clientInfo->selectWaiting);
- clientInfo->selectWaiting = false;
-
- assert(clientInfo->hasQueue());
- assert(clientInfo->quotaPeekFd() == fd);
- clientInfo->quotaDequeue(); // we will write or requeue below
-
- if (nleft > 0) {
- const int quota = clientInfo->quotaForDequed();
- if (!quota) { // if no write quota left, queue this fd
- state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
- clientInfo->kickQuotaQueue();
- PROF_stop(commHandleWrite);
- return;
- }
-
- const int nleft_corrected = min(nleft, quota);
- if (nleft != nleft_corrected) {
- debugs(5, 5, HERE << "FD " << fd << " writes only " <<
- nleft_corrected << " out of " << nleft);
- nleft = nleft_corrected;
- }
-
- }
- }
-
-#endif
-
- /* actually WRITE data */
- len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
- debugs(5, 5, "commHandleWrite: write() returns " << len);
-
-#if DELAY_POOLS
- if (clientInfo) {
- if (len > 0) {
- /* we wrote data - drain them from bucket */
- clientInfo->bucketSize -= len;
- if (clientInfo->bucketSize < 0.0) {
- debugs(5,1, HERE << "drained too much"); // should not happen
- clientInfo->bucketSize = 0;
- }
- }
-
- // even if we wrote nothing, we were served; give others a chance
- clientInfo->kickQuotaQueue();
- }
-#endif
-
- fd_bytes(fd, len, FD_WRITE);
- statCounter.syscalls.sock.writes++;
- // After each successful partial write,
- // reset fde::writeStart to the current time.
- fd_table[fd].writeStart = squid_curtime;
-
- if (len == 0) {
- /* Note we even call write if nleft == 0 */
- /* We're done */
-
- if (nleft != 0)
- debugs(5, 1, "commHandleWrite: FD " << fd << ": write failure: connection closed with " << nleft << " bytes remaining.");
-
- commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
- } else if (len < 0) {
- /* An error */
-
- if (fd_table[fd].flags.socket_eof) {
- debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
- commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
- } else if (ignoreErrno(errno)) {
- debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
- commSelectOrQueueWrite(fd);
- } else {
- debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
- commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
- }
- } else {
- /* A successful write, continue */
- state->offset += len;
-
- if (state->offset < state->size) {
- /* Not done, reinstall the write handler and write some more */
- commSelectOrQueueWrite(fd);
- } else {
- commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
- }
- }
-
- PROF_stop(commHandleWrite);
-}
-
-/*
- * Queue a write. handler/handler_data are called when the write
- * completes, on error, or on file descriptor close.
- *
- * free_func is used to free the passed buffer when the write has completed.
- */
-void
-comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
-{
- AsyncCall::Pointer call = commCbCall(5,5, "SomeCommWriteHander",
- CommIoCbPtrFun(handler, handler_data));
-
- comm_write(fd, buf, size, call, free_func);
-}
-
-void
-comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
-{
- debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": asynCall " << callback);
-
- /* Make sure we are open, not closing, and not writing */
- assert(isOpen(fd));
- assert(!fd_table[fd].closing());
- comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
- assert(!ccb->active());
-
- fd_table[fd].writeStart = squid_curtime;
- /* Queue the write */
- commio_set_callback(fd, IOCB_WRITE, ccb, callback,
- (char *)buf, free_func, size);
-
- commSelectOrQueueWrite(fd);
-}
-
-// called when fd needs to write but may need to wait in line for its quota
-static void
-commSelectOrQueueWrite(const int fd)
-{
- comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
-
-#if DELAY_POOLS
- // stand in line if there is one
- if (ClientInfo *clientInfo = fd_table[fd].clientInfo) {
- if (clientInfo->writeLimitingActive) {
- ccb->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
- clientInfo->kickQuotaQueue();
- return;
- }
- }
#endif
- commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
-}
-
-
-/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */
-void
-comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data)
-{
- comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
-}
-
-void
-comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback)
-{
- comm_write(fd, mb->buf, mb->size, callback, mb->freeFunc());
-}
-
-
/*
* hm, this might be too general-purpose for all the places we'd
* like to use it.
static bool
writeTimedOut(int fd)
{
- if (!commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd)))
+ if (!COMMIO_FD_WRITECB(fd)->active())
return false;
if ((squid_curtime - fd_table[fd].writeStart) < Config.Timeout.write)
// We have an active write callback and we are timed out
debugs(5, 5, "checkTimeouts: FD " << fd << " auto write timeout");
commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
- commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERROR, ETIMEDOUT);
+ COMMIO_FD_WRITECB(fd)->finish(COMM_ERROR, ETIMEDOUT);
} else if (AlreadyTimedOut(F))
continue;
#include "squid.h"
#include "AsyncEngine.h"
#include "base/AsyncCall.h"
+#include "comm_err_t.h"
+#include "comm/IoCallback.h"
#include "StoreIOBuffer.h"
#include "Array.h"
#include "ip/Address.h"
-#define COMMIO_FD_READCB(fd) (&commfd_table[(fd)].readcb)
-#define COMMIO_FD_WRITECB(fd) (&commfd_table[(fd)].writecb)
-
-typedef enum {
- COMM_OK = 0,
- COMM_ERROR = -1,
- COMM_NOMESSAGE = -3,
- COMM_TIMEOUT = -4,
- COMM_SHUTDOWN = -5,
- COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */
- COMM_INPROGRESS = -7,
- COMM_ERR_CONNECT = -8,
- COMM_ERR_DNS = -9,
- COMM_ERR_CLOSING = -10,
- COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */
- COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */
-} comm_err_t;
-
class DnsLookupDetails;
typedef void CNCB(int fd, const DnsLookupDetails &dns, comm_err_t status, int xerrno, void *data);
SQUIDCEXTERN void commResetSelect(int);
SQUIDCEXTERN int comm_udp_sendto(int sock, const Ip::Address &to, const void *buf, int buflen);
-extern void comm_write(int fd, const char *buf, int len, IOCB *callback, void *callback_data, FREE *func);
-extern void comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func = NULL);
-SQUIDCEXTERN void comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data);
-extern void comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback);
SQUIDCEXTERN void commCallCloseHandlers(int fd);
SQUIDCEXTERN int commSetTimeout(int fd, int, PF *, void *);
extern int commSetTimeout(int fd, int, AsyncCall::Pointer &calback);
--- /dev/null
+#include "config.h"
+#include "ClientInfo.h"
+#include "comm/IoCallback.h"
+#include "comm/Write.h"
+#include "CommCalls.h"
+#include "fde.h"
+
+Comm::CbEntry *Comm::iocb_table;
+
+void
+Comm::CallbackTableInit()
+{
+ // XXX: convert this to a std::map<> ?
+ iocb_table = static_cast<CbEntry*>(xcalloc(Squid_MaxFD, sizeof(CbEntry)));
+ for (int pos = 0; pos < Squid_MaxFD; pos++) {
+ iocb_table[pos].fd = pos;
+ iocb_table[pos].readcb.fd = pos;
+ iocb_table[pos].readcb.type = IOCB_READ;
+ iocb_table[pos].writecb.fd = pos;
+ iocb_table[pos].writecb.type = IOCB_WRITE;
+ }
+}
+
+void
+Comm::CallbackTableDestruct()
+{
+ safe_free(iocb_table);
+}
+
+/**
+ * Configure Comm::Callback for I/O
+ *
+ * @param fd filedescriptor
+ * @param t IO callback type (read or write)
+ * @param cb callback
+ * @param buf buffer, if applicable
+ * @param func freefunc, if applicable
+ * @param sz buffer size
+ */
+void
+Comm::IoCallback::setCallback(Comm::iocb_type t, AsyncCall::Pointer &cb, char *b, FREE *f, int sz)
+{
+ assert(!active());
+ assert(type == t);
+ assert(cb != NULL);
+
+ callback = cb;
+ buf = b;
+ freefunc = f;
+ size = sz;
+ offset = 0;
+}
+
+void
+Comm::IoCallback::selectOrQueueWrite()
+{
+#if DELAY_POOLS
+ // stand in line if there is one
+ if (ClientInfo *clientInfo = fd_table[fd].clientInfo) {
+ if (clientInfo->writeLimitingActive) {
+ quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+ clientInfo->kickQuotaQueue();
+ return;
+ }
+ }
+#endif
+
+ commSetSelect(fd, COMM_SELECT_WRITE, Comm::HandleWrite, this, 0);
+}
+
+void
+Comm::IoCallback::cancel(const char *reason)
+{
+ if (!active())
+ return;
+
+ callback->cancel(reason);
+ callback = NULL;
+ reset();
+}
+
+void
+Comm::IoCallback::reset()
+{
+ if (freefunc) {
+ freefunc(buf);
+ buf = NULL;
+ freefunc = NULL;
+ }
+ xerrno = 0;
+
+#if DELAY_POOLS
+ quotaQueueReserv = 0;
+#endif
+}
+
+// Schedule the callback call and clear the callback
+void
+Comm::IoCallback::finish(comm_err_t code, int xerrn)
+{
+ debugs(5, 3, HERE << "called for FD " << fd << " (" << code << ", " << xerrno << ")");
+ assert(active());
+
+ /* free data */
+ if (freefunc) {
+ freefunc(buf);
+ buf = NULL;
+ freefunc = NULL;
+ }
+
+ if (callback != NULL) {
+ typedef CommIoCbParams Params;
+ Params ¶ms = GetCommParams<Params>(callback);
+ params.fd = fd;
+ params.buf = buf;
+ params.size = offset;
+ params.flag = code;
+ params.xerrno = xerrn;
+ ScheduleCallHere(callback);
+ callback = NULL;
+ }
+
+ /* Reset for next round. */
+ reset();
+}
--- /dev/null
+#ifndef _SQUID_COMM_IOCALLBACK_H
+#define _SQUID_COMM_IOCALLBACK_H
+
+#include "config.h"
+#include "base/AsyncCall.h"
+#include "comm_err_t.h"
+
+namespace Comm {
+
+/// Type of IO callbacks the Comm layer deals with.
+typedef enum {
+ IOCB_NONE,
+ IOCB_READ,
+ IOCB_WRITE
+} iocb_type;
+
+/// Details about a particular Comm IO callback event.
+class IoCallback {
+public:
+ iocb_type type;
+ int fd;
+ AsyncCall::Pointer callback;
+ char *buf;
+ FREE *freefunc;
+ int size;
+ int offset;
+ comm_err_t errcode;
+ int xerrno;
+#if DELAY_POOLS
+ unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue
+#endif
+
+ bool active() const { return callback != NULL; }
+ void setCallback(iocb_type type, AsyncCall::Pointer &cb, char *buf, FREE *func, int sz);
+
+ /// called when fd needs to write but may need to wait in line for its quota
+ void selectOrQueueWrite();
+
+ /// Actively cancel the given callback
+ void cancel(const char *reason);
+
+ /// finish the IO operation imediately and schedule the callback with the current state.
+ void finish(comm_err_t code, int xerrn);
+
+private:
+ void reset();
+};
+
+/// Entry nodes for the IO callback table: iocb_table
+/// Keyed off the FD which the event applies to.
+class CbEntry {
+public:
+ int fd;
+ IoCallback readcb;
+ IoCallback writecb;
+};
+
+/// Table of scheduled IO events which have yet to be processed ??
+/// Callbacks which might be scheduled in future are stored in fd_table.
+extern CbEntry *iocb_table;
+
+extern void CallbackTableInit();
+extern void CallbackTableDestruct();
+
+#define COMMIO_FD_READCB(fd) (&Comm::iocb_table[(fd)].readcb)
+#define COMMIO_FD_WRITECB(fd) (&Comm::iocb_table[(fd)].writecb)
+
+}; // namespace Comm
+
+#endif /* _SQUID_COMM_IOCALLBACK_H */
include $(top_srcdir)/src/Common.am
include $(top_srcdir)/src/TestHeaders.am
-noinst_LTLIBRARIES = libcomm-listener.la
+noinst_LTLIBRARIES = libcomm.la
-## Library holding listener comm socket handlers
-libcomm_listener_la_SOURCES= \
+## Library holding comm socket handlers
+libcomm_la_SOURCES= \
AcceptLimiter.cc \
AcceptLimiter.h \
ListenStateData.cc \
ListenStateData.h \
\
+ IoCallback.cc \
+ IoCallback.h \
+ Write.cc \
+ Write.h \
+ \
comm_internal.h
--- /dev/null
+#include "config.h"
+#if DELAY_POOLS
+#include "ClientInfo.h"
+#endif
+#include "comm/IoCallback.h"
+#include "comm/Write.h"
+#include "fde.h"
+#include "SquidTime.h"
+#include "MemBuf.h"
+
+void
+Comm::Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback)
+{
+ Comm::Write(fd, mb->buf, mb->size, callback, mb->freeFunc());
+}
+
+void
+Comm::Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
+{
+ debugs(5, 5, HERE << "FD " << fd << ": sz " << size << ": asynCall " << callback);
+
+ /* Make sure we are open, not closing, and not writing */
+ assert(fd_table[fd].flags.open);
+ assert(!fd_table[fd].closing());
+ Comm::IoCallback *ccb = COMMIO_FD_WRITECB(fd);
+ assert(!ccb->active());
+
+ fd_table[fd].writeStart = squid_curtime;
+ /* Queue the write */
+ ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
+ ccb->selectOrQueueWrite();
+}
+
+/** Write to FD.
+ * This function is used by the lowest level of IO loop which only has access to FD numbers.
+ * We have to use the comm iocb_table to map FD numbers to waiting data.
+ * Once the write has been concluded we schedule the waiting call with success/fail results.
+ */
+void
+Comm::HandleWrite(int fd, void *data)
+{
+ Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
+ int len = 0;
+ int nleft;
+
+ assert(state->fd == fd);
+
+ PROF_start(commHandleWrite);
+ debugs(5, 5, HERE << "FD " << state->fd << ": off " <<
+ (long int) state->offset << ", sz " << (long int) state->size << ".");
+
+ nleft = state->size - state->offset;
+
+#if DELAY_POOLS
+ ClientInfo * clientInfo=fd_table[fd].clientInfo;
+
+ if (clientInfo && !clientInfo->writeLimitingActive)
+ clientInfo = NULL; // we only care about quota limits here
+
+ if (clientInfo) {
+ assert(clientInfo->selectWaiting);
+ clientInfo->selectWaiting = false;
+
+ assert(clientInfo->hasQueue());
+ assert(clientInfo->quotaPeekFd() == fd);
+ clientInfo->quotaDequeue(); // we will write or requeue below
+
+ if (nleft > 0) {
+ const int quota = clientInfo->quotaForDequed();
+ if (!quota) { // if no write quota left, queue this fd
+ state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+ clientInfo->kickQuotaQueue();
+ PROF_stop(commHandleWrite);
+ return;
+ }
+
+ const int nleft_corrected = min(nleft, quota);
+ if (nleft != nleft_corrected) {
+ debugs(5, 5, HERE << "FD " << fd << " writes only " <<
+ nleft_corrected << " out of " << nleft);
+ nleft = nleft_corrected;
+ }
+
+ }
+ }
+#endif
+
+ /* actually WRITE data */
+ len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+ debugs(5, 5, HERE << "write() returns " << len);
+
+#if DELAY_POOLS
+ if (clientInfo) {
+ if (len > 0) {
+ /* we wrote data - drain them from bucket */
+ clientInfo->bucketSize -= len;
+ if (clientInfo->bucketSize < 0.0) {
+ debugs(5,1, HERE << "drained too much"); // should not happen
+ clientInfo->bucketSize = 0;
+ }
+ }
+
+ // even if we wrote nothing, we were served; give others a chance
+ clientInfo->kickQuotaQueue();
+ }
+#endif
+
+ fd_bytes(fd, len, FD_WRITE);
+ statCounter.syscalls.sock.writes++;
+ // After each successful partial write,
+ // reset fde::writeStart to the current time.
+ fd_table[fd].writeStart = squid_curtime;
+
+ if (len == 0) {
+ /* Note we even call write if nleft == 0 */
+ /* We're done */
+ if (nleft != 0)
+ debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining.");
+
+ state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
+ } else if (len < 0) {
+ /* An error */
+ if (fd_table[fd].flags.socket_eof) {
+ debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
+ state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
+ } else if (ignoreErrno(errno)) {
+ debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
+ state->selectOrQueueWrite();
+ } else {
+ debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
+ state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
+ }
+ } else {
+ /* A successful write, continue */
+ state->offset += len;
+
+ if (state->offset < state->size) {
+ /* Not done, reinstall the write handler and write some more */
+ state->selectOrQueueWrite();
+ } else {
+ state->finish(nleft ? COMM_OK : COMM_ERROR, errno);
+ }
+ }
+
+ PROF_stop(commHandleWrite);
+}
--- /dev/null
+#ifndef _SQUID_COMM_IOWRITE_H
+#define _SQUID_COMM_IOWRITE_H
+
+#include "base/AsyncCall.h"
+
+namespace Comm {
+
+/**
+ * Queue a write. callback is scheduled when the write
+ * completes, on error, or on file descriptor close.
+ *
+ * free_func is used to free the passed buffer when the write has completed.
+ */
+void Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func);
+
+/**
+ * Queue a write. callback is scheduled when the write
+ * completes, on error, or on file descriptor close.
+ */
+void Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback);
+
+/// Cancel the write pending on FD. No action if none pending.
+void WriteCancel(int fd, const char *reason);
+
+// callback handler to process an FD which is available for writing.
+extern PF HandleWrite;
+
+}; // namespace Comm
+
+#endif /* _SQUID_COMM_IOWRITE_H */
--- /dev/null
+#ifndef _SQUID_COMM_COMM_ERR_T_H
+#define _SQUID_COMM_COMM_ERR_T_H
+
+#include "config.h"
+
+typedef enum {
+ COMM_OK = 0,
+ COMM_ERROR = -1,
+ COMM_NOMESSAGE = -3,
+ COMM_TIMEOUT = -4,
+ COMM_SHUTDOWN = -5,
+ COMM_IDLE = -6, /* there are no active fds and no pending callbacks. */
+ COMM_INPROGRESS = -7,
+ COMM_ERR_CONNECT = -8,
+ COMM_ERR_DNS = -9,
+ COMM_ERR_CLOSING = -10,
+ COMM_ERR_PROTOCOL = -11, /* IPv4 or IPv6 cannot be used on the fd socket */
+ COMM_ERR__END__ = -999999 /* Dummy entry to make syntax valid (comma on line above), do not use. New entries added above */
+} comm_err_t;
+
+#endif /* _SQUID_COMM_COMM_ERR_T_H */
#include "SquidTime.h"
#include "Store.h"
#include "comm.h"
+#include "comm/Write.h"
#include "fde.h"
#include "ip/tools.h"
#include "MemBuf.h"
commSetTimeout(vc->fd, Config.Timeout.idns_query, NULL, NULL);
- comm_write_mbuf(vc->fd, mb, idnsSentQueryVC, vc);
+ AsyncCall::Pointer call = commCbCall(78, 5, "idnsSentQueryVC",
+ CommIoCbPtrFun(&idnsSentQueryVC, vc));
+
+ Comm::Write(vc->fd, mb, call);
delete mb;
}
*
*/
#include "config.h"
-
+#include "comm/Write.h"
#include "errorpage.h"
#include "auth/UserRequest.h"
#include "SquidTime.h"
rep = err->BuildHttpReply();
MemBuf *mb = rep->pack();
- comm_write_mbuf(fd, mb, errorSendComplete, err);
+ AsyncCall::Pointer call = commCbCall(78, 5, "errorSendComplete",
+ CommIoCbPtrFun(&errorSendComplete, err));
+ Comm::Write(fd, mb, call);
delete mb;
delete rep;
#include "squid.h"
#include "comm.h"
+#include "comm/Write.h"
#include "comm/ListenStateData.h"
#include "compat/strtoll.h"
#include "ConnectionDetail.h"
typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
AsyncCall::Pointer call = JobCallback(9, 5,
Dialer, this, FtpStateData::ftpWriteCommandCallback);
- comm_write(ctrl.fd,
- ctrl.last_command,
- strlen(ctrl.last_command),
- call);
+ Comm::Write(ctrl.fd, ctrl.last_command, strlen(ctrl.last_command), call, NULL);
scheduleReadControlReply(0);
}
*/
#include "squid.h"
+#include "comm/Write.h"
#include "errorpage.h"
#include "Store.h"
#include "html_quote.h"
}
debugs(10, 5, "gopherSendRequest: FD " << fd);
- comm_write(fd, buf, strlen(buf), gopherSendComplete, gopherState, NULL);
+ AsyncCall::Pointer call = commCbCall(5,5, "gopherSendComplete",
+ CommIoCbPtrFun(gopherSendComplete, gopherState));
+ Comm::Write(fd, buf, strlen(buf), call, NULL);
if (EBIT_TEST(gopherState->entry->flags, ENTRY_CACHABLE))
gopherState->entry->setPublicKey(); /* Make it public */
*/
#include "squid.h"
+#include "comm/Write.h"
#include "helper.h"
#include "SquidMath.h"
#include "SquidTime.h"
srv->writebuf = srv->wqueue;
srv->wqueue = new MemBuf;
srv->flags.writing = 1;
- comm_write(srv->wfd,
- srv->writebuf->content(),
- srv->writebuf->contentSize(),
- helperDispatchWriteDone, /* Handler */
- srv, NULL); /* Handler-data, freefunc */
+ AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
+ CommIoCbPtrFun(helperDispatchWriteDone, srv));
+ Comm::Write(srv->wfd, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
}
}
srv->writebuf = srv->wqueue;
srv->wqueue = new MemBuf;
srv->flags.writing = 1;
- comm_write(srv->wfd,
- srv->writebuf->content(),
- srv->writebuf->contentSize(),
- helperDispatchWriteDone, /* Handler */
- srv, NULL); /* Handler-data, free func */
+ AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
+ CommIoCbPtrFun(helperDispatchWriteDone, srv));
+ Comm::Write(srv->wfd, srv->writebuf->content(), srv->writebuf->contentSize(), call, NULL);
}
debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index + 1 << ", " << strlen(r->buf) << " bytes");
srv->flags.reserved = 1;
srv->request = r;
srv->dispatch_time = current_time;
- comm_write(srv->wfd,
- r->buf,
- strlen(r->buf),
- helperStatefulDispatchWriteDone, /* Handler */
- hlp, NULL); /* Handler-data, free func */
+ AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
+ CommIoCbPtrFun(helperStatefulDispatchWriteDone, hlp));
+ Comm::Write(srv->wfd, r->buf, strlen(r->buf), call, NULL);
debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
hlp->id_name << " #" << srv->index + 1 << ", " <<
(int) strlen(r->buf) << " bytes");
#include "base/AsyncJobCalls.h"
#include "base/TextException.h"
#include "base64.h"
+#include "comm/Write.h"
#if DELAY_POOLS
#include "DelayPools.h"
#endif
#include "errorpage.h"
-#include "fde.h"
#include "http.h"
#include "HttpControlMsg.h"
#include "HttpHdrContRange.h"
request->peer_host=_peer?_peer->host:NULL;
buildRequestPrefix(request, orig_request, entry, &mb);
debugs(11, 6, "httpSendRequest: FD " << fd << ":\n" << mb.buf);
- comm_write_mbuf(fd, &mb, requestSender);
+ Comm::Write(fd, &mb, requestSender);
return true;
}
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
requestSender = JobCallback(11,5,
Dialer, this, HttpStateData::wroteLast);
- comm_write(fd, "\r\n", 2, requestSender);
+ Comm::Write(fd, "\r\n", 2, requestSender, NULL);
return true;
#else
return false;
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
requestSender = JobCallback(11,5,
Dialer, this, HttpStateData::wroteLast);
- comm_write(fd, "0\r\n\r\n", 5, requestSender);
+ Comm::Write(fd, "0\r\n\r\n", 5, requestSender, NULL);
return true;
}
#if USE_IDENT
#include "comm.h"
+#include "comm/Write.h"
#include "ident/Config.h"
#include "ident/Ident.h"
#include "MemBuf.h"
mb.Printf("%d, %d\r\n",
state->my_peer.GetPort(),
state->me.GetPort());
- comm_write_mbuf(fd, &mb, NULL, state);
+
+ AsyncCall::Pointer nil;
+ Comm::Write(fd, &mb, nil);
comm_read(fd, state->buf, BUFSIZ, Ident::ReadReply, state);
commSetTimeout(fd, Ident::TheConfig.timeout, Ident::Timeout, state);
}
#include "config.h"
#include "comm.h"
#include "CommCalls.h"
+#include "comm/Write.h"
#include "base/TextException.h"
#include "ipc/UdsOp.h"
typedef CommCbMemFunT<UdsSender, CommIoCbParams> Dialer;
AsyncCall::Pointer writeHandler = JobCallback(54, 5,
Dialer, this, UdsSender::wrote);
- comm_write(fd(), message.raw(), message.size(), writeHandler);
+ Comm::Write(fd(), message.raw(), message.size(), writeHandler, NULL);
writing = true;
}
#include "config.h"
#include "base/TextException.h"
+#include "comm/Write.h"
#include "CommCalls.h"
#include "HttpReply.h"
#include "ipc/Coordinator.h"
std::auto_ptr<MemBuf> replyBuf(reply->pack());
writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
- comm_write_mbuf(fd, replyBuf.get(), writer);
+ Comm::Write(fd, replyBuf.get(), writer);
}
/// called when we wrote the response header
#include "config.h"
#include "base/TextException.h"
#include "CommCalls.h"
+#include "comm/Write.h"
#include "ipc/FdNotes.h"
#include "mgr/StoreToCommWriter.h"
#include "StoreClient.h"
AsyncCall::Pointer writer =
asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
MyDialer(this, &StoreToCommWriter::noteCommWrote));
- comm_write(fd, ioBuf.data, ioBuf.length, writer);
+ Comm::Write(fd, ioBuf.data, ioBuf.length, writer, NULL);
}
void
#include "HttpRequest.h"
#include "fde.h"
#include "comm.h"
+#include "comm/Write.h"
#include "client_side_request.h"
#include "acl/FilledChecklist.h"
#if DELAY_POOLS
if (from.len == 0 && !fd_closed(to.fd()) ) {
comm_close(to.fd());
}
- } else if (cbdataReferenceValid(this))
- comm_write(to.fd(), from.buf, len, completion, this, NULL);
+ } else if (cbdataReferenceValid(this)) {
+ AsyncCall::Pointer call = commCbCall(5,5, "SomeTunnelWriteHandler",
+ CommIoCbPtrFun(completion, this));
+ Comm::Write(to.fd(), from.buf, len, call, NULL);
+ }
cbdataInternalUnlock(this); /* ??? */
}
TunnelStateData *tunnelState = (TunnelStateData *)data;
debugs(26, 3, "tunnelConnected: FD " << fd << " tunnelState=" << tunnelState);
*tunnelState->status_ptr = HTTP_OK;
- comm_write(tunnelState->client.fd(), conn_established, strlen(conn_established),
- tunnelConnectedWriteDone, tunnelState, NULL);
+ AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone",
+ CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState));
+ Comm::Write(tunnelState->client.fd(), conn_established, strlen(conn_established), call, NULL);
}
static void
packerClean(&p);
mb.append("\r\n", 2);
- comm_write_mbuf(tunnelState->server.fd(), &mb, tunnelProxyConnectedWriteDone, tunnelState);
+ AsyncCall::Pointer call = commCbCall(5,5, "tunnelProxyConnectedWriteDone",
+ CommIoCbPtrFun(tunnelProxyConnectedWriteDone, tunnelState));
+
+ Comm::Write(tunnelState->server.fd(), &mb, call);
commSetTimeout(tunnelState->server.fd(), Config.Timeout.read, tunnelTimeout, tunnelState);
}
*/
#include "squid.h"
+#include "comm/Write.h"
#include "errorpage.h"
#include "Store.h"
#include "HttpReply.h"
String str_print=p->request->urlpath.substr(1,p->request->urlpath.size());
snprintf(buf, l, SQUIDSTRINGPH"\r\n", SQUIDSTRINGPRINT(str_print));
- comm_write(fd, buf, strlen(buf), whoisWriteComplete, p, NULL);
+ AsyncCall::Pointer call = commCbCall(5,5, "whoisWriteComplete",
+ CommIoCbPtrFun(whoisWriteComplete, p));
+
+ Comm::Write(fd, buf, strlen(buf), call, NULL);
comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p);
commSetTimeout(fd, Config.Timeout.read, whoisTimeout, p);
}