CBDATA_CLASS_INIT(ClientSocketContext);
/* Local functions */
-static IOCB clientWriteComplete;
-static IOCB clientWriteBodyComplete;
static IOACB httpAccept;
#if USE_OPENSSL
static IOACB httpsAccept;
if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
size_t length = lengthToSend(bodyData.range());
noteSentBodyBytes (length);
- AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete",
- CommIoCbPtrFun(clientWriteBodyComplete, this));
- Comm::Write(clientConnection, bodyData.data, length, call, NULL);
+ getConn()->write(bodyData.data, length);
return;
}
else
packChunk(bodyData, mb);
- if (mb.contentSize()) {
- /* write */
- AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
- CommIoCbPtrFun(clientWriteComplete, this));
- Comm::Write(clientConnection, &mb, call);
- } else
- writeComplete(clientConnection, NULL, 0, Comm::OK);
+ if (mb.contentSize())
+ getConn()->write(&mb);
+ else
+ writeComplete(0);
}
/**
}
}
- /* write */
- debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete");
- AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
- CommIoCbPtrFun(clientWriteComplete, this));
- Comm::Write(clientConnection, mb, call);
+ getConn()->write(mb);
delete mb;
}
clientStreamDetach(node, http);
}
-static void
-clientWriteBodyComplete(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag errflag, int xerrno, void *data)
-{
- debugs(33,7, "schedule clientWriteComplete");
- clientWriteComplete(conn, NULL, size, errflag, xerrno, data);
-}
-
void
ConnStateData::readNextRequest()
{
return STREAM_NONE;
}
-/**
- * A write has just completed to the client, or we have just realised there is
- * no more data to send.
- */
-void
-clientWriteComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag, int, void *data)
-{
- ClientSocketContext *context = (ClientSocketContext *)data;
- context->writeComplete(conn, bufnotused, size, errflag);
-}
-
/// remembers the abnormal connection termination for logging purposes
void
ClientSocketContext::noteIoError(const int xerrno)
}
void
-ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag errflag)
+ConnStateData::afterClientWrite(size_t size)
+{
+ if (pipeline.empty())
+ return;
+
+ pipeline.front()->writeComplete(size);
+}
+
+// TODO: make this only need size parameter, ConnStateData handles the rest
+void
+ClientSocketContext::writeComplete(size_t size)
{
const StoreEntry *entry = http->storeEntry();
- http->out.size += size;
- debugs(33, 5, HERE << conn << ", sz " << size <<
- ", err " << errflag << ", off " << http->out.size << ", len " <<
+ debugs(33, 5, clientConnection << ", sz " << size <<
+ ", off " << (http->out.size + size) << ", len " <<
(entry ? entry->objectLen() : 0));
- clientUpdateSocketStats(http->logType, size);
-
- /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */
- if (errflag == Comm::ERR_CLOSING || !Comm::IsConnOpen(conn))
- return;
+ http->out.size += size;
+ clientUpdateSocketStats(http->logType, size);
- if (errflag || clientHttpRequestStatus(conn->fd, http)) {
+ if (clientHttpRequestStatus(clientConnection->fd, http)) {
initiateClose("failure or true request status");
/* Do we leak here ? */
return;
break;
case STREAM_COMPLETE: {
- debugs(33, 5, conn << " Stream complete, keepalive is " << http->request->flags.proxyKeepalive);
+ debugs(33, 5, clientConnection << " Stream complete, keepalive is " << http->request->flags.proxyKeepalive);
ConnStateData *c = http->getConn();
if (!http->request->flags.proxyKeepalive)
clientConnection->close();
return;
default:
- fatal("Hit unreachable code in clientWriteComplete\n");
+ fatal("Hit unreachable code in ClientSocketContext::writeComplete\n");
}
}
ClientSocketContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq);
~ClientSocketContext();
bool startOfOutput() const;
- void writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag);
+ void writeComplete(size_t size);
Comm::ConnectionPointer clientConnection; /// details about the client connection socket.
ClientHttpRequest *http; /* we pretend to own that job */
bool multipartRangeRequest() const;
void registerWithConn();
void noteIoError(const int xerrno); ///< update state to reflect I/O error
+ void initiateClose(const char *reason); ///< terminate due to a send/write error (may continue reading)
private:
void prepareReply(HttpReply * rep);
void packChunk(const StoreIOBuffer &bodyData, MemBuf &mb);
void packRange(StoreIOBuffer const &, MemBuf * mb);
void doClose();
- void initiateClose(const char *reason);
bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
bool connRegistered_;
virtual void receivedFirstByte();
virtual bool handleReadData();
virtual void afterClientRead();
+ virtual void afterClientWrite(size_t);
/* HttpControlMsgSink API */
virtual void sendControlMsg(HttpControlMsg);
!receivedData.data &&
!receivedData.length;
if (responseFinishedOrFailed && !mustSendLastChunk) {
- context->writeComplete(context->clientConnection, NULL, 0, Comm::OK);
+ context->writeComplete(0);
return;
}
afterClientRead();
}
+/** callback handling the Comm::Write completion
+ *
+ * Will call afterClientWrite(size_t) to sync the I/O state.
+ * Then writeSomeData() to initiate any followup writes that
+ * could be immediately done.
+ */
void
Server::clientWriteDone(const CommIoCbParams &io)
{
debugs(33,5, io.conn);
- Must(writer != NULL);
- writer = NULL;
+ Must(writer != nullptr);
+ writer = nullptr;
/* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */
- if (io.flag == Comm::ERR_CLOSING) {
+ if (io.flag == Comm::ERR_CLOSING || !Comm::IsConnOpen(clientConnection)) {
debugs(33,5, io.conn << " closing Bailout.");
return;
}
- assert(Comm::IsConnOpen(clientConnection));
- assert(io.conn->fd == clientConnection->fd);
+ Must(io.conn->fd == clientConnection->fd);
+
+ if (io.flag && pipeline.front())
+ pipeline.front()->initiateClose("write failure");
+ afterClientWrite(io.size); // update state
writeSomeData(); // maybe schedules another write
}
#include "anyp/ProtocolVersion.h"
#include "base/AsyncJob.h"
#include "BodyPipe.h"
-#include "comm/forward.h"
+#include "comm/Write.h"
#include "CommCalls.h"
#include "Pipeline.h"
#include "SBuf.h"
/// ??
virtual bool connFinishedWithConn(int size) = 0;
- /// processing to be done after a Comm::Read()
- virtual void afterClientRead() = 0;
-
/// maybe grow the inBuf and schedule Comm::Read()
void readSomeData();
*/
virtual bool handleReadData() = 0;
+ /// processing to be done after a Comm::Read()
+ virtual void afterClientRead() = 0;
+
/// whether Comm::Read() is scheduled
bool reading() const {return reader != NULL;}
/// Update flags and timeout after the first byte received
virtual void receivedFirstByte() = 0;
- /// maybe schedule another Comm::Write() and perform any
- /// processing to be done after previous Comm::Write() completes
+ /// maybe find some data to send and schedule a Comm::Write()
virtual void writeSomeData() {}
+ /// schedule some data for a Comm::Write()
+ void write(MemBuf *mb) {
+ typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
+ writer = JobCallback(33, 5, Dialer, this, Server::clientWriteDone);
+ Comm::Write(clientConnection, mb, writer);
+ }
+
+ /// schedule some data for a Comm::Write()
+ void write(char *buf, int len) {
+ typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
+ writer = JobCallback(33, 5, Dialer, this, Server::clientWriteDone);
+ Comm::Write(clientConnection, buf, len, writer, nullptr);
+ }
+
+ /// processing to sync state after a Comm::Write()
+ virtual void afterClientWrite(size_t) {}
+
/// whether Comm::Write() is scheduled
bool writing() const {return writer != NULL;}
//ClientSocketContext::ClientSocketContext(const ConnectionPointer&, ClientHttpRequest*) STUB
//ClientSocketContext::~ClientSocketContext() STUB
bool ClientSocketContext::startOfOutput() const STUB_RETVAL(false)
-void ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag) STUB
+void ClientSocketContext::writeComplete(size_t size) STUB
void ClientSocketContext::pullData() STUB
int64_t ClientSocketContext::getNextRangeOffset() const STUB_RETVAL(0)
bool ClientSocketContext::canPackMoreRanges() const STUB_RETVAL(false)