]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Refactor ClientSocketContext write(2) using Server:: write methods
authorAmos Jeffries <squid3@treenet.co.nz>
Tue, 8 Dec 2015 18:47:25 +0000 (10:47 -0800)
committerAmos Jeffries <squid3@treenet.co.nz>
Tue, 8 Dec 2015 18:47:25 +0000 (10:47 -0800)
Writing to the client connection is scoped as an action for class Server
and its child classes. There is no need for ClientSocketContext to be
providing the callback handlers and performing I/O error handling.

With Server providing the current write handler we can move from
CBDATA callbacks to AsyncCall. Initial testing indicates this has some
minor performance benefit.

src/client_side.cc
src/client_side.h
src/servers/Http1Server.cc
src/servers/Server.cc
src/servers/Server.h
src/tests/stub_client_side.cc

index f8ec73c15831aa4bcb84b7a11f425ebccb7fc5d6..c377c0c98cbef70f38585613cd4f93067a3d5bcd 100644 (file)
@@ -176,8 +176,6 @@ static void clientListenerConnectionOpened(AnyP::PortCfgPointer &s, const Ipc::F
 CBDATA_CLASS_INIT(ClientSocketContext);
 
 /* Local functions */
-static IOCB clientWriteComplete;
-static IOCB clientWriteBodyComplete;
 static IOACB httpAccept;
 #if USE_OPENSSL
 static IOACB httpsAccept;
@@ -851,9 +849,7 @@ ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData)
     if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
         size_t length = lengthToSend(bodyData.range());
         noteSentBodyBytes (length);
-        AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete",
-                                             CommIoCbPtrFun(clientWriteBodyComplete, this));
-        Comm::Write(clientConnection, bodyData.data, length, call, NULL);
+        getConn()->write(bodyData.data, length);
         return;
     }
 
@@ -864,13 +860,10 @@ ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData)
     else
         packChunk(bodyData, mb);
 
-    if (mb.contentSize()) {
-        /* write */
-        AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
-                                             CommIoCbPtrFun(clientWriteComplete, this));
-        Comm::Write(clientConnection, &mb, call);
-    }  else
-        writeComplete(clientConnection, NULL, 0, Comm::OK);
+    if (mb.contentSize())
+        getConn()->write(&mb);
+    else
+        writeComplete(0);
 }
 
 /**
@@ -1257,11 +1250,7 @@ ClientSocketContext::sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData)
         }
     }
 
-    /* write */
-    debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete");
-    AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
-                                         CommIoCbPtrFun(clientWriteComplete, this));
-    Comm::Write(clientConnection, mb, call);
+    getConn()->write(mb);
     delete mb;
 }
 
@@ -1332,13 +1321,6 @@ clientSocketDetach(clientStreamNode * node, ClientHttpRequest * http)
     clientStreamDetach(node, http);
 }
 
-static void
-clientWriteBodyComplete(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag errflag, int xerrno, void *data)
-{
-    debugs(33,7, "schedule clientWriteComplete");
-    clientWriteComplete(conn, NULL, size, errflag, xerrno, data);
-}
-
 void
 ConnStateData::readNextRequest()
 {
@@ -1619,17 +1601,6 @@ ClientSocketContext::socketState()
     return STREAM_NONE;
 }
 
-/**
- * A write has just completed to the client, or we have just realised there is
- * no more data to send.
- */
-void
-clientWriteComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag, int, void *data)
-{
-    ClientSocketContext *context = (ClientSocketContext *)data;
-    context->writeComplete(conn, bufnotused, size, errflag);
-}
-
 /// remembers the abnormal connection termination for logging purposes
 void
 ClientSocketContext::noteIoError(const int xerrno)
@@ -1680,21 +1651,27 @@ ConnStateData::stopSending(const char *error)
 }
 
 void
-ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag errflag)
+ConnStateData::afterClientWrite(size_t size)
+{
+    if (pipeline.empty())
+        return;
+
+    pipeline.front()->writeComplete(size);
+}
+
+// TODO: make this only need size parameter, ConnStateData handles the rest
+void
+ClientSocketContext::writeComplete(size_t size)
 {
     const StoreEntry *entry = http->storeEntry();
-    http->out.size += size;
-    debugs(33, 5, HERE << conn << ", sz " << size <<
-           ", err " << errflag << ", off " << http->out.size << ", len " <<
+    debugs(33, 5, clientConnection << ", sz " << size <<
+           ", off " << (http->out.size + size) << ", len " <<
            (entry ? entry->objectLen() : 0));
-    clientUpdateSocketStats(http->logType, size);
-
-    /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */
 
-    if (errflag == Comm::ERR_CLOSING || !Comm::IsConnOpen(conn))
-        return;
+    http->out.size += size;
+    clientUpdateSocketStats(http->logType, size);
 
-    if (errflag || clientHttpRequestStatus(conn->fd, http)) {
+    if (clientHttpRequestStatus(clientConnection->fd, http)) {
         initiateClose("failure or true request status");
         /* Do we leak here ? */
         return;
@@ -1707,7 +1684,7 @@ ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *,
         break;
 
     case STREAM_COMPLETE: {
-        debugs(33, 5, conn << " Stream complete, keepalive is " << http->request->flags.proxyKeepalive);
+        debugs(33, 5, clientConnection << " Stream complete, keepalive is " << http->request->flags.proxyKeepalive);
         ConnStateData *c = http->getConn();
         if (!http->request->flags.proxyKeepalive)
             clientConnection->close();
@@ -1725,7 +1702,7 @@ ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *,
         return;
 
     default:
-        fatal("Hit unreachable code in clientWriteComplete\n");
+        fatal("Hit unreachable code in ClientSocketContext::writeComplete\n");
     }
 }
 
index c6f6a040a5ff1e7492973828cd690fea8c1593f0..5cfd05da97769cf86cef9359b6bed41e00db93da 100644 (file)
@@ -90,7 +90,7 @@ public:
     ClientSocketContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq);
     ~ClientSocketContext();
     bool startOfOutput() const;
-    void writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag);
+    void writeComplete(size_t size);
 
     Comm::ConnectionPointer clientConnection; /// details about the client connection socket.
     ClientHttpRequest *http;    /* we pretend to own that job */
@@ -138,13 +138,13 @@ public:
     bool multipartRangeRequest() const;
     void registerWithConn();
     void noteIoError(const int xerrno); ///< update state to reflect I/O error
+    void initiateClose(const char *reason); ///< terminate due to a send/write error (may continue reading)
 
 private:
     void prepareReply(HttpReply * rep);
     void packChunk(const StoreIOBuffer &bodyData, MemBuf &mb);
     void packRange(StoreIOBuffer const &, MemBuf * mb);
     void doClose();
-    void initiateClose(const char *reason);
 
     bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
     bool connRegistered_;
@@ -193,6 +193,7 @@ public:
     virtual void receivedFirstByte();
     virtual bool handleReadData();
     virtual void afterClientRead();
+    virtual void afterClientWrite(size_t);
 
     /* HttpControlMsgSink API */
     virtual void sendControlMsg(HttpControlMsg);
index 3b17df8290506e8de35c6665f1d1de9b9c703238..ac4ca7d73ba4a683379098ded9f58fe9ff064f2c 100644 (file)
@@ -257,7 +257,7 @@ Http::One::Server::handleReply(HttpReply *rep, StoreIOBuffer receivedData)
                                           !receivedData.data &&
                                           !receivedData.length;
     if (responseFinishedOrFailed && !mustSendLastChunk) {
-        context->writeComplete(context->clientConnection, NULL, 0, Comm::OK);
+        context->writeComplete(0);
         return;
     }
 
index 821fe69ea535e3b3011b5b40928dcdb028ec0b95..e60911f8d977001e2197ea8377f63b1d1edd0605 100644 (file)
@@ -176,22 +176,31 @@ Server::doClientRead(const CommIoCbParams &io)
     afterClientRead();
 }
 
+/** callback handling the Comm::Write completion
+ *
+ * Will call afterClientWrite(size_t) to sync the I/O state.
+ * Then writeSomeData() to initiate any followup writes that
+ * could be immediately done.
+ */
 void
 Server::clientWriteDone(const CommIoCbParams &io)
 {
     debugs(33,5, io.conn);
-    Must(writer != NULL);
-    writer = NULL;
+    Must(writer != nullptr);
+    writer = nullptr;
 
     /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */
-    if (io.flag == Comm::ERR_CLOSING) {
+    if (io.flag == Comm::ERR_CLOSING || !Comm::IsConnOpen(clientConnection)) {
         debugs(33,5, io.conn << " closing Bailout.");
         return;
     }
 
-    assert(Comm::IsConnOpen(clientConnection));
-    assert(io.conn->fd == clientConnection->fd);
+    Must(io.conn->fd == clientConnection->fd);
+
+    if (io.flag && pipeline.front())
+        pipeline.front()->initiateClose("write failure");
 
+    afterClientWrite(io.size); // update state
     writeSomeData(); // maybe schedules another write
 }
 
index e7faf9b3233620b6ae0227d1b0073f6434aa2b24..7f6afb497e0fac82ed0352f834b6160eb81713e5 100644 (file)
@@ -15,7 +15,7 @@
 #include "anyp/ProtocolVersion.h"
 #include "base/AsyncJob.h"
 #include "BodyPipe.h"
-#include "comm/forward.h"
+#include "comm/Write.h"
 #include "CommCalls.h"
 #include "Pipeline.h"
 #include "SBuf.h"
@@ -38,9 +38,6 @@ public:
     /// ??
     virtual bool connFinishedWithConn(int size) = 0;
 
-    /// processing to be done after a Comm::Read()
-    virtual void afterClientRead() = 0;
-
     /// maybe grow the inBuf and schedule Comm::Read()
     void readSomeData();
 
@@ -52,6 +49,9 @@ public:
      */
     virtual bool handleReadData() = 0;
 
+    /// processing to be done after a Comm::Read()
+    virtual void afterClientRead() = 0;
+
     /// whether Comm::Read() is scheduled
     bool reading() const {return reader != NULL;}
 
@@ -61,10 +61,26 @@ public:
     /// Update flags and timeout after the first byte received
     virtual void receivedFirstByte() = 0;
 
-    /// maybe schedule another Comm::Write() and perform any
-    /// processing to be done after previous Comm::Write() completes
+    /// maybe find some data to send and schedule a Comm::Write()
     virtual void writeSomeData() {}
 
+    /// schedule some data for a Comm::Write()
+    void write(MemBuf *mb) {
+        typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
+        writer = JobCallback(33, 5, Dialer, this, Server::clientWriteDone);
+        Comm::Write(clientConnection, mb, writer);
+    }
+
+    /// schedule some data for a Comm::Write()
+    void write(char *buf, int len) {
+        typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
+        writer = JobCallback(33, 5, Dialer, this, Server::clientWriteDone);
+        Comm::Write(clientConnection, buf, len, writer, nullptr);
+    }
+
+    /// processing to sync state after a Comm::Write()
+    virtual void afterClientWrite(size_t) {}
+
     /// whether Comm::Write() is scheduled
     bool writing() const {return writer != NULL;}
 
index f143ab110e4409a8eea37137ef489bfc7dc0124a..27e92a06b64cb251bd8b6e2101c61ce56f6cbc0b 100644 (file)
@@ -15,7 +15,7 @@
 //ClientSocketContext::ClientSocketContext(const ConnectionPointer&, ClientHttpRequest*) STUB
 //ClientSocketContext::~ClientSocketContext() STUB
 bool ClientSocketContext::startOfOutput() const STUB_RETVAL(false)
-void ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag) STUB
+void ClientSocketContext::writeComplete(size_t size) STUB
 void ClientSocketContext::pullData() STUB
 int64_t ClientSocketContext::getNextRangeOffset() const STUB_RETVAL(0)
 bool ClientSocketContext::canPackMoreRanges() const STUB_RETVAL(false)