From: Amos Jeffries Date: Sat, 7 Nov 2015 12:08:33 +0000 (-0800) Subject: Split core Server operations from ConnStateData X-Git-Tag: SQUID_4_0_3~38 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=fcc444e3db9ac978f7ab74d77f56dfe2c499652b;p=thirdparty%2Fsquid.git Split core Server operations from ConnStateData This improves the servers/libserver.la class hierarchy in preparation for HTTP/2 and other non-HTTP/1.1 protocol support. The basic I/O functionality of ConnStateData is moved to Server class and a set of virtual methods designed to allow for child class implementation of data processing operations. No logic is changed in this patch, just symbol renaming and moving of method logics as-is into libservers.la --- diff --git a/src/client_side.cc b/src/client_side.cc index 379768913e..005876415c 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -220,27 +220,6 @@ ClientSocketContext::getConn() const return http->getConn(); } -/** - * This routine should be called to grow the in.buf and then - * call Comm::Read(). - */ -void -ConnStateData::readSomeData() -{ - if (reading()) - return; - - debugs(33, 4, HERE << clientConnection << ": reading request..."); - - // we can only read if there is more than 1 byte of space free - if (Config.maxRequestBufferSize - in.buf.length() < 2) - return; - - typedef CommCbMemFunT Dialer; - reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest); - Comm::Read(clientConnection, reader); -} - void ClientSocketContext::removeFromConnectionList(ConnStateData * conn) { @@ -816,15 +795,13 @@ ConnStateData::swanSong() unpinConnection(true); - if (Comm::IsConnOpen(clientConnection)) - clientConnection->close(); + Server::swanSong(); // closes the client connection #if USE_AUTH // NP: do this bit after closing the connections to avoid side effects from unwanted TCP RST setAuth(NULL, "ConnStateData::SwanSong cleanup"); #endif - BodyProducer::swanSong(); flags.swanSang = true; } @@ -849,6 +826,8 @@ ConnStateData::~ConnStateData() if (bodyPipe != NULL) stopProducingFor(bodyPipe, false); + delete bodyParser; // TODO: pool + #if USE_OPENSSL delete sslServerBump; #endif @@ -1815,7 +1794,7 @@ ConnStateData::stopSending(const char *error) if (!stoppedReceiving()) { if (const int64_t expecting = mayNeedToReadMoreBody()) { debugs(33, 5, HERE << "must still read " << expecting << - " request body bytes with " << in.buf.length() << " unused"); + " request body bytes with " << inBuf.length() << " unused"); return; // wait for the request receiver to finish reading } } @@ -1875,7 +1854,7 @@ ClientSocketContext * ConnStateData::abortRequestParsing(const char *const uri) { ClientHttpRequest *http = new ClientHttpRequest(this); - http->req_sz = in.buf.length(); + http->req_sz = inBuf.length(); http->uri = xstrdup(uri); setLogUri (http, uri); ClientSocketContext *context = new ClientSocketContext(clientConnection, http); @@ -2161,12 +2140,12 @@ parseHttpRequest(ConnStateData *csd, const Http1::RequestParserPointer &hp) { /* Attempt to parse the first line; this will define where the method, url, version and header begin */ { - const bool parsedOk = hp->parse(csd->in.buf); + const bool parsedOk = hp->parse(csd->inBuf); if (csd->port->flags.isIntercepted() && Config.accessList.on_unsupported_protocol) - csd->preservedClientData = csd->in.buf; + csd->preservedClientData = csd->inBuf; // sync the buffers after parsing. - csd->in.buf = hp->remaining(); + csd->inBuf = hp->remaining(); if (hp->needsMoreData()) { debugs(33, 5, "Incomplete request, waiting for end of request line"); @@ -2281,28 +2260,6 @@ parseHttpRequest(ConnStateData *csd, const Http1::RequestParserPointer &hp) return result; } -bool -ConnStateData::In::maybeMakeSpaceAvailable() -{ - if (buf.spaceSize() < 2) { - const SBuf::size_type haveCapacity = buf.length() + buf.spaceSize(); - if (haveCapacity >= Config.maxRequestBufferSize) { - debugs(33, 4, "request buffer full: client_request_buffer_max_size=" << Config.maxRequestBufferSize); - return false; - } - if (haveCapacity == 0) { - // haveCapacity is based on the SBuf visible window of the MemBlob buffer, which may fill up. - // at which point bump the buffer back to default. This allocates a new MemBlob with any un-parsed bytes. - buf.reserveCapacity(CLIENT_REQ_BUF_SZ); - } else { - const SBuf::size_type wantCapacity = min(static_cast(Config.maxRequestBufferSize), haveCapacity*2); - buf.reserveCapacity(wantCapacity); - } - debugs(33, 2, "growing request buffer: available=" << buf.spaceSize() << " used=" << buf.length()); - } - return (buf.spaceSize() >= 2); -} - void ConnStateData::addContextToQueue(ClientSocketContext * context) { @@ -2326,31 +2283,31 @@ ConnStateData::getConcurrentRequestCount() const return result; } -int +bool ConnStateData::connFinishedWithConn(int size) { if (size == 0) { - if (getConcurrentRequestCount() == 0 && in.buf.isEmpty()) { + if (getConcurrentRequestCount() == 0 && inBuf.isEmpty()) { /* no current or pending requests */ debugs(33, 4, HERE << clientConnection << " closed"); - return 1; + return true; } else if (!Config.onoff.half_closed_clients) { /* admin doesn't want to support half-closed client sockets */ debugs(33, 3, HERE << clientConnection << " aborted (half_closed_clients disabled)"); notifyAllContexts(0); // no specific error implies abort - return 1; + return true; } } - return 0; + return false; } void ConnStateData::consumeInput(const size_t byteCount) { - assert(byteCount > 0 && byteCount <= in.buf.length()); - in.buf.consume(byteCount); - debugs(33, 5, "in.buf has " << in.buf.length() << " unused bytes"); + assert(byteCount > 0 && byteCount <= inBuf.length()); + inBuf.consume(byteCount); + debugs(33, 5, "inBuf has " << inBuf.length() << " unused bytes"); } void @@ -2809,15 +2766,15 @@ ConnStateData::parseProxyProtocolHeader() // http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt // detect and parse PROXY/2.0 protocol header - if (in.buf.startsWith(Proxy2p0magic)) + if (inBuf.startsWith(Proxy2p0magic)) return parseProxy2p0(); // detect and parse PROXY/1.0 protocol header - if (in.buf.startsWith(Proxy1p0magic)) + if (inBuf.startsWith(Proxy1p0magic)) return parseProxy1p0(); // detect and terminate other protocols - if (in.buf.length() >= Proxy2p0magic.length()) { + if (inBuf.length() >= Proxy2p0magic.length()) { // PROXY/1.0 magic is shorter, so we know that // the input does not start with any PROXY magic return proxyProtocolError("PROXY protocol error: invalid header"); @@ -2834,7 +2791,7 @@ ConnStateData::parseProxyProtocolHeader() bool ConnStateData::parseProxy1p0() { - ::Parser::Tokenizer tok(in.buf); + ::Parser::Tokenizer tok(inBuf); tok.skip(Proxy1p0magic); // skip to first LF (assumes it is part of CRLF) @@ -2843,7 +2800,7 @@ ConnStateData::parseProxy1p0() if (tok.prefix(line, lineContent, 107-Proxy1p0magic.length())) { if (tok.skip('\n')) { // found valid header - in.buf = tok.remaining(); + inBuf = tok.remaining(); needProxyProtocolHeader_ = false; // reset the tokenizer to work on found line only. tok.reset(line); @@ -2851,7 +2808,7 @@ ConnStateData::parseProxy1p0() return false; // no LF yet } else // protocol error only if there are more than 107 bytes prefix header - return proxyProtocolError(in.buf.length() > 107? "PROXY/1.0 error: missing CRLF" : NULL); + return proxyProtocolError(inBuf.length() > 107? "PROXY/1.0 error: missing CRLF" : NULL); static const SBuf unknown("UNKNOWN"), tcpName("TCP"); if (tok.skip(tcpName)) { @@ -2928,34 +2885,34 @@ bool ConnStateData::parseProxy2p0() { static const SBuf::size_type prefixLen = Proxy2p0magic.length(); - if (in.buf.length() < prefixLen + 4) + if (inBuf.length() < prefixLen + 4) return false; // need more bytes - if ((in.buf[prefixLen] & 0xF0) != 0x20) // version == 2 is mandatory + if ((inBuf[prefixLen] & 0xF0) != 0x20) // version == 2 is mandatory return proxyProtocolError("PROXY/2.0 error: invalid version"); - const char command = (in.buf[prefixLen] & 0x0F); + const char command = (inBuf[prefixLen] & 0x0F); if ((command & 0xFE) != 0x00) // values other than 0x0-0x1 are invalid return proxyProtocolError("PROXY/2.0 error: invalid command"); - const char family = (in.buf[prefixLen+1] & 0xF0) >>4; + const char family = (inBuf[prefixLen+1] & 0xF0) >>4; if (family > 0x3) // values other than 0x0-0x3 are invalid return proxyProtocolError("PROXY/2.0 error: invalid family"); - const char proto = (in.buf[prefixLen+1] & 0x0F); + const char proto = (inBuf[prefixLen+1] & 0x0F); if (proto > 0x2) // values other than 0x0-0x2 are invalid return proxyProtocolError("PROXY/2.0 error: invalid protocol type"); - const char *clen = in.buf.rawContent() + prefixLen + 2; + const char *clen = inBuf.rawContent() + prefixLen + 2; uint16_t len; memcpy(&len, clen, sizeof(len)); len = ntohs(len); - if (in.buf.length() < prefixLen + 4 + len) + if (inBuf.length() < prefixLen + 4 + len) return false; // need more bytes - in.buf.consume(prefixLen + 4); // 4 being the extra bytes - const SBuf extra = in.buf.consume(len); + inBuf.consume(prefixLen + 4); // 4 being the extra bytes + const SBuf extra = inBuf.consume(len); needProxyProtocolHeader_ = false; // found successfully // LOCAL connections do nothing with the extras @@ -3045,10 +3002,10 @@ ConnStateData::clientParseRequests() // Loop while we have read bytes that are not needed for producing the body // On errors, bodyPipe may become nil, but readMore will be cleared - while (!in.buf.isEmpty() && !bodyPipe && flags.readMore) { + while (!inBuf.isEmpty() && !bodyPipe && flags.readMore) { /* Don't try to parse if the buffer is empty */ - if (in.buf.isEmpty()) + if (inBuf.isEmpty()) break; /* Limit the number of concurrent requests */ @@ -3078,8 +3035,8 @@ ConnStateData::clientParseRequests() } } else { debugs(33, 5, clientConnection << ": not enough request data: " << - in.buf.length() << " < " << Config.maxRequestHeaderSize); - Must(in.buf.length() < Config.maxRequestHeaderSize); + inBuf.length() << " < " << Config.maxRequestHeaderSize); + Must(inBuf.length() < Config.maxRequestHeaderSize); break; } } @@ -3089,81 +3046,11 @@ ConnStateData::clientParseRequests() } void -ConnStateData::clientReadRequest(const CommIoCbParams &io) +ConnStateData::afterClientRead() { - debugs(33,5, io.conn); - Must(reading()); - reader = NULL; - - /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */ - if (io.flag == Comm::ERR_CLOSING) { - debugs(33,5, io.conn << " closing Bailout."); - return; - } - - assert(Comm::IsConnOpen(clientConnection)); - assert(io.conn->fd == clientConnection->fd); - - /* - * Don't reset the timeout value here. The value should be - * counting Config.Timeout.request and applies to the request - * as a whole, not individual read() calls. - * Plus, it breaks our lame *HalfClosed() detection - */ - - in.maybeMakeSpaceAvailable(); - CommIoCbParams rd(this); // will be expanded with ReadNow results - rd.conn = io.conn; - switch (Comm::ReadNow(rd, in.buf)) { - case Comm::INPROGRESS: - if (in.buf.isEmpty()) - debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno)); - readSomeData(); - return; - - case Comm::OK: - statCounter.client_http.kbytes_in += rd.size; - if (!receivedFirstByte_) - receivedFirstByte(); - // may comm_close or setReplyToError - if (!handleReadData()) - return; - - /* Continue to process previously read data */ - break; - - case Comm::ENDFILE: // close detected by 0-byte read - debugs(33, 5, io.conn << " closed?"); - - if (connFinishedWithConn(rd.size)) { - clientConnection->close(); - return; - } - - /* It might be half-closed, we can't tell */ - fd_table[io.conn->fd].flags.socket_eof = true; - commMarkHalfClosed(io.conn->fd); - fd_note(io.conn->fd, "half-closed"); - - /* There is one more close check at the end, to detect aborted - * (partial) requests. At this point we can't tell if the request - * is partial. - */ - - /* Continue to process previously read data */ - break; - - // case Comm::COMM_ERROR: - default: // no other flags should ever occur - debugs(33, 2, io.conn << ": got flag " << rd.flag << "; " << xstrerr(rd.xerrno)); - notifyAllContexts(rd.xerrno); - io.conn->close(); - return; - } - /* Process next request */ if (getConcurrentRequestCount() == 0) - fd_note(io.fd, "Reading next request"); + fd_note(clientConnection->fd, "Reading next request"); if (!clientParseRequests()) { if (!isOpen()) @@ -3176,8 +3063,8 @@ ConnStateData::clientReadRequest(const CommIoCbParams &io) * be if we have an incomplete request. * XXX: This duplicates ClientSocketContext::keepaliveNextRequest */ - if (getConcurrentRequestCount() == 0 && commIsHalfClosed(io.fd)) { - debugs(33, 5, HERE << io.conn << ": half-closed connection, no completed request parsed, connection closing."); + if (getConcurrentRequestCount() == 0 && commIsHalfClosed(clientConnection->fd)) { + debugs(33, 5, clientConnection << ": half-closed connection, no completed request parsed, connection closing."); clientConnection->close(); return; } @@ -3205,7 +3092,7 @@ ConnStateData::handleReadData() } /** - * called when new request body data has been buffered in in.buf + * called when new request body data has been buffered in inBuf * may close the connection if we were closing and piped everything out * * \retval false called comm_close or setReplyToError (the caller should bail) @@ -3216,14 +3103,14 @@ ConnStateData::handleRequestBodyData() { assert(bodyPipe != NULL); - if (in.bodyParser) { // chunked encoding + if (bodyParser) { // chunked encoding if (const err_type error = handleChunkedRequestBody()) { abortChunkedRequestBody(error); return false; } } else { // identity encoding debugs(33,5, HERE << "handling plain request body for " << clientConnection); - const size_t putSize = bodyPipe->putMoreData(in.buf.c_str(), in.buf.length()); + const size_t putSize = bodyPipe->putMoreData(inBuf.c_str(), inBuf.length()); if (putSize > 0) consumeInput(putSize); @@ -3253,17 +3140,17 @@ ConnStateData::handleRequestBodyData() err_type ConnStateData::handleChunkedRequestBody() { - debugs(33, 7, "chunked from " << clientConnection << ": " << in.buf.length()); + debugs(33, 7, "chunked from " << clientConnection << ": " << inBuf.length()); try { // the parser will throw on errors - if (in.buf.isEmpty()) // nothing to do + if (inBuf.isEmpty()) // nothing to do return ERR_NONE; BodyPipeCheckout bpc(*bodyPipe); - in.bodyParser->setPayloadBuffer(&bpc.buf); - const bool parsed = in.bodyParser->parse(in.buf); - in.buf = in.bodyParser->remaining(); // sync buffers + bodyParser->setPayloadBuffer(&bpc.buf); + const bool parsed = bodyParser->parse(inBuf); + inBuf = bodyParser->remaining(); // sync buffers bpc.checkIn(); // dechunk then check: the size limit applies to _dechunked_ content @@ -3277,10 +3164,10 @@ ConnStateData::handleChunkedRequestBody() } // if chunk parser needs data, then the body pipe must need it too - Must(!in.bodyParser->needsMoreData() || bodyPipe->mayNeedMoreData()); + Must(!bodyParser->needsMoreData() || bodyPipe->mayNeedMoreData()); // if parser needs more space and we can consume nothing, we will stall - Must(!in.bodyParser->needsMoreSpace() || bodyPipe->buf().hasContent()); + Must(!bodyParser->needsMoreSpace() || bodyPipe->buf().hasContent()); } catch (...) { // TODO: be more specific debugs(33, 3, HERE << "malformed chunks" << bodyPipe->status()); return ERR_INVALID_REQ; @@ -3312,7 +3199,7 @@ ConnStateData::abortChunkedRequestBody(const err_type error) repContext->http->uri, CachePeer, repContext->http->request, - in.buf, NULL); + inBuf, NULL); context->pullData(); } else { // close or otherwise we may get stuck as nobody will notice the error? @@ -3385,6 +3272,8 @@ clientLifetimeTimeout(const CommTimeoutCbParams &io) ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) : AsyncJob("ConnStateData"), // kids overwrite + Server(xact), + bodyParser(nullptr), nrequests(0), #if USE_OPENSSL sslBumpMode(Ssl::bumpEnd), @@ -3396,8 +3285,7 @@ ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) : signAlgorithm(Ssl::algSignTrusted), #endif stoppedSending_(NULL), - stoppedReceiving_(NULL), - receivedFirstByte_(false) + stoppedReceiving_(NULL) { flags.readMore = true; // kids may overwrite flags.swanSang = false; @@ -3410,9 +3298,6 @@ ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) : pinning.peer = NULL; // store the details required for creating more MasterXaction objects as new requests come in - clientConnection = xact->tcpClient; - port = xact->squidPort; - transferProtocol = port->transport; // default to the *_port protocol= setting. may change later. log_addr = xact->tcpClient->remote; log_addr.applyMask(Config.Addrs.client_netmask); @@ -3739,7 +3624,7 @@ httpsSslBumpAccessCheckDone(allow_t answer, void *data) debugs(33, 2, HERE << "sslBump not needed for " << connState->clientConnection); connState->sslBumpMode = Ssl::bumpNone; } - connState->fakeAConnectRequest("ssl-bump", connState->in.buf); + connState->fakeAConnectRequest("ssl-bump", connState->inBuf); } /** handle a new HTTPS connection */ @@ -4244,8 +4129,8 @@ ConnStateData::splice() // reset the current protocol to HTTP/1.1 (was "HTTPS" for the bumping process) transferProtocol = Http::ProtocolVersion(); - // in.buf still has the "CONNECT ..." request data, reset it to SSL hello message - in.buf.append(rbuf.content(), rbuf.contentSize()); + // inBuf still has the "CONNECT ..." request data, reset it to SSL hello message + inBuf.append(rbuf.content(), rbuf.contentSize()); ClientSocketContext::Pointer context = getCurrentContext(); ClientHttpRequest *http = context->http; tunnelStart(http); @@ -4335,7 +4220,7 @@ ConnStateData::fakeAConnectRequest(const char *reason, const SBuf &payload) retStr.append(connectHost); retStr.append("\r\n\r\n"); retStr.append(payload); - in.buf = retStr; + inBuf = retStr; bool ret = handleReadData(); if (ret) ret = clientParseRequests(); @@ -4655,21 +4540,6 @@ ConnStateData::transparent() const return clientConnection != NULL && (clientConnection->flags & (COMM_TRANSPARENT|COMM_INTERCEPTION)); } -bool -ConnStateData::reading() const -{ - return reader != NULL; -} - -void -ConnStateData::stopReading() -{ - if (reading()) { - Comm::ReadCancel(clientConnection->fd, reader); - reader = NULL; - } -} - BodyPipe::Pointer ConnStateData::expectRequestBody(int64_t size) { @@ -4691,7 +4561,7 @@ ConnStateData::mayNeedToReadMoreBody() const return -1; // probably need to read more, but we cannot be sure const int64_t needToProduce = bodyPipe->unproducedSize(); - const int64_t haveAvailable = static_cast(in.buf.length()); + const int64_t haveAvailable = static_cast(inBuf.length()); if (needToProduce <= haveAvailable) return 0; // we have read what we need (but are waiting for pipe space) @@ -4734,8 +4604,8 @@ ConnStateData::startDechunkingRequest() { Must(bodyPipe != NULL); debugs(33, 5, HERE << "start dechunking" << bodyPipe->status()); - assert(!in.bodyParser); - in.bodyParser = new Http1::TeChunkedParser; + assert(!bodyParser); + bodyParser = new Http1::TeChunkedParser; } /// put parsed content into input buffer and clean up @@ -4757,18 +4627,8 @@ ConnStateData::finishDechunkingRequest(bool withSuccess) } } - delete in.bodyParser; - in.bodyParser = NULL; -} - -ConnStateData::In::In() : - bodyParser(NULL), - buf() -{} - -ConnStateData::In::~In() -{ - delete bodyParser; // TODO: pool + delete bodyParser; + bodyParser = NULL; } void diff --git a/src/client_side.h b/src/client_side.h index bd58c3359e..743f827550 100644 --- a/src/client_side.h +++ b/src/client_side.h @@ -19,6 +19,7 @@ #include "HttpControlMsg.h" #include "ipc/FdNotes.h" #include "SBuf.h" +#include "servers/Server.h" #if USE_AUTH #include "auth/UserRequest.h" #endif @@ -168,17 +169,21 @@ class ServerBump; * * If the above can be confirmed accurate we can call this object PipelineManager or similar */ -class ConnStateData : public BodyProducer, public HttpControlMsgSink, public RegisteredRunner +class ConnStateData : public Server, public HttpControlMsgSink, public RegisteredRunner { public: explicit ConnStateData(const MasterXaction::Pointer &xact); virtual ~ConnStateData(); - void readSomeData(); + /* ::Server API */ + virtual void notifyAllContexts(const int xerrno); + virtual void receivedFirstByte(); + virtual bool handleReadData(); + virtual void afterClientRead(); + bool areAllContextsForThisConnection() const; void freeAllContexts(); - void notifyAllContexts(const int xerrno); ///< tell everybody about the err /// Traffic parsing bool clientParseRequests(); void readNextRequest(); @@ -187,30 +192,10 @@ public: int getConcurrentRequestCount() const; bool isOpen() const; - /// Update flags and timeout after the first byte received - void receivedFirstByte(); - // HttpControlMsgSink API virtual void sendControlMsg(HttpControlMsg msg); - // Client TCP connection details from comm layer. - Comm::ConnectionPointer clientConnection; - - /** - * The transfer protocol currently being spoken on this connection. - * HTTP/1 CONNECT and HTTP/2 SETTINGS offers the ability to change - * protocols on the fly. - */ - AnyP::ProtocolVersion transferProtocol; - - struct In { - In(); - ~In(); - bool maybeMakeSpaceAvailable(); - - Http1::TeChunkedParser *bodyParser; ///< parses chunked request body - SBuf buf; - } in; + Http1::TeChunkedParser *bodyParser; ///< parses HTTP/1.1 chunked request body /** number of body bytes we need to comm_read for the "current" request * @@ -264,12 +249,7 @@ public: AsyncCall::Pointer closeHandler; /*The close handler for pinned server side connection*/ } pinning; - /// Squid listening port details where this connection arrived. - AnyP::PortCfgPointer port; - bool transparent() const; - bool reading() const; - void stopReading(); ///< cancels comm_read if it is scheduled /// true if we stopped receiving the request const char *stoppedReceiving() const { return stoppedReceiving_; } @@ -287,7 +267,6 @@ public: virtual void noteMoreBodySpaceAvailable(BodyPipe::Pointer) = 0; virtual void noteBodyConsumerAborted(BodyPipe::Pointer) = 0; - bool handleReadData(); bool handleRequestBodyData(); /// Forward future client requests using the given server connection. @@ -318,7 +297,6 @@ public: virtual void clientPinnedConnectionClosed(const CommCloseCbParams &io); // comm callbacks - void clientReadRequest(const CommIoCbParams &io); void clientReadFtpData(const CommIoCbParams &io); void connStateClosed(const CommCloseCbParams &io); void requestTimeout(const CommTimeoutCbParams ¶ms); @@ -454,7 +432,9 @@ protected: BodyPipe::Pointer bodyPipe; ///< set when we are reading request body private: - int connFinishedWithConn(int size); + /* ::Server API */ + virtual bool connFinishedWithConn(int size); + void clientAfterReadingRequests(); bool concurrentRequestQueueFilled() const; @@ -495,9 +475,6 @@ private: /// the reason why we no longer read the request or nil const char *stoppedReceiving_; - AsyncCall::Pointer reader; ///< set when we are reading - - bool receivedFirstByte_; ///< true if at least one byte received on this connection SBuf connectionTag_; ///< clt_conn_tag=Tag annotation for client connection }; diff --git a/src/servers/FtpServer.cc b/src/servers/FtpServer.cc index 9c60b33775..8e657eda32 100644 --- a/src/servers/FtpServer.cc +++ b/src/servers/FtpServer.cc @@ -640,7 +640,7 @@ Ftp::Server::parseOneRequest() SBuf cmd; SBuf params; - Parser::Tokenizer tok(in.buf); + Parser::Tokenizer tok(inBuf); (void)tok.skipAll(LeadingSpace); // leading OWS and empty commands const bool parsed = tok.prefix(cmd, CommandChars); // required command @@ -667,14 +667,14 @@ Ftp::Server::parseOneRequest() // technically, we may skip multiple NLs below, but that is OK if (!parsed || !tok.skipAll(CharacterSet::LF)) { // did not find terminating LF yet // we need more data, but can we buffer more? - if (in.buf.length() >= Config.maxRequestHeaderSize) { + if (inBuf.length() >= Config.maxRequestHeaderSize) { changeState(fssError, "huge req"); quitAfterError(NULL); return earlyError(EarlyErrorKind::HugeRequest); } else { flags.readMore = true; debugs(33, 5, "Waiting for more, up to " << - (Config.maxRequestHeaderSize - in.buf.length())); + (Config.maxRequestHeaderSize - inBuf.length())); return NULL; } } @@ -1272,7 +1272,7 @@ Ftp::Server::wroteReply(const CommIoCbParams &io) case STREAM_COMPLETE: flags.readMore = true; changeState(fssConnected, "Ftp::Server::wroteReply"); - if (in.bodyParser) + if (bodyParser) finishDechunkingRequest(false); context->keepaliveNextRequest(); return; diff --git a/src/servers/Http1Server.cc b/src/servers/Http1Server.cc index b3fea716e6..d01ba6e7ce 100644 --- a/src/servers/Http1Server.cc +++ b/src/servers/Http1Server.cc @@ -121,7 +121,7 @@ Http::One::Server::buildHttpRequest(ClientSocketContext *context) } // setLogUri should called before repContext->setReplyToError setLogUri(http, http->uri, true); - const char * requestErrorBytes = in.buf.c_str(); + const char * requestErrorBytes = inBuf.c_str(); if (!clientTunnelOnError(this, context, request.getRaw(), parser_->method(), errPage, parser_->parseStatusCode, requestErrorBytes)) { // HttpRequest object not build yet, there is no reason to call // clientProcessRequestFinished method @@ -135,7 +135,7 @@ Http::One::Server::buildHttpRequest(ClientSocketContext *context) // setLogUri should called before repContext->setReplyToError setLogUri(http, http->uri, true); - const char * requestErrorBytes = in.buf.c_str(); + const char * requestErrorBytes = inBuf.c_str(); if (!clientTunnelOnError(this, context, request.getRaw(), parser_->method(), ERR_INVALID_URL, Http::scBadRequest, requestErrorBytes)) { // HttpRequest object not build yet, there is no reason to call // clientProcessRequestFinished method diff --git a/src/servers/Makefile.am b/src/servers/Makefile.am index 179ca9c8a1..ca7c5ea311 100644 --- a/src/servers/Makefile.am +++ b/src/servers/Makefile.am @@ -11,8 +11,10 @@ include $(top_srcdir)/src/TestHeaders.am noinst_LTLIBRARIES = libservers.la libservers_la_SOURCES = \ + forward.h \ FtpServer.cc \ FtpServer.h \ Http1Server.cc \ Http1Server.h \ - forward.h + Server.cc \ + Server.h diff --git a/src/servers/Server.cc b/src/servers/Server.cc new file mode 100644 index 0000000000..f4dec80276 --- /dev/null +++ b/src/servers/Server.cc @@ -0,0 +1,195 @@ +/* + * Copyright (C) 1996-2015 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#include "squid.h" +#include "anyp/PortCfg.h" +#include "comm.h" +#include "comm/Read.h" +#include "Debug.h" +#include "fd.h" +#include "fde.h" +#include "MasterXaction.h" +#include "servers/Server.h" +#include "SquidConfig.h" +#include "StatCounters.h" +#include "tools.h" + +Server::Server(const MasterXaction::Pointer &xact) : + AsyncJob("::Server"), // kids overwrite + clientConnection(xact->tcpClient), + transferProtocol(xact->squidPort->transport), + port(xact->squidPort), + receivedFirstByte_(false) +{} + +bool +Server::doneAll() const +{ + // servers are not done while the connection is open + return !Comm::IsConnOpen(clientConnection) && + BodyProducer::doneAll(); +} + +void +Server::start() +{ + // TODO: shuffle activity from ConnStateData +} + +void +Server::swanSong() +{ + if (Comm::IsConnOpen(clientConnection)) + clientConnection->close(); + + BodyProducer::swanSong(); +} + +void +Server::stopReading() +{ + if (reading()) { + Comm::ReadCancel(clientConnection->fd, reader); + reader = NULL; + } +} + +bool +Server::maybeMakeSpaceAvailable() +{ + if (inBuf.spaceSize() < 2) { + const SBuf::size_type haveCapacity = inBuf.length() + inBuf.spaceSize(); + if (haveCapacity >= Config.maxRequestBufferSize) { + debugs(33, 4, "request buffer full: client_request_buffer_max_size=" << Config.maxRequestBufferSize); + return false; + } + if (haveCapacity == 0) { + // haveCapacity is based on the SBuf visible window of the MemBlob buffer, which may fill up. + // at which point bump the buffer back to default. This allocates a new MemBlob with any un-parsed bytes. + inBuf.reserveCapacity(CLIENT_REQ_BUF_SZ); + } else { + const SBuf::size_type wantCapacity = min(static_cast(Config.maxRequestBufferSize), haveCapacity*2); + inBuf.reserveCapacity(wantCapacity); + } + debugs(33, 2, "growing request buffer: available=" << inBuf.spaceSize() << " used=" << inBuf.length()); + } + return (inBuf.spaceSize() >= 2); +} + +void +Server::readSomeData() +{ + if (reading()) + return; + + debugs(33, 4, clientConnection << ": reading request..."); + + // we can only read if there is more than 1 byte of space free + if (Config.maxRequestBufferSize - inBuf.length() < 2) + return; + + typedef CommCbMemFunT Dialer; + reader = JobCallback(33, 5, Dialer, this, Server::doClientRead); + Comm::Read(clientConnection, reader); +} + +void +Server::doClientRead(const CommIoCbParams &io) +{ + debugs(33,5, io.conn); + Must(reading()); + reader = NULL; + + /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */ + if (io.flag == Comm::ERR_CLOSING) { + debugs(33,5, io.conn << " closing Bailout."); + return; + } + + assert(Comm::IsConnOpen(clientConnection)); + assert(io.conn->fd == clientConnection->fd); + + /* + * Don't reset the timeout value here. The value should be + * counting Config.Timeout.request and applies to the request + * as a whole, not individual read() calls. + * Plus, it breaks our lame *HalfClosed() detection + */ + + maybeMakeSpaceAvailable(); + CommIoCbParams rd(this); // will be expanded with ReadNow results + rd.conn = io.conn; + switch (Comm::ReadNow(rd, inBuf)) { + case Comm::INPROGRESS: + + if (inBuf.isEmpty()) + debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno)); + readSomeData(); + return; + + case Comm::OK: + statCounter.client_http.kbytes_in += rd.size; + if (!receivedFirstByte_) + receivedFirstByte(); + // may comm_close or setReplyToError + if (!handleReadData()) + return; + + /* Continue to process previously read data */ + break; + + case Comm::ENDFILE: // close detected by 0-byte read + debugs(33, 5, io.conn << " closed?"); + + if (connFinishedWithConn(rd.size)) { + clientConnection->close(); + return; + } + + /* It might be half-closed, we can't tell */ + fd_table[io.conn->fd].flags.socket_eof = true; + commMarkHalfClosed(io.conn->fd); + fd_note(io.conn->fd, "half-closed"); + + /* There is one more close check at the end, to detect aborted + * (partial) requests. At this point we can't tell if the request + * is partial. + */ + + /* Continue to process previously read data */ + break; + + // case Comm::COMM_ERROR: + default: // no other flags should ever occur + debugs(33, 2, io.conn << ": got flag " << rd.flag << "; " << xstrerr(rd.xerrno)); + notifyAllContexts(rd.xerrno); + io.conn->close(); + return; + } + + afterClientRead(); +} + +void +Server::clientWriteDone(const CommIoCbParams &io) +{ + debugs(33,5, io.conn); + Must(writer != NULL); + writer = NULL; + + /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */ + if (io.flag == Comm::ERR_CLOSING) { + debugs(33,5, io.conn << " closing Bailout."); + return; + } + + assert(Comm::IsConnOpen(clientConnection)); + assert(io.conn->fd == clientConnection->fd); + + writeSomeData(); // maybe schedules another write +} diff --git a/src/servers/Server.h b/src/servers/Server.h new file mode 100644 index 0000000000..25c23b5d37 --- /dev/null +++ b/src/servers/Server.h @@ -0,0 +1,107 @@ +/* + * Copyright (C) 1996-2015 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +/* DEBUG: section 33 Client-side Routines */ + +#ifndef SQUID_SERVERS_SERVER_H +#define SQUID_SERVERS_SERVER_H + +#include "anyp/forward.h" +#include "anyp/ProtocolVersion.h" +#include "base/AsyncJob.h" +#include "BodyPipe.h" +#include "comm/forward.h" +#include "CommCalls.h" +#include "SBuf.h" + +/** + * Common base for all Server classes used + * to manage connections from clients. + */ +class Server : virtual public AsyncJob, public BodyProducer +{ +public: + Server(const MasterXaction::Pointer &xact); + virtual ~Server() {} + + /* AsyncJob API */ + virtual void start(); + virtual bool doneAll() const; + virtual void swanSong(); + + /// tell all active contexts on a connection about an error + virtual void notifyAllContexts(const int xerrno) = 0; + + /// ?? + virtual bool connFinishedWithConn(int size) = 0; + + /// processing to be done after a Comm::Read() + virtual void afterClientRead() = 0; + + /// maybe grow the inBuf and schedule Comm::Read() + void readSomeData(); + + /** + * called when new request data has been read from the socket + * + * \retval false called comm_close or setReplyToError (the caller should bail) + * \retval true we did not call comm_close or setReplyToError + */ + virtual bool handleReadData() = 0; + + /// whether Comm::Read() is scheduled + bool reading() const {return reader != NULL;} + + /// cancels Comm::Read() if it is scheduled + void stopReading(); + + /// Update flags and timeout after the first byte received + virtual void receivedFirstByte() = 0; + + /// maybe schedule another Comm::Write() and perform any + /// processing to be done after previous Comm::Write() completes + virtual void writeSomeData() {} + + /// whether Comm::Write() is scheduled + bool writing() const {return writer != NULL;} + +// XXX: should be 'protected:' for child access only, +// but all sorts of code likes to play directly +// with the I/O buffers and socket. +public: + + /// grows the available read buffer space (if possible) + bool maybeMakeSpaceAvailable(); + + // Client TCP connection details from comm layer. + Comm::ConnectionPointer clientConnection; + + /** + * The transfer protocol currently being spoken on this connection. + * HTTP/1.x CONNECT, HTTP/1.1 Upgrade and HTTP/2 SETTINGS offer the + * ability to change protocols on the fly. + */ + AnyP::ProtocolVersion transferProtocol; + + /// Squid listening port details where this connection arrived. + AnyP::PortCfgPointer port; + + /// read I/O buffer for the client connection + SBuf inBuf; + + bool receivedFirstByte_; ///< true if at least one byte received on this connection + +protected: + void doClientRead(const CommIoCbParams &io); + void clientWriteDone(const CommIoCbParams &io); + + AsyncCall::Pointer reader; ///< set when we are reading + AsyncCall::Pointer writer; ///< set when we are writing +}; + +#endif /* SQUID_SERVERS_SERVER_H */ diff --git a/src/stat.cc b/src/stat.cc index 13d18c1105..50fe433c6f 100644 --- a/src/stat.cc +++ b/src/stat.cc @@ -1857,7 +1857,7 @@ statClientRequests(StoreEntry * s) fd_table[fd].bytes_read, fd_table[fd].bytes_written); storeAppendPrintf(s, "\tFD desc: %s\n", fd_table[fd].desc); storeAppendPrintf(s, "\tin: buf %p, used %ld, free %ld\n", - conn->in.buf.c_str(), (long int) conn->in.buf.length(), (long int) conn->in.buf.spaceSize()); + conn->inBuf.rawContent(), (long int) conn->inBuf.length(), (long int) conn->inBuf.spaceSize()); storeAppendPrintf(s, "\tremote: %s\n", conn->clientConnection->remote.toUrl(buf,MAX_IPSTRLEN)); storeAppendPrintf(s, "\tlocal: %s\n", diff --git a/src/tests/stub_client_side.cc b/src/tests/stub_client_side.cc index 259d45ae26..513141b0b4 100644 --- a/src/tests/stub_client_side.cc +++ b/src/tests/stub_client_side.cc @@ -36,7 +36,6 @@ void ClientSocketContext::registerWithConn() STUB void ClientSocketContext::noteIoError(const int xerrno) STUB void ClientSocketContext::writeControlMsg(HttpControlMsg &msg) STUB -void ConnStateData::readSomeData() STUB bool ConnStateData::areAllContextsForThisConnection() const STUB_RETVAL(false) void ConnStateData::freeAllContexts() STUB void ConnStateData::notifyAllContexts(const int xerrno) STUB @@ -51,8 +50,6 @@ int64_t ConnStateData::mayNeedToReadMoreBody() const STUB_RETVAL(0) void ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *cause) STUB #endif bool ConnStateData::transparent() const STUB_RETVAL(false) -bool ConnStateData::reading() const STUB_RETVAL(false) -void ConnStateData::stopReading() STUB void ConnStateData::stopReceiving(const char *error) STUB void ConnStateData::stopSending(const char *error) STUB void ConnStateData::expectNoForwarding() STUB @@ -64,7 +61,6 @@ void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn, void ConnStateData::unpinConnection(const bool andClose) STUB const Comm::ConnectionPointer ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *peer) STUB_RETVAL(NULL) void ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io) STUB -void ConnStateData::clientReadRequest(const CommIoCbParams &io) STUB void ConnStateData::connStateClosed(const CommCloseCbParams &io) STUB void ConnStateData::requestTimeout(const CommTimeoutCbParams ¶ms) STUB void ConnStateData::swanSong() STUB @@ -80,8 +76,6 @@ void ConnStateData::buildSslCertGenerationParams(Ssl::CertificateProperties &cer bool ConnStateData::serveDelayedError(ClientSocketContext *context) STUB_RETVAL(false) #endif -bool ConnStateData::In::maybeMakeSpaceAvailable() STUB_RETVAL(false) - void setLogUri(ClientHttpRequest * http, char const *uri, bool cleanUrl) STUB const char *findTrailingHTTPVersion(const char *uriAndHTTPVersion, const char *end) STUB_RETVAL(NULL) int varyEvaluateMatch(StoreEntry * entry, HttpRequest * req) STUB_RETVAL(0) diff --git a/src/tunnel.cc b/src/tunnel.cc index 02551b151e..44d28c5bcf 100644 --- a/src/tunnel.cc +++ b/src/tunnel.cc @@ -837,11 +837,11 @@ tunnelStartShoveling(TunnelStateData *tunnelState) tunnelState->copy(tunnelState->server.len, tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone); } - if (tunnelState->http.valid() && tunnelState->http->getConn() && !tunnelState->http->getConn()->in.buf.isEmpty()) { - struct ConnStateData::In *in = &tunnelState->http->getConn()->in; - debugs(26, DBG_DATA, "Tunnel client PUSH Payload: \n" << in->buf << "\n----------"); - tunnelState->preReadClientData.append(in->buf); - in->buf.consume(); // ConnStateData buffer accounting after the shuffle. + if (tunnelState->http.valid() && tunnelState->http->getConn() && !tunnelState->http->getConn()->inBuf.isEmpty()) { + SBuf * const in = &tunnelState->http->getConn()->inBuf; + debugs(26, DBG_DATA, "Tunnel client PUSH Payload: \n" << *in << "\n----------"); + tunnelState->preReadClientData.append(*in); + in->consume(); // ConnStateData buffer accounting after the shuffle. } tunnelState->copyClientBytes(); }