]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Split core Server operations from ConnStateData
authorAmos Jeffries <squid3@treenet.co.nz>
Sat, 7 Nov 2015 12:08:33 +0000 (04:08 -0800)
committerAmos Jeffries <squid3@treenet.co.nz>
Sat, 7 Nov 2015 12:08:33 +0000 (04:08 -0800)
This improves the servers/libserver.la class hierarchy in
preparation for HTTP/2 and other non-HTTP/1.1 protocol support.

The basic I/O functionality of ConnStateData is moved to Server
class and a set of virtual methods designed to allow for child
class implementation of data processing operations.

No logic is changed in this patch, just symbol renaming and
moving of method logics as-is into libservers.la

src/client_side.cc
src/client_side.h
src/servers/FtpServer.cc
src/servers/Http1Server.cc
src/servers/Makefile.am
src/servers/Server.cc [new file with mode: 0644]
src/servers/Server.h [new file with mode: 0644]
src/stat.cc
src/tests/stub_client_side.cc
src/tunnel.cc

index 379768913ecec99a60a436d90be7fc86b27eb073..005876415c71f66d97d07bf3701750bd39dc2ef9 100644 (file)
@@ -220,27 +220,6 @@ ClientSocketContext::getConn() const
     return http->getConn();
 }
 
-/**
- * This routine should be called to grow the in.buf and then
- * call Comm::Read().
- */
-void
-ConnStateData::readSomeData()
-{
-    if (reading())
-        return;
-
-    debugs(33, 4, HERE << clientConnection << ": reading request...");
-
-    // we can only read if there is more than 1 byte of space free
-    if (Config.maxRequestBufferSize - in.buf.length() < 2)
-        return;
-
-    typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
-    reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest);
-    Comm::Read(clientConnection, reader);
-}
-
 void
 ClientSocketContext::removeFromConnectionList(ConnStateData * conn)
 {
@@ -816,15 +795,13 @@ ConnStateData::swanSong()
 
     unpinConnection(true);
 
-    if (Comm::IsConnOpen(clientConnection))
-        clientConnection->close();
+    Server::swanSong(); // closes the client connection
 
 #if USE_AUTH
     // NP: do this bit after closing the connections to avoid side effects from unwanted TCP RST
     setAuth(NULL, "ConnStateData::SwanSong cleanup");
 #endif
 
-    BodyProducer::swanSong();
     flags.swanSang = true;
 }
 
@@ -849,6 +826,8 @@ ConnStateData::~ConnStateData()
     if (bodyPipe != NULL)
         stopProducingFor(bodyPipe, false);
 
+    delete bodyParser; // TODO: pool
+
 #if USE_OPENSSL
     delete sslServerBump;
 #endif
@@ -1815,7 +1794,7 @@ ConnStateData::stopSending(const char *error)
     if (!stoppedReceiving()) {
         if (const int64_t expecting = mayNeedToReadMoreBody()) {
             debugs(33, 5, HERE << "must still read " << expecting <<
-                   " request body bytes with " << in.buf.length() << " unused");
+                   " request body bytes with " << inBuf.length() << " unused");
             return; // wait for the request receiver to finish reading
         }
     }
@@ -1875,7 +1854,7 @@ ClientSocketContext *
 ConnStateData::abortRequestParsing(const char *const uri)
 {
     ClientHttpRequest *http = new ClientHttpRequest(this);
-    http->req_sz = in.buf.length();
+    http->req_sz = inBuf.length();
     http->uri = xstrdup(uri);
     setLogUri (http, uri);
     ClientSocketContext *context = new ClientSocketContext(clientConnection, http);
@@ -2161,12 +2140,12 @@ parseHttpRequest(ConnStateData *csd, const Http1::RequestParserPointer &hp)
 {
     /* Attempt to parse the first line; this will define where the method, url, version and header begin */
     {
-        const bool parsedOk = hp->parse(csd->in.buf);
+        const bool parsedOk = hp->parse(csd->inBuf);
 
         if (csd->port->flags.isIntercepted() && Config.accessList.on_unsupported_protocol)
-            csd->preservedClientData = csd->in.buf;
+            csd->preservedClientData = csd->inBuf;
         // sync the buffers after parsing.
-        csd->in.buf = hp->remaining();
+        csd->inBuf = hp->remaining();
 
         if (hp->needsMoreData()) {
             debugs(33, 5, "Incomplete request, waiting for end of request line");
@@ -2281,28 +2260,6 @@ parseHttpRequest(ConnStateData *csd, const Http1::RequestParserPointer &hp)
     return result;
 }
 
-bool
-ConnStateData::In::maybeMakeSpaceAvailable()
-{
-    if (buf.spaceSize() < 2) {
-        const SBuf::size_type haveCapacity = buf.length() + buf.spaceSize();
-        if (haveCapacity >= Config.maxRequestBufferSize) {
-            debugs(33, 4, "request buffer full: client_request_buffer_max_size=" << Config.maxRequestBufferSize);
-            return false;
-        }
-        if (haveCapacity == 0) {
-            // haveCapacity is based on the SBuf visible window of the MemBlob buffer, which may fill up.
-            // at which point bump the buffer back to default. This allocates a new MemBlob with any un-parsed bytes.
-            buf.reserveCapacity(CLIENT_REQ_BUF_SZ);
-        } else {
-            const SBuf::size_type wantCapacity = min(static_cast<SBuf::size_type>(Config.maxRequestBufferSize), haveCapacity*2);
-            buf.reserveCapacity(wantCapacity);
-        }
-        debugs(33, 2, "growing request buffer: available=" << buf.spaceSize() << " used=" << buf.length());
-    }
-    return (buf.spaceSize() >= 2);
-}
-
 void
 ConnStateData::addContextToQueue(ClientSocketContext * context)
 {
@@ -2326,31 +2283,31 @@ ConnStateData::getConcurrentRequestCount() const
     return result;
 }
 
-int
+bool
 ConnStateData::connFinishedWithConn(int size)
 {
     if (size == 0) {
-        if (getConcurrentRequestCount() == 0 && in.buf.isEmpty()) {
+        if (getConcurrentRequestCount() == 0 && inBuf.isEmpty()) {
             /* no current or pending requests */
             debugs(33, 4, HERE << clientConnection << " closed");
-            return 1;
+            return true;
         } else if (!Config.onoff.half_closed_clients) {
             /* admin doesn't want to support half-closed client sockets */
             debugs(33, 3, HERE << clientConnection << " aborted (half_closed_clients disabled)");
             notifyAllContexts(0); // no specific error implies abort
-            return 1;
+            return true;
         }
     }
 
-    return 0;
+    return false;
 }
 
 void
 ConnStateData::consumeInput(const size_t byteCount)
 {
-    assert(byteCount > 0 && byteCount <= in.buf.length());
-    in.buf.consume(byteCount);
-    debugs(33, 5, "in.buf has " << in.buf.length() << " unused bytes");
+    assert(byteCount > 0 && byteCount <= inBuf.length());
+    inBuf.consume(byteCount);
+    debugs(33, 5, "inBuf has " << inBuf.length() << " unused bytes");
 }
 
 void
@@ -2809,15 +2766,15 @@ ConnStateData::parseProxyProtocolHeader()
     // http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
 
     // detect and parse PROXY/2.0 protocol header
-    if (in.buf.startsWith(Proxy2p0magic))
+    if (inBuf.startsWith(Proxy2p0magic))
         return parseProxy2p0();
 
     // detect and parse PROXY/1.0 protocol header
-    if (in.buf.startsWith(Proxy1p0magic))
+    if (inBuf.startsWith(Proxy1p0magic))
         return parseProxy1p0();
 
     // detect and terminate other protocols
-    if (in.buf.length() >= Proxy2p0magic.length()) {
+    if (inBuf.length() >= Proxy2p0magic.length()) {
         // PROXY/1.0 magic is shorter, so we know that
         // the input does not start with any PROXY magic
         return proxyProtocolError("PROXY protocol error: invalid header");
@@ -2834,7 +2791,7 @@ ConnStateData::parseProxyProtocolHeader()
 bool
 ConnStateData::parseProxy1p0()
 {
-    ::Parser::Tokenizer tok(in.buf);
+    ::Parser::Tokenizer tok(inBuf);
     tok.skip(Proxy1p0magic);
 
     // skip to first LF (assumes it is part of CRLF)
@@ -2843,7 +2800,7 @@ ConnStateData::parseProxy1p0()
     if (tok.prefix(line, lineContent, 107-Proxy1p0magic.length())) {
         if (tok.skip('\n')) {
             // found valid header
-            in.buf = tok.remaining();
+            inBuf = tok.remaining();
             needProxyProtocolHeader_ = false;
             // reset the tokenizer to work on found line only.
             tok.reset(line);
@@ -2851,7 +2808,7 @@ ConnStateData::parseProxy1p0()
             return false; // no LF yet
 
     } else // protocol error only if there are more than 107 bytes prefix header
-        return proxyProtocolError(in.buf.length() > 107? "PROXY/1.0 error: missing CRLF" : NULL);
+        return proxyProtocolError(inBuf.length() > 107? "PROXY/1.0 error: missing CRLF" : NULL);
 
     static const SBuf unknown("UNKNOWN"), tcpName("TCP");
     if (tok.skip(tcpName)) {
@@ -2928,34 +2885,34 @@ bool
 ConnStateData::parseProxy2p0()
 {
     static const SBuf::size_type prefixLen = Proxy2p0magic.length();
-    if (in.buf.length() < prefixLen + 4)
+    if (inBuf.length() < prefixLen + 4)
         return false; // need more bytes
 
-    if ((in.buf[prefixLen] & 0xF0) != 0x20) // version == 2 is mandatory
+    if ((inBuf[prefixLen] & 0xF0) != 0x20) // version == 2 is mandatory
         return proxyProtocolError("PROXY/2.0 error: invalid version");
 
-    const char command = (in.buf[prefixLen] & 0x0F);
+    const char command = (inBuf[prefixLen] & 0x0F);
     if ((command & 0xFE) != 0x00) // values other than 0x0-0x1 are invalid
         return proxyProtocolError("PROXY/2.0 error: invalid command");
 
-    const char family = (in.buf[prefixLen+1] & 0xF0) >>4;
+    const char family = (inBuf[prefixLen+1] & 0xF0) >>4;
     if (family > 0x3) // values other than 0x0-0x3 are invalid
         return proxyProtocolError("PROXY/2.0 error: invalid family");
 
-    const char proto = (in.buf[prefixLen+1] & 0x0F);
+    const char proto = (inBuf[prefixLen+1] & 0x0F);
     if (proto > 0x2) // values other than 0x0-0x2 are invalid
         return proxyProtocolError("PROXY/2.0 error: invalid protocol type");
 
-    const char *clen = in.buf.rawContent() + prefixLen + 2;
+    const char *clen = inBuf.rawContent() + prefixLen + 2;
     uint16_t len;
     memcpy(&len, clen, sizeof(len));
     len = ntohs(len);
 
-    if (in.buf.length() < prefixLen + 4 + len)
+    if (inBuf.length() < prefixLen + 4 + len)
         return false; // need more bytes
 
-    in.buf.consume(prefixLen + 4); // 4 being the extra bytes
-    const SBuf extra = in.buf.consume(len);
+    inBuf.consume(prefixLen + 4); // 4 being the extra bytes
+    const SBuf extra = inBuf.consume(len);
     needProxyProtocolHeader_ = false; // found successfully
 
     // LOCAL connections do nothing with the extras
@@ -3045,10 +3002,10 @@ ConnStateData::clientParseRequests()
 
     // Loop while we have read bytes that are not needed for producing the body
     // On errors, bodyPipe may become nil, but readMore will be cleared
-    while (!in.buf.isEmpty() && !bodyPipe && flags.readMore) {
+    while (!inBuf.isEmpty() && !bodyPipe && flags.readMore) {
 
         /* Don't try to parse if the buffer is empty */
-        if (in.buf.isEmpty())
+        if (inBuf.isEmpty())
             break;
 
         /* Limit the number of concurrent requests */
@@ -3078,8 +3035,8 @@ ConnStateData::clientParseRequests()
             }
         } else {
             debugs(33, 5, clientConnection << ": not enough request data: " <<
-                   in.buf.length() << " < " << Config.maxRequestHeaderSize);
-            Must(in.buf.length() < Config.maxRequestHeaderSize);
+                   inBuf.length() << " < " << Config.maxRequestHeaderSize);
+            Must(inBuf.length() < Config.maxRequestHeaderSize);
             break;
         }
     }
@@ -3089,81 +3046,11 @@ ConnStateData::clientParseRequests()
 }
 
 void
-ConnStateData::clientReadRequest(const CommIoCbParams &io)
+ConnStateData::afterClientRead()
 {
-    debugs(33,5, io.conn);
-    Must(reading());
-    reader = NULL;
-
-    /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */
-    if (io.flag == Comm::ERR_CLOSING) {
-        debugs(33,5, io.conn << " closing Bailout.");
-        return;
-    }
-
-    assert(Comm::IsConnOpen(clientConnection));
-    assert(io.conn->fd == clientConnection->fd);
-
-    /*
-     * Don't reset the timeout value here. The value should be
-     * counting Config.Timeout.request and applies to the request
-     * as a whole, not individual read() calls.
-     * Plus, it breaks our lame *HalfClosed() detection
-     */
-
-    in.maybeMakeSpaceAvailable();
-    CommIoCbParams rd(this); // will be expanded with ReadNow results
-    rd.conn = io.conn;
-    switch (Comm::ReadNow(rd, in.buf)) {
-    case Comm::INPROGRESS:
-        if (in.buf.isEmpty())
-            debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno));
-        readSomeData();
-        return;
-
-    case Comm::OK:
-        statCounter.client_http.kbytes_in += rd.size;
-        if (!receivedFirstByte_)
-            receivedFirstByte();
-        // may comm_close or setReplyToError
-        if (!handleReadData())
-            return;
-
-        /* Continue to process previously read data */
-        break;
-
-    case Comm::ENDFILE: // close detected by 0-byte read
-        debugs(33, 5, io.conn << " closed?");
-
-        if (connFinishedWithConn(rd.size)) {
-            clientConnection->close();
-            return;
-        }
-
-        /* It might be half-closed, we can't tell */
-        fd_table[io.conn->fd].flags.socket_eof = true;
-        commMarkHalfClosed(io.conn->fd);
-        fd_note(io.conn->fd, "half-closed");
-
-        /* There is one more close check at the end, to detect aborted
-         * (partial) requests. At this point we can't tell if the request
-         * is partial.
-         */
-
-        /* Continue to process previously read data */
-        break;
-
-    // case Comm::COMM_ERROR:
-    default: // no other flags should ever occur
-        debugs(33, 2, io.conn << ": got flag " << rd.flag << "; " << xstrerr(rd.xerrno));
-        notifyAllContexts(rd.xerrno);
-        io.conn->close();
-        return;
-    }
-
     /* Process next request */
     if (getConcurrentRequestCount() == 0)
-        fd_note(io.fd, "Reading next request");
+        fd_note(clientConnection->fd, "Reading next request");
 
     if (!clientParseRequests()) {
         if (!isOpen())
@@ -3176,8 +3063,8 @@ ConnStateData::clientReadRequest(const CommIoCbParams &io)
          * be if we have an incomplete request.
          * XXX: This duplicates ClientSocketContext::keepaliveNextRequest
          */
-        if (getConcurrentRequestCount() == 0 && commIsHalfClosed(io.fd)) {
-            debugs(33, 5, HERE << io.conn << ": half-closed connection, no completed request parsed, connection closing.");
+        if (getConcurrentRequestCount() == 0 && commIsHalfClosed(clientConnection->fd)) {
+            debugs(33, 5, clientConnection << ": half-closed connection, no completed request parsed, connection closing.");
             clientConnection->close();
             return;
         }
@@ -3205,7 +3092,7 @@ ConnStateData::handleReadData()
 }
 
 /**
- * called when new request body data has been buffered in in.buf
+ * called when new request body data has been buffered in inBuf
  * may close the connection if we were closing and piped everything out
  *
  * \retval false called comm_close or setReplyToError (the caller should bail)
@@ -3216,14 +3103,14 @@ ConnStateData::handleRequestBodyData()
 {
     assert(bodyPipe != NULL);
 
-    if (in.bodyParser) { // chunked encoding
+    if (bodyParser) { // chunked encoding
         if (const err_type error = handleChunkedRequestBody()) {
             abortChunkedRequestBody(error);
             return false;
         }
     } else { // identity encoding
         debugs(33,5, HERE << "handling plain request body for " << clientConnection);
-        const size_t putSize = bodyPipe->putMoreData(in.buf.c_str(), in.buf.length());
+        const size_t putSize = bodyPipe->putMoreData(inBuf.c_str(), inBuf.length());
         if (putSize > 0)
             consumeInput(putSize);
 
@@ -3253,17 +3140,17 @@ ConnStateData::handleRequestBodyData()
 err_type
 ConnStateData::handleChunkedRequestBody()
 {
-    debugs(33, 7, "chunked from " << clientConnection << ": " << in.buf.length());
+    debugs(33, 7, "chunked from " << clientConnection << ": " << inBuf.length());
 
     try { // the parser will throw on errors
 
-        if (in.buf.isEmpty()) // nothing to do
+        if (inBuf.isEmpty()) // nothing to do
             return ERR_NONE;
 
         BodyPipeCheckout bpc(*bodyPipe);
-        in.bodyParser->setPayloadBuffer(&bpc.buf);
-        const bool parsed = in.bodyParser->parse(in.buf);
-        in.buf = in.bodyParser->remaining(); // sync buffers
+        bodyParser->setPayloadBuffer(&bpc.buf);
+        const bool parsed = bodyParser->parse(inBuf);
+        inBuf = bodyParser->remaining(); // sync buffers
         bpc.checkIn();
 
         // dechunk then check: the size limit applies to _dechunked_ content
@@ -3277,10 +3164,10 @@ ConnStateData::handleChunkedRequestBody()
         }
 
         // if chunk parser needs data, then the body pipe must need it too
-        Must(!in.bodyParser->needsMoreData() || bodyPipe->mayNeedMoreData());
+        Must(!bodyParser->needsMoreData() || bodyPipe->mayNeedMoreData());
 
         // if parser needs more space and we can consume nothing, we will stall
-        Must(!in.bodyParser->needsMoreSpace() || bodyPipe->buf().hasContent());
+        Must(!bodyParser->needsMoreSpace() || bodyPipe->buf().hasContent());
     } catch (...) { // TODO: be more specific
         debugs(33, 3, HERE << "malformed chunks" << bodyPipe->status());
         return ERR_INVALID_REQ;
@@ -3312,7 +3199,7 @@ ConnStateData::abortChunkedRequestBody(const err_type error)
                                     repContext->http->uri,
                                     CachePeer,
                                     repContext->http->request,
-                                    in.buf, NULL);
+                                    inBuf, NULL);
         context->pullData();
     } else {
         // close or otherwise we may get stuck as nobody will notice the error?
@@ -3385,6 +3272,8 @@ clientLifetimeTimeout(const CommTimeoutCbParams &io)
 
 ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) :
     AsyncJob("ConnStateData"), // kids overwrite
+    Server(xact),
+    bodyParser(nullptr),
     nrequests(0),
 #if USE_OPENSSL
     sslBumpMode(Ssl::bumpEnd),
@@ -3396,8 +3285,7 @@ ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) :
     signAlgorithm(Ssl::algSignTrusted),
 #endif
     stoppedSending_(NULL),
-    stoppedReceiving_(NULL),
-    receivedFirstByte_(false)
+    stoppedReceiving_(NULL)
 {
     flags.readMore = true; // kids may overwrite
     flags.swanSang = false;
@@ -3410,9 +3298,6 @@ ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) :
     pinning.peer = NULL;
 
     // store the details required for creating more MasterXaction objects as new requests come in
-    clientConnection = xact->tcpClient;
-    port = xact->squidPort;
-    transferProtocol = port->transport; // default to the *_port protocol= setting. may change later.
     log_addr = xact->tcpClient->remote;
     log_addr.applyMask(Config.Addrs.client_netmask);
 
@@ -3739,7 +3624,7 @@ httpsSslBumpAccessCheckDone(allow_t answer, void *data)
         debugs(33, 2, HERE << "sslBump not needed for " << connState->clientConnection);
         connState->sslBumpMode = Ssl::bumpNone;
     }
-    connState->fakeAConnectRequest("ssl-bump", connState->in.buf);
+    connState->fakeAConnectRequest("ssl-bump", connState->inBuf);
 }
 
 /** handle a new HTTPS connection */
@@ -4244,8 +4129,8 @@ ConnStateData::splice()
 
         // reset the current protocol to HTTP/1.1 (was "HTTPS" for the bumping process)
         transferProtocol = Http::ProtocolVersion();
-        // in.buf still has the "CONNECT ..." request data, reset it to SSL hello message
-        in.buf.append(rbuf.content(), rbuf.contentSize());
+        // inBuf still has the "CONNECT ..." request data, reset it to SSL hello message
+        inBuf.append(rbuf.content(), rbuf.contentSize());
         ClientSocketContext::Pointer context = getCurrentContext();
         ClientHttpRequest *http = context->http;
         tunnelStart(http);
@@ -4335,7 +4220,7 @@ ConnStateData::fakeAConnectRequest(const char *reason, const SBuf &payload)
     retStr.append(connectHost);
     retStr.append("\r\n\r\n");
     retStr.append(payload);
-    in.buf = retStr;
+    inBuf = retStr;
     bool ret = handleReadData();
     if (ret)
         ret = clientParseRequests();
@@ -4655,21 +4540,6 @@ ConnStateData::transparent() const
     return clientConnection != NULL && (clientConnection->flags & (COMM_TRANSPARENT|COMM_INTERCEPTION));
 }
 
-bool
-ConnStateData::reading() const
-{
-    return reader != NULL;
-}
-
-void
-ConnStateData::stopReading()
-{
-    if (reading()) {
-        Comm::ReadCancel(clientConnection->fd, reader);
-        reader = NULL;
-    }
-}
-
 BodyPipe::Pointer
 ConnStateData::expectRequestBody(int64_t size)
 {
@@ -4691,7 +4561,7 @@ ConnStateData::mayNeedToReadMoreBody() const
         return -1; // probably need to read more, but we cannot be sure
 
     const int64_t needToProduce = bodyPipe->unproducedSize();
-    const int64_t haveAvailable = static_cast<int64_t>(in.buf.length());
+    const int64_t haveAvailable = static_cast<int64_t>(inBuf.length());
 
     if (needToProduce <= haveAvailable)
         return 0; // we have read what we need (but are waiting for pipe space)
@@ -4734,8 +4604,8 @@ ConnStateData::startDechunkingRequest()
 {
     Must(bodyPipe != NULL);
     debugs(33, 5, HERE << "start dechunking" << bodyPipe->status());
-    assert(!in.bodyParser);
-    in.bodyParser = new Http1::TeChunkedParser;
+    assert(!bodyParser);
+    bodyParser = new Http1::TeChunkedParser;
 }
 
 /// put parsed content into input buffer and clean up
@@ -4757,18 +4627,8 @@ ConnStateData::finishDechunkingRequest(bool withSuccess)
         }
     }
 
-    delete in.bodyParser;
-    in.bodyParser = NULL;
-}
-
-ConnStateData::In::In() :
-    bodyParser(NULL),
-    buf()
-{}
-
-ConnStateData::In::~In()
-{
-    delete bodyParser; // TODO: pool
+    delete bodyParser;
+    bodyParser = NULL;
 }
 
 void
index bd58c3359eaa70ca896c8c36b13954483548a194..743f827550f06244992d751d0adf764effc7f926 100644 (file)
@@ -19,6 +19,7 @@
 #include "HttpControlMsg.h"
 #include "ipc/FdNotes.h"
 #include "SBuf.h"
+#include "servers/Server.h"
 #if USE_AUTH
 #include "auth/UserRequest.h"
 #endif
@@ -168,17 +169,21 @@ class ServerBump;
  *
  * If the above can be confirmed accurate we can call this object PipelineManager or similar
  */
-class ConnStateData : public BodyProducer, public HttpControlMsgSink, public RegisteredRunner
+class ConnStateData : public Server, public HttpControlMsgSink, public RegisteredRunner
 {
 
 public:
     explicit ConnStateData(const MasterXaction::Pointer &xact);
     virtual ~ConnStateData();
 
-    void readSomeData();
+    /* ::Server API */
+    virtual void notifyAllContexts(const int xerrno);
+    virtual void receivedFirstByte();
+    virtual bool handleReadData();
+    virtual void afterClientRead();
+
     bool areAllContextsForThisConnection() const;
     void freeAllContexts();
-    void notifyAllContexts(const int xerrno); ///< tell everybody about the err
     /// Traffic parsing
     bool clientParseRequests();
     void readNextRequest();
@@ -187,30 +192,10 @@ public:
     int getConcurrentRequestCount() const;
     bool isOpen() const;
 
-    /// Update flags and timeout after the first byte received
-    void receivedFirstByte();
-
     // HttpControlMsgSink API
     virtual void sendControlMsg(HttpControlMsg msg);
 
-    // Client TCP connection details from comm layer.
-    Comm::ConnectionPointer clientConnection;
-
-    /**
-     * The transfer protocol currently being spoken on this connection.
-     * HTTP/1 CONNECT and HTTP/2 SETTINGS offers the ability to change
-     * protocols on the fly.
-     */
-    AnyP::ProtocolVersion transferProtocol;
-
-    struct In {
-        In();
-        ~In();
-        bool maybeMakeSpaceAvailable();
-
-        Http1::TeChunkedParser *bodyParser; ///< parses chunked request body
-        SBuf buf;
-    } in;
+    Http1::TeChunkedParser *bodyParser; ///< parses HTTP/1.1 chunked request body
 
     /** number of body bytes we need to comm_read for the "current" request
      *
@@ -264,12 +249,7 @@ public:
         AsyncCall::Pointer closeHandler; /*The close handler for pinned server side connection*/
     } pinning;
 
-    /// Squid listening port details where this connection arrived.
-    AnyP::PortCfgPointer port;
-
     bool transparent() const;
-    bool reading() const;
-    void stopReading(); ///< cancels comm_read if it is scheduled
 
     /// true if we stopped receiving the request
     const char *stoppedReceiving() const { return stoppedReceiving_; }
@@ -287,7 +267,6 @@ public:
     virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer) = 0;
     virtual void noteBodyConsumerAborted(BodyPipe::Pointer) = 0;
 
-    bool handleReadData();
     bool handleRequestBodyData();
 
     /// Forward future client requests using the given server connection.
@@ -318,7 +297,6 @@ public:
     virtual void clientPinnedConnectionClosed(const CommCloseCbParams &io);
 
     // comm callbacks
-    void clientReadRequest(const CommIoCbParams &io);
     void clientReadFtpData(const CommIoCbParams &io);
     void connStateClosed(const CommCloseCbParams &io);
     void requestTimeout(const CommTimeoutCbParams &params);
@@ -454,7 +432,9 @@ protected:
     BodyPipe::Pointer bodyPipe; ///< set when we are reading request body
 
 private:
-    int connFinishedWithConn(int size);
+    /* ::Server API */
+    virtual bool connFinishedWithConn(int size);
+
     void clientAfterReadingRequests();
     bool concurrentRequestQueueFilled() const;
 
@@ -495,9 +475,6 @@ private:
     /// the reason why we no longer read the request or nil
     const char *stoppedReceiving_;
 
-    AsyncCall::Pointer reader; ///< set when we are reading
-
-    bool receivedFirstByte_; ///< true if at least one byte received on this connection
     SBuf connectionTag_; ///< clt_conn_tag=Tag annotation for client connection
 };
 
index 9c60b337750f88bd6570c0224aa02e654c13a5f2..8e657eda32796057259867e1a23081f92c3da760 100644 (file)
@@ -640,7 +640,7 @@ Ftp::Server::parseOneRequest()
     SBuf cmd;
     SBuf params;
 
-    Parser::Tokenizer tok(in.buf);
+    Parser::Tokenizer tok(inBuf);
 
     (void)tok.skipAll(LeadingSpace); // leading OWS and empty commands
     const bool parsed = tok.prefix(cmd, CommandChars); // required command
@@ -667,14 +667,14 @@ Ftp::Server::parseOneRequest()
     // technically, we may skip multiple NLs below, but that is OK
     if (!parsed || !tok.skipAll(CharacterSet::LF)) { // did not find terminating LF yet
         // we need more data, but can we buffer more?
-        if (in.buf.length() >= Config.maxRequestHeaderSize) {
+        if (inBuf.length() >= Config.maxRequestHeaderSize) {
             changeState(fssError, "huge req");
             quitAfterError(NULL);
             return earlyError(EarlyErrorKind::HugeRequest);
         } else {
             flags.readMore = true;
             debugs(33, 5, "Waiting for more, up to " <<
-                   (Config.maxRequestHeaderSize - in.buf.length()));
+                   (Config.maxRequestHeaderSize - inBuf.length()));
             return NULL;
         }
     }
@@ -1272,7 +1272,7 @@ Ftp::Server::wroteReply(const CommIoCbParams &io)
     case STREAM_COMPLETE:
         flags.readMore = true;
         changeState(fssConnected, "Ftp::Server::wroteReply");
-        if (in.bodyParser)
+        if (bodyParser)
             finishDechunkingRequest(false);
         context->keepaliveNextRequest();
         return;
index b3fea716e6793476e14989ea9fbad36b8e647dde..d01ba6e7ce2db321333097dfc7b9891bf54f1251 100644 (file)
@@ -121,7 +121,7 @@ Http::One::Server::buildHttpRequest(ClientSocketContext *context)
         }
         // setLogUri should called before repContext->setReplyToError
         setLogUri(http, http->uri, true);
-        const char * requestErrorBytes = in.buf.c_str();
+        const char * requestErrorBytes = inBuf.c_str();
         if (!clientTunnelOnError(this, context, request.getRaw(), parser_->method(), errPage, parser_->parseStatusCode, requestErrorBytes)) {
             // HttpRequest object not build yet, there is no reason to call
             // clientProcessRequestFinished method
@@ -135,7 +135,7 @@ Http::One::Server::buildHttpRequest(ClientSocketContext *context)
         // setLogUri should called before repContext->setReplyToError
         setLogUri(http, http->uri, true);
 
-        const char * requestErrorBytes = in.buf.c_str();
+        const char * requestErrorBytes = inBuf.c_str();
         if (!clientTunnelOnError(this, context, request.getRaw(), parser_->method(), ERR_INVALID_URL, Http::scBadRequest, requestErrorBytes)) {
             // HttpRequest object not build yet, there is no reason to call
             // clientProcessRequestFinished method
index 179ca9c8a1bc77ec8e311fd393d436357959271a..ca7c5ea311c46299668ad9efefe128d1a9f12bc8 100644 (file)
@@ -11,8 +11,10 @@ include $(top_srcdir)/src/TestHeaders.am
 noinst_LTLIBRARIES = libservers.la
 
 libservers_la_SOURCES = \
+       forward.h \
        FtpServer.cc \
        FtpServer.h \
        Http1Server.cc \
        Http1Server.h \
-       forward.h
+       Server.cc \
+       Server.h
diff --git a/src/servers/Server.cc b/src/servers/Server.cc
new file mode 100644 (file)
index 0000000..f4dec80
--- /dev/null
@@ -0,0 +1,195 @@
+/*
+ * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "anyp/PortCfg.h"
+#include "comm.h"
+#include "comm/Read.h"
+#include "Debug.h"
+#include "fd.h"
+#include "fde.h"
+#include "MasterXaction.h"
+#include "servers/Server.h"
+#include "SquidConfig.h"
+#include "StatCounters.h"
+#include "tools.h"
+
+Server::Server(const MasterXaction::Pointer &xact) :
+    AsyncJob("::Server"), // kids overwrite
+    clientConnection(xact->tcpClient),
+    transferProtocol(xact->squidPort->transport),
+    port(xact->squidPort),
+    receivedFirstByte_(false)
+{}
+
+bool
+Server::doneAll() const
+{
+    // servers are not done while the connection is open
+    return !Comm::IsConnOpen(clientConnection) &&
+           BodyProducer::doneAll();
+}
+
+void
+Server::start()
+{
+    // TODO: shuffle activity from ConnStateData
+}
+
+void
+Server::swanSong()
+{
+    if (Comm::IsConnOpen(clientConnection))
+        clientConnection->close();
+
+    BodyProducer::swanSong();
+}
+
+void
+Server::stopReading()
+{
+    if (reading()) {
+        Comm::ReadCancel(clientConnection->fd, reader);
+        reader = NULL;
+    }
+}
+
+bool
+Server::maybeMakeSpaceAvailable()
+{
+    if (inBuf.spaceSize() < 2) {
+        const SBuf::size_type haveCapacity = inBuf.length() + inBuf.spaceSize();
+        if (haveCapacity >= Config.maxRequestBufferSize) {
+            debugs(33, 4, "request buffer full: client_request_buffer_max_size=" << Config.maxRequestBufferSize);
+            return false;
+        }
+        if (haveCapacity == 0) {
+            // haveCapacity is based on the SBuf visible window of the MemBlob buffer, which may fill up.
+            // at which point bump the buffer back to default. This allocates a new MemBlob with any un-parsed bytes.
+            inBuf.reserveCapacity(CLIENT_REQ_BUF_SZ);
+        } else {
+            const SBuf::size_type wantCapacity = min(static_cast<SBuf::size_type>(Config.maxRequestBufferSize), haveCapacity*2);
+            inBuf.reserveCapacity(wantCapacity);
+        }
+        debugs(33, 2, "growing request buffer: available=" << inBuf.spaceSize() << " used=" << inBuf.length());
+    }
+    return (inBuf.spaceSize() >= 2);
+}
+
+void
+Server::readSomeData()
+{
+    if (reading())
+        return;
+
+    debugs(33, 4, clientConnection << ": reading request...");
+
+    // we can only read if there is more than 1 byte of space free
+    if (Config.maxRequestBufferSize - inBuf.length() < 2)
+        return;
+
+    typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
+    reader = JobCallback(33, 5, Dialer, this, Server::doClientRead);
+    Comm::Read(clientConnection, reader);
+}
+
+void
+Server::doClientRead(const CommIoCbParams &io)
+{
+    debugs(33,5, io.conn);
+    Must(reading());
+    reader = NULL;
+
+    /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */
+    if (io.flag == Comm::ERR_CLOSING) {
+        debugs(33,5, io.conn << " closing Bailout.");
+        return;
+    }
+
+    assert(Comm::IsConnOpen(clientConnection));
+    assert(io.conn->fd == clientConnection->fd);
+
+    /*
+     * Don't reset the timeout value here. The value should be
+     * counting Config.Timeout.request and applies to the request
+     * as a whole, not individual read() calls.
+     * Plus, it breaks our lame *HalfClosed() detection
+     */
+
+    maybeMakeSpaceAvailable();
+    CommIoCbParams rd(this); // will be expanded with ReadNow results
+    rd.conn = io.conn;
+    switch (Comm::ReadNow(rd, inBuf)) {
+    case Comm::INPROGRESS:
+
+        if (inBuf.isEmpty())
+            debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno));
+        readSomeData();
+        return;
+
+    case Comm::OK:
+        statCounter.client_http.kbytes_in += rd.size;
+        if (!receivedFirstByte_)
+            receivedFirstByte();
+        // may comm_close or setReplyToError
+        if (!handleReadData())
+            return;
+
+        /* Continue to process previously read data */
+        break;
+
+    case Comm::ENDFILE: // close detected by 0-byte read
+        debugs(33, 5, io.conn << " closed?");
+
+        if (connFinishedWithConn(rd.size)) {
+            clientConnection->close();
+            return;
+        }
+
+        /* It might be half-closed, we can't tell */
+        fd_table[io.conn->fd].flags.socket_eof = true;
+        commMarkHalfClosed(io.conn->fd);
+        fd_note(io.conn->fd, "half-closed");
+
+        /* There is one more close check at the end, to detect aborted
+         * (partial) requests. At this point we can't tell if the request
+         * is partial.
+         */
+
+        /* Continue to process previously read data */
+        break;
+
+    // case Comm::COMM_ERROR:
+    default: // no other flags should ever occur
+        debugs(33, 2, io.conn << ": got flag " << rd.flag << "; " << xstrerr(rd.xerrno));
+        notifyAllContexts(rd.xerrno);
+        io.conn->close();
+        return;
+    }
+
+    afterClientRead();
+}
+
+void
+Server::clientWriteDone(const CommIoCbParams &io)
+{
+    debugs(33,5, io.conn);
+    Must(writer != NULL);
+    writer = NULL;
+
+    /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */
+    if (io.flag == Comm::ERR_CLOSING) {
+        debugs(33,5, io.conn << " closing Bailout.");
+        return;
+    }
+
+    assert(Comm::IsConnOpen(clientConnection));
+    assert(io.conn->fd == clientConnection->fd);
+
+    writeSomeData(); // maybe schedules another write
+}
diff --git a/src/servers/Server.h b/src/servers/Server.h
new file mode 100644 (file)
index 0000000..25c23b5
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+/* DEBUG: section 33    Client-side Routines */
+
+#ifndef SQUID_SERVERS_SERVER_H
+#define SQUID_SERVERS_SERVER_H
+
+#include "anyp/forward.h"
+#include "anyp/ProtocolVersion.h"
+#include "base/AsyncJob.h"
+#include "BodyPipe.h"
+#include "comm/forward.h"
+#include "CommCalls.h"
+#include "SBuf.h"
+
+/**
+ * Common base for all Server classes used
+ * to manage connections from clients.
+ */
+class Server : virtual public AsyncJob, public BodyProducer
+{
+public:
+    Server(const MasterXaction::Pointer &xact);
+    virtual ~Server() {}
+
+    /* AsyncJob API */
+    virtual void start();
+    virtual bool doneAll() const;
+    virtual void swanSong();
+
+    /// tell all active contexts on a connection about an error
+    virtual void notifyAllContexts(const int xerrno) = 0;
+
+    /// ??
+    virtual bool connFinishedWithConn(int size) = 0;
+
+    /// processing to be done after a Comm::Read()
+    virtual void afterClientRead() = 0;
+
+    /// maybe grow the inBuf and schedule Comm::Read()
+    void readSomeData();
+
+    /**
+     * called when new request data has been read from the socket
+     *
+     * \retval false called comm_close or setReplyToError (the caller should bail)
+     * \retval true  we did not call comm_close or setReplyToError
+     */
+    virtual bool handleReadData() = 0;
+
+    /// whether Comm::Read() is scheduled
+    bool reading() const {return reader != NULL;}
+
+    /// cancels Comm::Read() if it is scheduled
+    void stopReading();
+
+    /// Update flags and timeout after the first byte received
+    virtual void receivedFirstByte() = 0;
+
+    /// maybe schedule another Comm::Write() and perform any
+    /// processing to be done after previous Comm::Write() completes
+    virtual void writeSomeData() {}
+
+    /// whether Comm::Write() is scheduled
+    bool writing() const {return writer != NULL;}
+
+// XXX: should be 'protected:' for child access only,
+//      but all sorts of code likes to play directly
+//      with the I/O buffers and socket.
+public:
+
+    /// grows the available read buffer space (if possible)
+    bool maybeMakeSpaceAvailable();
+
+    // Client TCP connection details from comm layer.
+    Comm::ConnectionPointer clientConnection;
+
+    /**
+     * The transfer protocol currently being spoken on this connection.
+     * HTTP/1.x CONNECT, HTTP/1.1 Upgrade and HTTP/2 SETTINGS offer the
+     * ability to change protocols on the fly.
+     */
+    AnyP::ProtocolVersion transferProtocol;
+
+    /// Squid listening port details where this connection arrived.
+    AnyP::PortCfgPointer port;
+
+    /// read I/O buffer for the client connection
+    SBuf inBuf;
+
+    bool receivedFirstByte_; ///< true if at least one byte received on this connection
+
+protected:
+    void doClientRead(const CommIoCbParams &io);
+    void clientWriteDone(const CommIoCbParams &io);
+
+    AsyncCall::Pointer reader; ///< set when we are reading
+    AsyncCall::Pointer writer; ///< set when we are writing
+};
+
+#endif /* SQUID_SERVERS_SERVER_H */
index 13d18c11058ec8d29f0386de7428edad6e240e62..50fe433c6ffa134bd697ef156e627ff0fe28c96b 100644 (file)
@@ -1857,7 +1857,7 @@ statClientRequests(StoreEntry * s)
                               fd_table[fd].bytes_read, fd_table[fd].bytes_written);
             storeAppendPrintf(s, "\tFD desc: %s\n", fd_table[fd].desc);
             storeAppendPrintf(s, "\tin: buf %p, used %ld, free %ld\n",
-                              conn->in.buf.c_str(), (long int) conn->in.buf.length(), (long int) conn->in.buf.spaceSize());
+                              conn->inBuf.rawContent(), (long int) conn->inBuf.length(), (long int) conn->inBuf.spaceSize());
             storeAppendPrintf(s, "\tremote: %s\n",
                               conn->clientConnection->remote.toUrl(buf,MAX_IPSTRLEN));
             storeAppendPrintf(s, "\tlocal: %s\n",
index 259d45ae26fb27f9d291c2d9d45531ef8d294e88..513141b0b49f615e2210549afcf38f7006ee667e 100644 (file)
@@ -36,7 +36,6 @@ void ClientSocketContext::registerWithConn() STUB
 void ClientSocketContext::noteIoError(const int xerrno) STUB
 void ClientSocketContext::writeControlMsg(HttpControlMsg &msg) STUB
 
-void ConnStateData::readSomeData() STUB
 bool ConnStateData::areAllContextsForThisConnection() const STUB_RETVAL(false)
 void ConnStateData::freeAllContexts() STUB
 void ConnStateData::notifyAllContexts(const int xerrno) STUB
@@ -51,8 +50,6 @@ int64_t ConnStateData::mayNeedToReadMoreBody() const STUB_RETVAL(0)
 void ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *cause) STUB
 #endif
 bool ConnStateData::transparent() const STUB_RETVAL(false)
-bool ConnStateData::reading() const STUB_RETVAL(false)
-void ConnStateData::stopReading() STUB
 void ConnStateData::stopReceiving(const char *error) STUB
 void ConnStateData::stopSending(const char *error) STUB
 void ConnStateData::expectNoForwarding() STUB
@@ -64,7 +61,6 @@ void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn,
 void ConnStateData::unpinConnection(const bool andClose) STUB
 const Comm::ConnectionPointer ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *peer) STUB_RETVAL(NULL)
 void ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io) STUB
-void ConnStateData::clientReadRequest(const CommIoCbParams &io) STUB
 void ConnStateData::connStateClosed(const CommCloseCbParams &io) STUB
 void ConnStateData::requestTimeout(const CommTimeoutCbParams &params) STUB
 void ConnStateData::swanSong() STUB
@@ -80,8 +76,6 @@ void ConnStateData::buildSslCertGenerationParams(Ssl::CertificateProperties &cer
 bool ConnStateData::serveDelayedError(ClientSocketContext *context) STUB_RETVAL(false)
 #endif
 
-bool ConnStateData::In::maybeMakeSpaceAvailable() STUB_RETVAL(false)
-
 void setLogUri(ClientHttpRequest * http, char const *uri, bool cleanUrl) STUB
 const char *findTrailingHTTPVersion(const char *uriAndHTTPVersion, const char *end) STUB_RETVAL(NULL)
 int varyEvaluateMatch(StoreEntry * entry, HttpRequest * req) STUB_RETVAL(0)
index 02551b151ea121bee1f429a27752959e7e9f89ee..44d28c5bcf3506181b4d7160f1c855fbfae13cd5 100644 (file)
@@ -837,11 +837,11 @@ tunnelStartShoveling(TunnelStateData *tunnelState)
             tunnelState->copy(tunnelState->server.len, tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone);
         }
 
-        if (tunnelState->http.valid() && tunnelState->http->getConn() && !tunnelState->http->getConn()->in.buf.isEmpty()) {
-            struct ConnStateData::In *in = &tunnelState->http->getConn()->in;
-            debugs(26, DBG_DATA, "Tunnel client PUSH Payload: \n" << in->buf << "\n----------");
-            tunnelState->preReadClientData.append(in->buf);
-            in->buf.consume(); // ConnStateData buffer accounting after the shuffle.
+        if (tunnelState->http.valid() && tunnelState->http->getConn() && !tunnelState->http->getConn()->inBuf.isEmpty()) {
+            SBuf * const in = &tunnelState->http->getConn()->inBuf;
+            debugs(26, DBG_DATA, "Tunnel client PUSH Payload: \n" << *in << "\n----------");
+            tunnelState->preReadClientData.append(*in);
+            in->consume(); // ConnStateData buffer accounting after the shuffle.
         }
         tunnelState->copyClientBytes();
     }