]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/comm.cc
Merge from trunk. and Save Comm::Connection in IoCallback
[thirdparty/squid.git] / src / comm.cc
index 3365de51b5f8a8c1b8e2d21405b07f2e897130dc..0738f080d5d1b86f7beb3445a61d640c9270c127 100644 (file)
@@ -41,6 +41,8 @@
 #include "comm/AcceptLimiter.h"
 #include "comm/comm_internal.h"
 #include "comm/Connection.h"
+#include "comm/IoCallback.h"
+#include "comm/Write.h"
 #include "CommIO.h"
 #include "CommRead.h"
 #include "MemBuf.h"
  * New C-like simple comm code. This stuff is a mess and doesn't really buy us anything.
  */
 
-typedef enum {
-    IOCB_NONE,
-    IOCB_READ,
-    IOCB_WRITE
-} iocb_type;
-
 static void commStopHalfClosedMonitor(int fd);
 static IOCB commHalfClosedReader;
 static void comm_init_opened(const Comm::ConnectionPointer &conn, tos_t tos, nfmark_t nfmark, const char *note, struct addrinfo *AI);
@@ -84,142 +80,6 @@ CBDATA_CLASS_INIT(CommQuotaQueue);
 static void commHandleWriteHelper(void * data);
 #endif
 
-static void commSelectOrQueueWrite(const int fd);
-
-struct comm_io_callback_t {
-    iocb_type type;
-    int fd;
-    AsyncCall::Pointer callback;
-    char *buf;
-    FREE *freefunc;
-    int size;
-    int offset;
-    comm_err_t errcode;
-    int xerrno;
-#if DELAY_POOLS
-    unsigned int quotaQueueReserv; ///< reservation ID from CommQuotaQueue
-#endif
-
-
-    bool active() const { return callback != NULL; }
-};
-
-struct _comm_fd {
-    int fd;
-    comm_io_callback_t readcb;
-    comm_io_callback_t writecb;
-};
-typedef struct _comm_fd comm_fd_t;
-comm_fd_t *commfd_table;
-
-// TODO: make this a comm_io_callback_t method?
-bool
-commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
-{
-    assert(ccb->fd == fd);
-    assert(ccb->type == type);
-    return ccb->active();
-}
-
-/*
- * Configure comm_io_callback_t for I/O
- *
- * @param fd           filedescriptor
- * @param ccb          comm io callback
- * @param cb           callback
- * @param cbdata       callback data (must be cbdata'ed)
- * @param buf          buffer, if applicable
- * @param freefunc     freefunc, if applicable
- * @param size         buffer size
- */
-static void
-commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb,
-                    AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size)
-{
-    assert(!ccb->active());
-    assert(ccb->type == type);
-    assert(cb != NULL);
-    ccb->fd = fd;
-    ccb->callback = cb;
-    ccb->buf = buf;
-    ccb->freefunc = freefunc;
-    ccb->size = size;
-    ccb->offset = 0;
-}
-
-
-// Schedule the callback call and clear the callback
-static void
-commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
-{
-    debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" <<
-           code << ", " << xerrno << ")");
-    assert(ccb->active());
-    assert(ccb->fd == fd);
-    ccb->errcode = code;
-    ccb->xerrno = xerrno;
-
-#if DELAY_POOLS
-    ccb->quotaQueueReserv = 0;
-#endif
-
-    comm_io_callback_t cb = *ccb;
-
-    /* We've got a copy; blow away the real one */
-    /* XXX duplicate code from commio_cancel_callback! */
-    ccb->xerrno = 0;
-    ccb->callback = NULL; // cb has it
-
-    /* free data */
-    if (cb.freefunc) {
-        cb.freefunc(cb.buf);
-        cb.buf = NULL;
-    }
-
-    if (cb.callback != NULL) {
-        typedef CommIoCbParams Params;
-        Params &params = GetCommParams<Params>(cb.callback);
-        params.fd = cb.fd;
-        params.buf = cb.buf;
-        params.size = cb.offset;
-        params.flag = cb.errcode;
-        params.xerrno = cb.xerrno;
-        ScheduleCallHere(cb.callback);
-    }
-}
-
-
-/*
- * Cancel the given callback
- *
- * Remember that the data is cbdataRef'ed.
- */
-// TODO: make this a comm_io_callback_t method
-static void
-commio_cancel_callback(int fd, comm_io_callback_t *ccb)
-{
-    debugs(5, 3, "commio_cancel_callback: called for FD " << fd);
-    assert(ccb->fd == fd);
-    assert(ccb->active());
-
-    ccb->xerrno = 0;
-    ccb->callback = NULL;
-
-#if DELAY_POOLS
-    ccb->quotaQueueReserv = 0;
-#endif
-}
-
-/*
- * Call the given comm callback; assumes the callback is valid.
- *
- * @param ccb          io completion callback
- */
-void
-commio_call_callback(comm_io_callback_t *ccb)
-{
-}
-
 /* STATIC */
 
 static DescriptorSet *TheHalfClosed = NULL; /// the set of half-closed FDs
@@ -234,12 +94,13 @@ static void commSetNoLinger(int);
 static void commSetTcpNoDelay(int);
 #endif
 static void commSetTcpRcvbuf(int, int);
-static PF commHandleWrite;
 
+/*
 typedef enum {
     COMM_CB_READ = 1,
     COMM_CB_DERIVED
 } comm_callback_t;
+*/
 
 static MemAllocator *conn_close_pool = NULL;
 fd_debug_t *fdd_table = NULL;
@@ -259,10 +120,10 @@ isOpen(const int fd)
 void
 commHandleRead(int fd, void *data)
 {
-    comm_io_callback_t *ccb = (comm_io_callback_t *) data;
+    Comm::IoCallback *ccb = (Comm::IoCallback *) data;
 
     assert(data == COMMIO_FD_READCB(fd));
-    assert(commio_has_callback(fd, IOCB_READ, ccb));
+    assert(ccb->active());
     /* Attempt a read */
     statCounter.syscalls.sock.reads++;
     errno = 0;
@@ -273,7 +134,7 @@ commHandleRead(int fd, void *data)
     if (retval < 0 && !ignoreErrno(errno)) {
         debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
         ccb->offset = 0;
-        commio_finish_callback(fd, ccb, COMM_ERROR, errno);
+        ccb->finish(COMM_ERROR, errno);
         return;
     };
 
@@ -282,7 +143,7 @@ commHandleRead(int fd, void *data)
     if (retval >= 0) {
         fd_bytes(fd, retval, FD_READ);
         ccb->offset = retval;
-        commio_finish_callback(fd, ccb, COMM_OK, errno);
+        ccb->finish(COMM_OK, errno);
         return;
     }
 
@@ -310,7 +171,7 @@ comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::P
     /* Make sure we are open and not closing */
     assert(Comm::IsConnOpen(conn));
     assert(!fd_table[conn->fd].closing());
-    comm_io_callback_t *ccb = COMMIO_FD_READCB(conn->fd);
+    Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd);
 
     // Make sure we are either not reading or just passively monitoring.
     // Active/passive conflicts are OK and simply cancel passive monitoring.
@@ -320,9 +181,10 @@ comm_read(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::P
         commStopHalfClosedMonitor(conn->fd);
         assert(!ccb->active());
     }
+    ccb->conn = conn;
 
     /* Queue the read */
-    commio_set_callback(conn->fd, IOCB_READ, ccb, callback, (char *)buf, NULL, size);
+    ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size);
     commSetSelect(conn->fd, COMM_SELECT_READ, commHandleRead, ccb, 0);
 }
 
@@ -370,9 +232,9 @@ comm_monitors_read(int fd)
 {
     assert(isOpen(fd));
     // Being active is usually the same as monitoring because we always
-    // start monitoring the FD when we configure comm_io_callback_t for I/O
-    // and we usually configure comm_io_callback_t for I/O when we starting
-    // monitoring a FD for reading. TODO: replace with commio_has_callback
+    // start monitoring the FD when we configure Comm::IoCallback for I/O
+    // and we usually configure Comm::IoCallback for I/O when we starting
+    // monitoring a FD for reading.
     return COMMIO_FD_READCB(fd)->active();
 }
 
@@ -396,7 +258,7 @@ comm_read_cancel(int fd, IOCB *callback, void *data)
         return;
     }
 
-    comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
+    Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
     // TODO: is "active" == "monitors FD"?
     if (!cb->active()) {
         debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
@@ -420,7 +282,7 @@ comm_read_cancel(int fd, IOCB *callback, void *data)
     assert(params.data == data);
 
     /* Delete the callback */
-    commio_cancel_callback(fd, cb);
+    cb->cancel("old comm_read_cancel");
 
     /* And the IO event */
     commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
@@ -436,7 +298,7 @@ comm_read_cancel(int fd, AsyncCall::Pointer &callback)
         return;
     }
 
-    comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
+    Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
 
     if (!cb->active()) {
         debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
@@ -450,7 +312,7 @@ comm_read_cancel(int fd, AsyncCall::Pointer &callback)
     assert(call == callback);
 
     /* Delete the callback */
-    commio_cancel_callback(fd, cb);
+    cb->cancel("comm_read_cancel");
 
     /* And the IO event */
     commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
@@ -1235,6 +1097,10 @@ comm_close_complete(int fd, void *data)
         F->ssl = NULL;
     }
 
+    if (F->dynamicSslContext) {
+        SSL_CTX_free(F->dynamicSslContext);
+        F->dynamicSslContext = NULL;
+    }
 #endif
     fd_close(fd);              /* update fdstat */
 
@@ -1296,13 +1162,13 @@ _comm_close(int fd, char const *file, int line)
     commSetTimeout(fd, -1, NULL, NULL);
 
     // notify read/write handlers after canceling select reservations, if any
-    if (commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd))) {
+    if (COMMIO_FD_WRITECB(fd)->active()) {
         commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
-        commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERR_CLOSING, errno);
+        COMMIO_FD_WRITECB(fd)->finish(COMM_ERR_CLOSING, errno);
     }
-    if (commio_has_callback(fd, IOCB_READ, COMMIO_FD_READCB(fd))) {
+    if (COMMIO_FD_READCB(fd)->active()) {
         commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
-        commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
+        COMMIO_FD_READCB(fd)->finish(COMM_ERR_CLOSING, errno);
     }
 
 #if DELAY_POOLS
@@ -1630,14 +1496,8 @@ comm_init(void)
     /* make sure the accept() socket FIFO delay queue exists */
     Comm::AcceptLimiter::Instance();
 
-    commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
-    for (int pos = 0; pos < Squid_MaxFD; pos++) {
-        commfd_table[pos].fd = pos;
-        commfd_table[pos].readcb.fd = pos;
-        commfd_table[pos].readcb.type = IOCB_READ;
-        commfd_table[pos].writecb.fd = pos;
-        commfd_table[pos].writecb.type = IOCB_WRITE;
-    }
+    // make sure the IO pending callback table exists
+    Comm::CallbackTableInit();
 
     /* XXX account fd_table */
     /* Keep a few file descriptors free so that we don't run out of FD's
@@ -1658,12 +1518,12 @@ comm_exit(void)
 
     safe_free(fd_table);
     safe_free(fdd_table);
-    safe_free(commfd_table);
+    Comm::CallbackTableDestruct();
 }
 
 #if DELAY_POOLS
 // called when the queue is done waiting for the client bucket to fill
-static void
+void
 commHandleWriteHelper(void * data)
 {
     CommQuotaQueue *queue = static_cast<CommQuotaQueue*>(data);
@@ -1682,14 +1542,14 @@ commHandleWriteHelper(void * data)
     do {
         // check that the head descriptor is still relevant
         const int head = clientInfo->quotaPeekFd();
-        comm_io_callback_t *ccb = COMMIO_FD_WRITECB(head);
+        Comm::IoCallback *ccb = COMMIO_FD_WRITECB(head);
 
         if (fd_table[head].clientInfo == clientInfo &&
                 clientInfo->quotaPeekReserv() == ccb->quotaQueueReserv &&
                 !fd_table[head].closing()) {
 
             // wait for the head descriptor to become ready for writing
-            commSetSelect(head, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
+            commSetSelect(head, COMM_SELECT_WRITE, Comm::HandleWrite, ccb, 0);
             clientInfo->selectWaiting = true;
             return;
         }
@@ -1886,204 +1746,8 @@ CommQuotaQueue::dequeue()
     fds.pop_front();
     ++outs;
 }
-
 #endif
 
-/* Write to FD. */
-static void
-commHandleWrite(int fd, void *data)
-{
-    comm_io_callback_t *state = (comm_io_callback_t *)data;
-    int len = 0;
-    int nleft;
-
-    assert(state == COMMIO_FD_WRITECB(fd));
-
-    PROF_start(commHandleWrite);
-    debugs(5, 5, "commHandleWrite: FD " << fd << ": off " <<
-           (long int) state->offset << ", sz " << (long int) state->size << ".");
-
-    nleft = state->size - state->offset;
-
-#if DELAY_POOLS
-    ClientInfo * clientInfo=fd_table[fd].clientInfo;
-
-    if (clientInfo && !clientInfo->writeLimitingActive)
-        clientInfo = NULL; // we only care about quota limits here
-
-    if (clientInfo) {
-        assert(clientInfo->selectWaiting);
-        clientInfo->selectWaiting = false;
-
-        assert(clientInfo->hasQueue());
-        assert(clientInfo->quotaPeekFd() == fd);
-        clientInfo->quotaDequeue(); // we will write or requeue below
-
-        if (nleft > 0) {
-            const int quota = clientInfo->quotaForDequed();
-            if (!quota) {  // if no write quota left, queue this fd
-                state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
-                clientInfo->kickQuotaQueue();
-                PROF_stop(commHandleWrite);
-                return;
-            }
-
-            const int nleft_corrected = min(nleft, quota);
-            if (nleft != nleft_corrected) {
-                debugs(5, 5, HERE << "FD " << fd << " writes only " <<
-                       nleft_corrected << " out of " << nleft);
-                nleft = nleft_corrected;
-            }
-
-        }
-    }
-
-#endif
-
-    /* actually WRITE data */
-    len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
-    debugs(5, 5, "commHandleWrite: write() returns " << len);
-
-#if DELAY_POOLS
-    if (clientInfo) {
-        if (len > 0) {
-            /* we wrote data - drain them from bucket */
-            clientInfo->bucketSize -= len;
-            if (clientInfo->bucketSize < 0.0) {
-                debugs(5,1, HERE << "drained too much"); // should not happen
-                clientInfo->bucketSize = 0;
-            }
-        }
-
-        // even if we wrote nothing, we were served; give others a chance
-        clientInfo->kickQuotaQueue();
-    }
-#endif
-
-    fd_bytes(fd, len, FD_WRITE);
-    statCounter.syscalls.sock.writes++;
-    // After each successful partial write,
-    // reset fde::writeStart to the current time.
-    fd_table[fd].writeStart = squid_curtime;
-
-    if (len == 0) {
-        /* Note we even call write if nleft == 0 */
-        /* We're done */
-
-        if (nleft != 0)
-            debugs(5, 1, "commHandleWrite: FD " << fd << ": write failure: connection closed with " << nleft << " bytes remaining.");
-
-        commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
-    } else if (len < 0) {
-        /* An error */
-
-        if (fd_table[fd].flags.socket_eof) {
-            debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
-            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
-        } else if (ignoreErrno(errno)) {
-            debugs(50, 10, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
-            commSelectOrQueueWrite(fd);
-        } else {
-            debugs(50, 2, "commHandleWrite: FD " << fd << ": write failure: " << xstrerror() << ".");
-            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_ERROR : COMM_OK, errno);
-        }
-    } else {
-        /* A successful write, continue */
-        state->offset += len;
-
-        if (state->offset < state->size) {
-            /* Not done, reinstall the write handler and write some more */
-            commSelectOrQueueWrite(fd);
-        } else {
-            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), nleft ? COMM_OK : COMM_ERROR, errno);
-        }
-    }
-
-    PROF_stop(commHandleWrite);
-}
-
-/*
- * Queue a write. handler/handler_data are called when the write
- * completes, on error, or on file descriptor close.
- *
- * free_func is used to free the passed buffer when the write has completed.
- */
-void
-comm_write(const Comm::ConnectionPointer &conn, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
-{
-    AsyncCall::Pointer call = commCbCall(5,5, "SomeCommWriteHander",
-                                         CommIoCbPtrFun(handler, handler_data));
-
-    comm_write(conn, buf, size, call, free_func);
-}
-
-void
-comm_write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
-{
-    debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback);
-
-    /* Make sure we are open, not closing, and not writing */
-    assert(Comm::IsConnOpen(conn));
-    assert(!fd_table[conn->fd].closing());
-    comm_io_callback_t *ccb = COMMIO_FD_WRITECB(conn->fd);
-    assert(!ccb->active());
-
-    fd_table[conn->fd].writeStart = squid_curtime;
-    /* Queue the write */
-    commio_set_callback(conn->fd, IOCB_WRITE, ccb, callback,
-                        (char *)buf, free_func, size);
-    commSelectOrQueueWrite(conn->fd);
-}
-
-// called when fd needs to write but may need to wait in line for its quota
-static void
-commSelectOrQueueWrite(const int fd)
-{
-    comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
-
-#if DELAY_POOLS
-    // stand in line if there is one
-    if (ClientInfo *clientInfo = fd_table[fd].clientInfo) {
-        if (clientInfo->writeLimitingActive) {
-            ccb->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
-            clientInfo->kickQuotaQueue();
-            return;
-        }
-    }
-#endif
-
-    commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
-}
-
-
-/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */
-#if 0
-void
-comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data)
-{
-    comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
-}
-
-void
-comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback)
-{
-    comm_write(fd, mb->buf, mb->size, callback, mb->freeFunc());
-}
-#endif
-
-void
-comm_write_mbuf(const Comm::ConnectionPointer &conn, MemBuf *mb, IOCB * handler, void *handler_data)
-{
-    comm_write(conn, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
-}
-
-void
-comm_write_mbuf(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback)
-{
-    comm_write(conn, mb->buf, mb->size, callback, mb->freeFunc());
-}
-
-
 /*
  * hm, this might be too general-purpose for all the places we'd
  * like to use it.
@@ -2166,7 +1830,7 @@ AlreadyTimedOut(fde *F)
 static bool
 writeTimedOut(int fd)
 {
-    if (!commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd)))
+    if (!COMMIO_FD_WRITECB(fd)->active())
         return false;
 
     if ((squid_curtime - fd_table[fd].writeStart) < Config.Timeout.write)
@@ -2189,7 +1853,7 @@ checkTimeouts(void)
             // We have an active write callback and we are timed out
             debugs(5, 5, "checkTimeouts: FD " << fd << " auto write timeout");
             commSetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0);
-            commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERROR, ETIMEDOUT);
+            COMMIO_FD_WRITECB(fd)->finish(COMM_ERROR, ETIMEDOUT);
         } else if (AlreadyTimedOut(F))
             continue;