From: Amos Jeffries Date: Tue, 8 Dec 2015 18:47:25 +0000 (-0800) Subject: Refactor ClientSocketContext write(2) using Server:: write methods X-Git-Tag: SQUID_4_0_4~45 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=21cd322;p=thirdparty%2Fsquid.git Refactor ClientSocketContext write(2) using Server:: write methods Writing to the client connection is scoped as an action for class Server and its child classes. There is no need for ClientSocketContext to be providing the callback handlers and performing I/O error handling. With Server providing the current write handler we can move from CBDATA callbacks to AsyncCall. Initial testing indicates this has some minor performance benefit. --- diff --git a/src/client_side.cc b/src/client_side.cc index f8ec73c158..c377c0c98c 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -176,8 +176,6 @@ static void clientListenerConnectionOpened(AnyP::PortCfgPointer &s, const Ipc::F CBDATA_CLASS_INIT(ClientSocketContext); /* Local functions */ -static IOCB clientWriteComplete; -static IOCB clientWriteBodyComplete; static IOACB httpAccept; #if USE_OPENSSL static IOACB httpsAccept; @@ -851,9 +849,7 @@ ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData) if (!multipartRangeRequest() && !http->request->flags.chunkedReply) { size_t length = lengthToSend(bodyData.range()); noteSentBodyBytes (length); - AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete", - CommIoCbPtrFun(clientWriteBodyComplete, this)); - Comm::Write(clientConnection, bodyData.data, length, call, NULL); + getConn()->write(bodyData.data, length); return; } @@ -864,13 +860,10 @@ ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData) else packChunk(bodyData, mb); - if (mb.contentSize()) { - /* write */ - AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", - CommIoCbPtrFun(clientWriteComplete, this)); - Comm::Write(clientConnection, &mb, call); - } else - writeComplete(clientConnection, NULL, 0, Comm::OK); + if (mb.contentSize()) + getConn()->write(&mb); + else + writeComplete(0); } /** @@ -1257,11 +1250,7 @@ ClientSocketContext::sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData) } } - /* write */ - debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete"); - AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete", - CommIoCbPtrFun(clientWriteComplete, this)); - Comm::Write(clientConnection, mb, call); + getConn()->write(mb); delete mb; } @@ -1332,13 +1321,6 @@ clientSocketDetach(clientStreamNode * node, ClientHttpRequest * http) clientStreamDetach(node, http); } -static void -clientWriteBodyComplete(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag errflag, int xerrno, void *data) -{ - debugs(33,7, "schedule clientWriteComplete"); - clientWriteComplete(conn, NULL, size, errflag, xerrno, data); -} - void ConnStateData::readNextRequest() { @@ -1619,17 +1601,6 @@ ClientSocketContext::socketState() return STREAM_NONE; } -/** - * A write has just completed to the client, or we have just realised there is - * no more data to send. - */ -void -clientWriteComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag, int, void *data) -{ - ClientSocketContext *context = (ClientSocketContext *)data; - context->writeComplete(conn, bufnotused, size, errflag); -} - /// remembers the abnormal connection termination for logging purposes void ClientSocketContext::noteIoError(const int xerrno) @@ -1680,21 +1651,27 @@ ConnStateData::stopSending(const char *error) } void -ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag errflag) +ConnStateData::afterClientWrite(size_t size) +{ + if (pipeline.empty()) + return; + + pipeline.front()->writeComplete(size); +} + +// TODO: make this only need size parameter, ConnStateData handles the rest +void +ClientSocketContext::writeComplete(size_t size) { const StoreEntry *entry = http->storeEntry(); - http->out.size += size; - debugs(33, 5, HERE << conn << ", sz " << size << - ", err " << errflag << ", off " << http->out.size << ", len " << + debugs(33, 5, clientConnection << ", sz " << size << + ", off " << (http->out.size + size) << ", len " << (entry ? entry->objectLen() : 0)); - clientUpdateSocketStats(http->logType, size); - - /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */ - if (errflag == Comm::ERR_CLOSING || !Comm::IsConnOpen(conn)) - return; + http->out.size += size; + clientUpdateSocketStats(http->logType, size); - if (errflag || clientHttpRequestStatus(conn->fd, http)) { + if (clientHttpRequestStatus(clientConnection->fd, http)) { initiateClose("failure or true request status"); /* Do we leak here ? */ return; @@ -1707,7 +1684,7 @@ ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *, break; case STREAM_COMPLETE: { - debugs(33, 5, conn << " Stream complete, keepalive is " << http->request->flags.proxyKeepalive); + debugs(33, 5, clientConnection << " Stream complete, keepalive is " << http->request->flags.proxyKeepalive); ConnStateData *c = http->getConn(); if (!http->request->flags.proxyKeepalive) clientConnection->close(); @@ -1725,7 +1702,7 @@ ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *, return; default: - fatal("Hit unreachable code in clientWriteComplete\n"); + fatal("Hit unreachable code in ClientSocketContext::writeComplete\n"); } } diff --git a/src/client_side.h b/src/client_side.h index c6f6a040a5..5cfd05da97 100644 --- a/src/client_side.h +++ b/src/client_side.h @@ -90,7 +90,7 @@ public: ClientSocketContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq); ~ClientSocketContext(); bool startOfOutput() const; - void writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag); + void writeComplete(size_t size); Comm::ConnectionPointer clientConnection; /// details about the client connection socket. ClientHttpRequest *http; /* we pretend to own that job */ @@ -138,13 +138,13 @@ public: bool multipartRangeRequest() const; void registerWithConn(); void noteIoError(const int xerrno); ///< update state to reflect I/O error + void initiateClose(const char *reason); ///< terminate due to a send/write error (may continue reading) private: void prepareReply(HttpReply * rep); void packChunk(const StoreIOBuffer &bodyData, MemBuf &mb); void packRange(StoreIOBuffer const &, MemBuf * mb); void doClose(); - void initiateClose(const char *reason); bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */ bool connRegistered_; @@ -193,6 +193,7 @@ public: virtual void receivedFirstByte(); virtual bool handleReadData(); virtual void afterClientRead(); + virtual void afterClientWrite(size_t); /* HttpControlMsgSink API */ virtual void sendControlMsg(HttpControlMsg); diff --git a/src/servers/Http1Server.cc b/src/servers/Http1Server.cc index 3b17df8290..ac4ca7d73b 100644 --- a/src/servers/Http1Server.cc +++ b/src/servers/Http1Server.cc @@ -257,7 +257,7 @@ Http::One::Server::handleReply(HttpReply *rep, StoreIOBuffer receivedData) !receivedData.data && !receivedData.length; if (responseFinishedOrFailed && !mustSendLastChunk) { - context->writeComplete(context->clientConnection, NULL, 0, Comm::OK); + context->writeComplete(0); return; } diff --git a/src/servers/Server.cc b/src/servers/Server.cc index 821fe69ea5..e60911f8d9 100644 --- a/src/servers/Server.cc +++ b/src/servers/Server.cc @@ -176,22 +176,31 @@ Server::doClientRead(const CommIoCbParams &io) afterClientRead(); } +/** callback handling the Comm::Write completion + * + * Will call afterClientWrite(size_t) to sync the I/O state. + * Then writeSomeData() to initiate any followup writes that + * could be immediately done. + */ void Server::clientWriteDone(const CommIoCbParams &io) { debugs(33,5, io.conn); - Must(writer != NULL); - writer = NULL; + Must(writer != nullptr); + writer = nullptr; /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */ - if (io.flag == Comm::ERR_CLOSING) { + if (io.flag == Comm::ERR_CLOSING || !Comm::IsConnOpen(clientConnection)) { debugs(33,5, io.conn << " closing Bailout."); return; } - assert(Comm::IsConnOpen(clientConnection)); - assert(io.conn->fd == clientConnection->fd); + Must(io.conn->fd == clientConnection->fd); + + if (io.flag && pipeline.front()) + pipeline.front()->initiateClose("write failure"); + afterClientWrite(io.size); // update state writeSomeData(); // maybe schedules another write } diff --git a/src/servers/Server.h b/src/servers/Server.h index e7faf9b323..7f6afb497e 100644 --- a/src/servers/Server.h +++ b/src/servers/Server.h @@ -15,7 +15,7 @@ #include "anyp/ProtocolVersion.h" #include "base/AsyncJob.h" #include "BodyPipe.h" -#include "comm/forward.h" +#include "comm/Write.h" #include "CommCalls.h" #include "Pipeline.h" #include "SBuf.h" @@ -38,9 +38,6 @@ public: /// ?? virtual bool connFinishedWithConn(int size) = 0; - /// processing to be done after a Comm::Read() - virtual void afterClientRead() = 0; - /// maybe grow the inBuf and schedule Comm::Read() void readSomeData(); @@ -52,6 +49,9 @@ public: */ virtual bool handleReadData() = 0; + /// processing to be done after a Comm::Read() + virtual void afterClientRead() = 0; + /// whether Comm::Read() is scheduled bool reading() const {return reader != NULL;} @@ -61,10 +61,26 @@ public: /// Update flags and timeout after the first byte received virtual void receivedFirstByte() = 0; - /// maybe schedule another Comm::Write() and perform any - /// processing to be done after previous Comm::Write() completes + /// maybe find some data to send and schedule a Comm::Write() virtual void writeSomeData() {} + /// schedule some data for a Comm::Write() + void write(MemBuf *mb) { + typedef CommCbMemFunT Dialer; + writer = JobCallback(33, 5, Dialer, this, Server::clientWriteDone); + Comm::Write(clientConnection, mb, writer); + } + + /// schedule some data for a Comm::Write() + void write(char *buf, int len) { + typedef CommCbMemFunT Dialer; + writer = JobCallback(33, 5, Dialer, this, Server::clientWriteDone); + Comm::Write(clientConnection, buf, len, writer, nullptr); + } + + /// processing to sync state after a Comm::Write() + virtual void afterClientWrite(size_t) {} + /// whether Comm::Write() is scheduled bool writing() const {return writer != NULL;} diff --git a/src/tests/stub_client_side.cc b/src/tests/stub_client_side.cc index f143ab110e..27e92a06b6 100644 --- a/src/tests/stub_client_side.cc +++ b/src/tests/stub_client_side.cc @@ -15,7 +15,7 @@ //ClientSocketContext::ClientSocketContext(const ConnectionPointer&, ClientHttpRequest*) STUB //ClientSocketContext::~ClientSocketContext() STUB bool ClientSocketContext::startOfOutput() const STUB_RETVAL(false) -void ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag) STUB +void ClientSocketContext::writeComplete(size_t size) STUB void ClientSocketContext::pullData() STUB int64_t ClientSocketContext::getNextRangeOffset() const STUB_RETVAL(0) bool ClientSocketContext::canPackMoreRanges() const STUB_RETVAL(false)