]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/servers/FtpServer.cc
Source Format Enforcement (#532)
[thirdparty/squid.git] / src / servers / FtpServer.cc
index 10c79b23d078e351c8d90ed707f7a33452820c6c..915d83066f09ff18d127893e70ec0c55b372735c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
@@ -26,6 +26,7 @@
 #include "ftp/Parsing.h"
 #include "globals.h"
 #include "http/one/RequestParser.h"
+#include "http/Stream.h"
 #include "HttpHdrCc.h"
 #include "ip/tools.h"
 #include "ipc/FdNotes.h"
@@ -61,7 +62,9 @@ Ftp::Server::Server(const MasterXaction::Pointer &xact):
     uploadAvailSize(0),
     listener(),
     connector(),
-    reader()
+    reader(),
+    waitingForOrigin(false),
+    originDataDownloadAbortedOnError(false)
 {
     flags.readMore = false; // we need to announce ourselves first
     *uploadBuf = 0;
@@ -124,9 +127,9 @@ void
 Ftp::Server::doProcessRequest()
 {
     // zero pipelinePrefetchMax() ensures that there is only parsed request
-    ClientSocketContext::Pointer context = getCurrentContext();
-    Must(context != NULL);
-    Must(getConcurrentRequestCount() == 1);
+    Must(pipeline.count() == 1);
+    Http::StreamPointer context = pipeline.front();
+    Must(context != nullptr);
 
     ClientHttpRequest *const http = context->http;
     assert(http != NULL);
@@ -149,9 +152,9 @@ Ftp::Server::doProcessRequest()
 }
 
 void
-Ftp::Server::processParsedRequest(ClientSocketContext *context)
+Ftp::Server::processParsedRequest(Http::StreamPointer &)
 {
-    Must(getConcurrentRequestCount() == 1);
+    Must(pipeline.count() == 1);
 
     // Process FTP request asynchronously to make sure FTP
     // data connection accept callback is fired first.
@@ -172,7 +175,7 @@ Ftp::Server::readUploadData(const CommIoCbParams &io)
 
     if (io.flag == Comm::OK && bodyPipe != NULL) {
         if (io.size > 0) {
-            kb_incr(&(statCounter.client_http.kbytes_in), io.size);
+            statCounter.client_http.kbytes_in += io.size;
 
             char *const current_buf = uploadBuf + uploadAvailSize;
             if (io.buf != current_buf)
@@ -217,12 +220,18 @@ Ftp::Server::shovelUploadData()
 void
 Ftp::Server::noteMoreBodySpaceAvailable(BodyPipe::Pointer)
 {
+    if (!isOpen()) // if we are closing, nothing to do
+        return;
+
     shovelUploadData();
 }
 
 void
 Ftp::Server::noteBodyConsumerAborted(BodyPipe::Pointer ptr)
 {
+    if (!isOpen()) // if we are closing, nothing to do
+        return;
+
     ConnStateData::noteBodyConsumerAborted(ptr);
     closeDataConnection();
 }
@@ -288,18 +297,14 @@ void
 Ftp::Server::notePeerConnection(Comm::ConnectionPointer conn)
 {
     // find request
-    ClientSocketContext::Pointer context = getCurrentContext();
-    Must(context != NULL);
+    Http::StreamPointer context = pipeline.front();
+    Must(context != nullptr);
     ClientHttpRequest *const http = context->http;
     Must(http != NULL);
     HttpRequest *const request = http->request;
     Must(request != NULL);
-
-    // this is not an idle connection, so we do not want I/O monitoring
-    const bool monitor = false;
-
     // make FTP peer connection exclusive to our request
-    pinConnection(conn, request, conn->getPeer(), false, monitor);
+    pinBusyConnection(conn, request);
 }
 
 void
@@ -307,11 +312,18 @@ Ftp::Server::clientPinnedConnectionClosed(const CommCloseCbParams &io)
 {
     ConnStateData::clientPinnedConnectionClosed(io);
 
-    // if the server control connection is gone, reset state to login again
-    resetLogin("control connection closure");
+    // TODO: Keep the control connection open after fixing the reset
+    // problem below
+    if (Comm::IsConnOpen(clientConnection))
+        clientConnection->close();
 
-    // XXX: Reseting is not enough. FtpRelay::sendCommand() will not re-login
-    // because FtpRelay::serverState() is not going to be fssConnected.
+    // TODO: If the server control connection is gone, reset state to login
+    // again. Reseting login alone is not enough: FtpRelay::sendCommand() will
+    // not re-login because FtpRelay::serverState() is not going to be
+    // fssConnected. Calling resetLogin() alone is also harmful because
+    // it does not reset correctly the client-to-squid control connection (eg
+    // respond if required with an error code, in all cases)
+    // resetLogin("control connection closure");
 }
 
 /// clear client and server login-related state after the old login is gone
@@ -327,16 +339,17 @@ Ftp::Server::resetLogin(const char *reason)
 void
 Ftp::Server::calcUri(const SBuf *file)
 {
+    // TODO: fill a class AnyP::Uri instead of string
     uri = "ftp://";
     uri.append(host);
     if (port->ftp_track_dirs && master->workingDir.length()) {
         if (master->workingDir[0] != '/')
-            uri.append("/");
+            uri.append("/", 1);
         uri.append(master->workingDir);
     }
 
     if (uri[uri.length() - 1] != '/')
-        uri.append("/");
+        uri.append("/", 1);
 
     if (port->ftp_track_dirs && file) {
         static const CharacterSet Slash("/", "/");
@@ -417,7 +430,7 @@ Ftp::Server::acceptDataConnection(const CommAcceptCbParams &params)
                 Must(master->serverState == fssHandleDataRequest);
             MemBuf mb;
             mb.init();
-            mb.Printf("150 Data connection opened.\r\n");
+            mb.appendf("150 Data connection opened.\r\n");
             Comm::Write(clientConnection, &mb, call);
         }
     }
@@ -462,7 +475,7 @@ Ftp::Server::writeEarlyReply(const int code, const char *msg)
 
     MemBuf mb;
     mb.init();
-    mb.Printf("%i %s\r\n", code, msg);
+    mb.appendf("%i %s\r\n", code, msg);
 
     typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
     AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, Ftp::Server::wroteEarlyReply);
@@ -492,17 +505,17 @@ Ftp::Server::writeCustomReply(const int code, const char *msg, const HttpReply *
     assert(99 < code && code < 1000);
 
     const bool sendDetails = reply != NULL &&
-                             reply->header.has(HDR_FTP_STATUS) && reply->header.has(HDR_FTP_REASON);
+                             reply->header.has(Http::HdrType::FTP_STATUS) && reply->header.has(Http::HdrType::FTP_REASON);
 
     MemBuf mb;
     mb.init();
     if (sendDetails) {
-        mb.Printf("%i-%s\r\n", code, msg);
-        mb.Printf(" Server reply:\r\n");
+        mb.appendf("%i-%s\r\n", code, msg);
+        mb.appendf(" Server reply:\r\n");
         Ftp::PrintReply(mb, reply, " ");
-        mb.Printf("%i \r\n", code);
+        mb.appendf("%i \r\n", code);
     } else
-        mb.Printf("%i %s\r\n", code, msg);
+        mb.appendf("%i %s\r\n", code, msg);
 
     writeReply(mb);
 }
@@ -548,7 +561,7 @@ Ftp::CommandHasPathParameter(const SBuf &cmd)
 }
 
 /// creates a context filled with an error message for a given early error
-ClientSocketContext *
+Http::Stream *
 Ftp::Server::earlyError(const EarlyErrorKind eek)
 {
     /* Default values, to be updated by the switch statement below */
@@ -557,43 +570,43 @@ Ftp::Server::earlyError(const EarlyErrorKind eek)
     const char *errUri = "error:ftp-internal-early-error";
 
     switch (eek) {
-    case eekHugeRequest:
+    case EarlyErrorKind::HugeRequest:
         scode = 421;
         reason = "Huge request";
         errUri = "error:ftp-huge-request";
         break;
 
-    case eekMissingLogin:
+    case EarlyErrorKind::MissingLogin:
         scode = 530;
         reason = "Must login first";
         errUri = "error:ftp-must-login-first";
         break;
 
-    case eekMissingUsername:
+    case EarlyErrorKind::MissingUsername:
         scode = 501;
         reason = "Missing username";
         errUri = "error:ftp-missing-username";
         break;
 
-    case eekMissingHost:
+    case EarlyErrorKind::MissingHost:
         scode = 501;
         reason = "Missing host";
         errUri = "error:ftp-missing-host";
         break;
 
-    case eekUnsupportedCommand:
+    case EarlyErrorKind::UnsupportedCommand:
         scode = 502;
         reason = "Unknown or unsupported command";
         errUri = "error:ftp-unsupported-command";
         break;
 
-    case eekInvalidUri:
+    case EarlyErrorKind::InvalidUri:
         scode = 501;
         reason = "Invalid URI";
         errUri = "error:ftp-invalid-uri";
         break;
 
-    case eekMalformedCommand:
+    case EarlyErrorKind::MalformedCommand:
         scode = 421;
         reason = "Malformed command";
         errUri = "error:ftp-malformed-command";
@@ -602,7 +615,7 @@ Ftp::Server::earlyError(const EarlyErrorKind eek)
         // no default so that a compiler can check that we have covered all cases
     }
 
-    ClientSocketContext *context = abortRequestParsing(errUri);
+    Http::Stream *context = abortRequestParsing(errUri);
     clientStreamNode *node = context->getClientReplyContext();
     Must(node);
     clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
@@ -616,9 +629,9 @@ Ftp::Server::earlyError(const EarlyErrorKind eek)
 }
 
 /// Parses a single FTP request on the control connection.
-/// Returns a new ClientSocketContext on valid requests and all errors.
+/// Returns a new Http::Stream on valid requests and all errors.
 /// Returns NULL on incomplete requests that may still succeed given more data.
-ClientSocketContext *
+Http::Stream *
 Ftp::Server::parseOneRequest()
 {
     flags.readMore = false; // common for all but one case below
@@ -640,7 +653,7 @@ Ftp::Server::parseOneRequest()
     SBuf cmd;
     SBuf params;
 
-    Parser::Tokenizer tok(in.buf);
+    Parser::Tokenizer tok(inBuf);
 
     (void)tok.skipAll(LeadingSpace); // leading OWS and empty commands
     const bool parsed = tok.prefix(cmd, CommandChars); // required command
@@ -661,20 +674,20 @@ Ftp::Server::parseOneRequest()
     if (cmd.length() > tokenMax || params.length() > tokenMax) {
         changeState(fssError, "huge req token");
         quitAfterError(NULL);
-        return earlyError(eekHugeRequest);
+        return earlyError(EarlyErrorKind::HugeRequest);
     }
 
     // technically, we may skip multiple NLs below, but that is OK
     if (!parsed || !tok.skipAll(CharacterSet::LF)) { // did not find terminating LF yet
         // we need more data, but can we buffer more?
-        if (in.buf.length() >= Config.maxRequestHeaderSize) {
+        if (inBuf.length() >= Config.maxRequestHeaderSize) {
             changeState(fssError, "huge req");
             quitAfterError(NULL);
-            return earlyError(eekHugeRequest);
+            return earlyError(EarlyErrorKind::HugeRequest);
         } else {
             flags.readMore = true;
             debugs(33, 5, "Waiting for more, up to " <<
-                   (Config.maxRequestHeaderSize - in.buf.length()));
+                   (Config.maxRequestHeaderSize - inBuf.length()));
             return NULL;
         }
     }
@@ -691,18 +704,18 @@ Ftp::Server::parseOneRequest()
         if (!master->clientReadGreeting) {
             // the first command must be USER
             if (!pinning.pinned && cmd != cmdUser())
-                return earlyError(eekMissingLogin);
+                return earlyError(EarlyErrorKind::MissingLogin);
         }
 
         // process USER request now because it sets FTP peer host name
         if (cmd == cmdUser()) {
-            if (ClientSocketContext *errCtx = handleUserRequest(cmd, params))
+            if (Http::Stream *errCtx = handleUserRequest(cmd, params))
                 return errCtx;
         }
     }
 
     if (!Ftp::SupportedCommand(cmd))
-        return earlyError(eekUnsupportedCommand);
+        return earlyError(EarlyErrorKind::UnsupportedCommand);
 
     const HttpRequestMethod method =
         cmd == cmdAppe() || cmd == cmdStor() || cmd == cmdStou() ?
@@ -711,14 +724,15 @@ Ftp::Server::parseOneRequest()
     const SBuf *path = (params.length() && CommandHasPathParameter(cmd)) ?
                        &params : NULL;
     calcUri(path);
-    char *newUri = xstrdup(uri.c_str());
-    HttpRequest *const request = HttpRequest::CreateFromUrlAndMethod(newUri, method);
+    MasterXaction::Pointer mx = new MasterXaction(XactionInitiator::initClient);
+    mx->tcpClient = clientConnection;
+    auto * const request = HttpRequest::FromUrl(uri, mx, method);
     if (!request) {
         debugs(33, 5, "Invalid FTP URL: " << uri);
         uri.clear();
-        safe_free(newUri);
-        return earlyError(eekInvalidUri);
+        return earlyError(EarlyErrorKind::InvalidUri);
     }
+    char *newUri = xstrdup(uri.c_str());
 
     request->flags.ftpNative = true;
     request->http_ver = Http::ProtocolVersion(Ftp::ProtocolVersion().major, Ftp::ProtocolVersion().minor);
@@ -727,21 +741,20 @@ Ftp::Server::parseOneRequest()
     request->flags.cachable = false; // XXX: reset later by maybeCacheable()
     request->flags.noCache = true;
 
-    request->header.putStr(HDR_FTP_COMMAND, cmd.c_str());
-    request->header.putStr(HDR_FTP_ARGUMENTS, params.c_str()); // may be ""
+    request->header.putStr(Http::HdrType::FTP_COMMAND, cmd.c_str());
+    request->header.putStr(Http::HdrType::FTP_ARGUMENTS, params.c_str()); // may be ""
     if (method == Http::METHOD_PUT) {
-        request->header.putStr(HDR_EXPECT, "100-continue");
-        request->header.putStr(HDR_TRANSFER_ENCODING, "chunked");
+        request->header.putStr(Http::HdrType::EXPECT, "100-continue");
+        request->header.putStr(Http::HdrType::TRANSFER_ENCODING, "chunked");
     }
 
     ClientHttpRequest *const http = new ClientHttpRequest(this);
-    http->request = request;
-    HTTPMSGLOCK(http->request);
     http->req_sz = tok.parsedSize();
     http->uri = newUri;
+    http->initRequest(request);
 
-    ClientSocketContext *const result =
-        new ClientSocketContext(clientConnection, http);
+    Http::Stream *const result =
+        new Http::Stream(clientConnection, http);
 
     StoreIOBuffer tempBuffer;
     tempBuffer.data = result->reqbuf;
@@ -761,14 +774,8 @@ void
 Ftp::Server::handleReply(HttpReply *reply, StoreIOBuffer data)
 {
     // the caller guarantees that we are dealing with the current context only
-    ClientSocketContext::Pointer context = getCurrentContext();
-    assert(context != NULL);
-
-    if (context->http && context->http->al != NULL &&
-            !context->http->al->reply && reply) {
-        context->http->al->reply = reply;
-        HTTPMSGLOCK(context->http->al->reply);
-    }
+    Http::StreamPointer context = pipeline.front();
+    assert(context != nullptr);
 
     static ReplyHandler handlers[] = {
         NULL, // fssBegin
@@ -785,21 +792,27 @@ Ftp::Server::handleReply(HttpReply *reply, StoreIOBuffer data)
         NULL, // fssHandleCdup
         &Ftp::Server::handleErrorReply // fssError
     };
-    const Server &server = dynamic_cast<const Ftp::Server&>(*context->getConn());
-    if (const ReplyHandler handler = handlers[server.master->serverState])
-        (this->*handler)(reply, data);
-    else
-        writeForwardedReply(reply);
+    try {
+        const Server &server = dynamic_cast<const Ftp::Server&>(*context->getConn());
+        if (const ReplyHandler handler = handlers[server.master->serverState])
+            (this->*handler)(reply, data);
+        else
+            writeForwardedReply(reply);
+    } catch (const std::exception &e) {
+        callException(e);
+        throw TexcHere(e.what());
+    }
 }
 
 void
 Ftp::Server::handleFeatReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support FEAT", reply);
         return;
     }
 
+    Must(reply);
     HttpReply::Pointer featReply = Ftp::HttpReplyWrapper(211, "End", Http::scNoContent, 0);
     HttpHeader const &serverReplyHeader = reply->header;
 
@@ -808,11 +821,11 @@ Ftp::Server::handleFeatReply(const HttpReply *reply, StoreIOBuffer)
     bool hasEPSV = false;
     int prependSpaces = 1;
 
-    featReply->header.putStr(HDR_FTP_PRE, "\"211-Features:\"");
-    const int scode = serverReplyHeader.getInt(HDR_FTP_STATUS);
+    featReply->header.putStr(Http::HdrType::FTP_PRE, "\"211-Features:\"");
+    const int scode = serverReplyHeader.getInt(Http::HdrType::FTP_STATUS);
     if (scode == 211) {
         while (const HttpHeaderEntry *e = serverReplyHeader.getEntry(&pos)) {
-            if (e->id == HDR_FTP_PRE) {
+            if (e->id == Http::HdrType::FTP_PRE) {
                 // assume RFC 2389 FEAT response format, quoted by Squid:
                 // <"> SP NAME [SP PARAMS] <">
                 // but accommodate MS servers sending four SPs before NAME
@@ -848,11 +861,11 @@ Ftp::Server::handleFeatReply(const HttpReply *reply, StoreIOBuffer)
     char buf[256];
     if (!hasEPRT) {
         snprintf(buf, sizeof(buf), "\"%*s\"", prependSpaces + 4, "EPRT");
-        featReply->header.putStr(HDR_FTP_PRE, buf);
+        featReply->header.putStr(Http::HdrType::FTP_PRE, buf);
     }
     if (!hasEPSV) {
         snprintf(buf, sizeof(buf), "\"%*s\"", prependSpaces + 4, "EPSV");
-        featReply->header.putStr(HDR_FTP_PRE, buf);
+        featReply->header.putStr(Http::HdrType::FTP_PRE, buf);
     }
 
     featReply->header.refreshMask();
@@ -863,8 +876,8 @@ Ftp::Server::handleFeatReply(const HttpReply *reply, StoreIOBuffer)
 void
 Ftp::Server::handlePasvReply(const HttpReply *reply, StoreIOBuffer)
 {
-    ClientSocketContext::Pointer context = getCurrentContext();
-    assert(context != NULL);
+    const Http::StreamPointer context(pipeline.front());
+    assert(context != nullptr);
 
     if (context->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support PASV", reply);
@@ -892,10 +905,10 @@ Ftp::Server::handlePasvReply(const HttpReply *reply, StoreIOBuffer)
     // versions block responses that use those alternative syntax rules!
     MemBuf mb;
     mb.init();
-    mb.Printf("227 Entering Passive Mode (%s,%i,%i).\r\n",
-              addr,
-              static_cast<int>(localPort / 256),
-              static_cast<int>(localPort % 256));
+    mb.appendf("227 Entering Passive Mode (%s,%i,%i).\r\n",
+               addr,
+               static_cast<int>(localPort / 256),
+               static_cast<int>(localPort % 256));
     debugs(9, 3, Raw("writing", mb.buf, mb.size));
     writeReply(mb);
 }
@@ -903,7 +916,7 @@ Ftp::Server::handlePasvReply(const HttpReply *reply, StoreIOBuffer)
 void
 Ftp::Server::handlePortReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support PASV (converted from PORT)", reply);
         return;
     }
@@ -961,7 +974,7 @@ Ftp::Server::handleDataReply(const HttpReply *reply, StoreIOBuffer data)
     AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, Ftp::Server::wroteReplyData);
     Comm::Write(dataConn, &mb, call);
 
-    getCurrentContext()->noteSentBodyBytes(data.length);
+    pipeline.front()->noteSentBodyBytes(data.length);
 }
 
 /// called when we are done writing a chunk of the response data
@@ -973,13 +986,12 @@ Ftp::Server::wroteReplyData(const CommIoCbParams &io)
 
     if (io.flag != Comm::OK) {
         debugs(33, 3, "FTP reply data writing failed: " << xstrerr(io.xerrno));
-        closeDataConnection();
-        writeCustomReply(426, "Data connection error; transfer aborted");
+        userDataCompletionCheckpoint(426);
         return;
     }
 
-    assert(getCurrentContext()->http);
-    getCurrentContext()->http->out.size += io.size;
+    assert(pipeline.front()->http);
+    pipeline.front()->http->out.size += io.size;
     replyDataWritingCheckpoint();
 }
 
@@ -987,28 +999,26 @@ Ftp::Server::wroteReplyData(const CommIoCbParams &io)
 void
 Ftp::Server::replyDataWritingCheckpoint()
 {
-    switch (getCurrentContext()->socketState()) {
+    switch (pipeline.front()->socketState()) {
     case STREAM_NONE:
         debugs(33, 3, "Keep going");
-        getCurrentContext()->pullData();
+        pipeline.front()->pullData();
         return;
     case STREAM_COMPLETE:
         debugs(33, 3, "FTP reply data transfer successfully complete");
-        writeCustomReply(226, "Transfer complete");
+        userDataCompletionCheckpoint(226);
         break;
     case STREAM_UNPLANNED_COMPLETE:
         debugs(33, 3, "FTP reply data transfer failed: STREAM_UNPLANNED_COMPLETE");
-        writeCustomReply(451, "Server error; transfer aborted");
+        userDataCompletionCheckpoint(451);
         break;
     case STREAM_FAILED:
+        userDataCompletionCheckpoint(451);
         debugs(33, 3, "FTP reply data transfer failed: STREAM_FAILED");
-        writeCustomReply(451, "Server error; transfer aborted");
         break;
     default:
         fatal("unreachable code");
     }
-
-    closeDataConnection();
 }
 
 void
@@ -1021,10 +1031,17 @@ Ftp::Server::handleUploadReply(const HttpReply *reply, StoreIOBuffer)
 void
 Ftp::Server::writeForwardedReply(const HttpReply *reply)
 {
-    assert(reply != NULL);
+    Must(reply);
+
+    if (waitingForOrigin) {
+        Must(delayedReply == NULL);
+        delayedReply = reply;
+        return;
+    }
+
     const HttpHeader &header = reply->header;
-    // adaptation and forwarding errors lack HDR_FTP_STATUS
-    if (!header.has(HDR_FTP_STATUS)) {
+    // adaptation and forwarding errors lack Http::HdrType::FTP_STATUS
+    if (!header.has(Http::HdrType::FTP_STATUS)) {
         writeForwardedForeign(reply); // will get to Ftp::Server::wroteReply
         return;
     }
@@ -1037,7 +1054,7 @@ Ftp::Server::writeForwardedReply(const HttpReply *reply)
 void
 Ftp::Server::handleEprtReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support PASV (converted from EPRT)", reply);
         return;
     }
@@ -1050,7 +1067,7 @@ Ftp::Server::handleEprtReply(const HttpReply *reply, StoreIOBuffer)
 void
 Ftp::Server::handleEpsvReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Cannot connect to server", reply);
         return;
     }
@@ -1063,7 +1080,7 @@ Ftp::Server::handleEpsvReply(const HttpReply *reply, StoreIOBuffer)
     // traffic will be redirected to us.
     MemBuf mb;
     mb.init();
-    mb.Printf("229 Entering Extended Passive Mode (|||%u|)\r\n", localPort);
+    mb.appendf("229 Entering Extended Passive Mode (|||%u|)\r\n", localPort);
 
     debugs(9, 3, Raw("writing", mb.buf, mb.size));
     writeReply(mb);
@@ -1073,20 +1090,20 @@ Ftp::Server::handleEpsvReply(const HttpReply *reply, StoreIOBuffer)
 void
 Ftp::Server::writeErrorReply(const HttpReply *reply, const int scode)
 {
-    const HttpRequest *request = getCurrentContext()->http->request;
+    const HttpRequest *request = pipeline.front()->http->request;
     assert(request);
 
     MemBuf mb;
     mb.init();
 
     if (request->errType != ERR_NONE)
-        mb.Printf("%i-%s\r\n", scode, errorPageName(request->errType));
+        mb.appendf("%i-%s\r\n", scode, errorPageName(request->errType));
 
     if (request->errDetail > 0) {
         // XXX: > 0 may not always mean that this is an errno
-        mb.Printf("%i-Error: (%d) %s\r\n", scode,
-                  request->errDetail,
-                  strerror(request->errDetail));
+        mb.appendf("%i-Error: (%d) %s\r\n", scode,
+                   request->errDetail,
+                   strerror(request->errDetail));
     }
 
 #if USE_ADAPTATION
@@ -1096,18 +1113,20 @@ Ftp::Server::writeErrorReply(const HttpReply *reply, const int scode)
         const String info = ah->allMeta.getByName("X-Response-Info");
         const String desc = ah->allMeta.getByName("X-Response-Desc");
         if (info.size())
-            mb.Printf("%i-Information: %s\r\n", scode, info.termedBuf());
+            mb.appendf("%i-Information: %s\r\n", scode, info.termedBuf());
         if (desc.size())
-            mb.Printf("%i-Description: %s\r\n", scode, desc.termedBuf());
+            mb.appendf("%i-Description: %s\r\n", scode, desc.termedBuf());
     }
 #endif
 
-    assert(reply != NULL);
-    const char *reason = reply->header.has(HDR_FTP_REASON) ?
-                         reply->header.getStr(HDR_FTP_REASON):
-                         reply->sline.reason();
+    const char *reason = "Lost Error";
+    if (reply) {
+        reason = reply->header.has(Http::HdrType::FTP_REASON) ?
+                 reply->header.getStr(Http::HdrType::FTP_REASON):
+                 reply->sline.reason();
+    }
 
-    mb.Printf("%i %s\r\n", scode, reason); // error terminating line
+    mb.appendf("%i %s\r\n", scode, reason); // error terminating line
 
     // TODO: errorpage.cc should detect FTP client and use
     // configurable FTP-friendly error templates which we should
@@ -1127,12 +1146,13 @@ Ftp::Server::writeForwardedForeign(const HttpReply *reply)
     writeErrorReply(reply, 451);
 }
 
-void
-Ftp::Server::writeControlMsgAndCall(ClientSocketContext *context, HttpReply *reply, AsyncCall::Pointer &call)
+bool
+Ftp::Server::writeControlMsgAndCall(HttpReply *reply, AsyncCall::Pointer &call)
 {
     // the caller guarantees that we are dealing with the current context only
-    // the caller should also make sure reply->header.has(HDR_FTP_STATUS)
+    // the caller should also make sure reply->header.has(Http::HdrType::FTP_STATUS)
     writeForwardedReplyAndCall(reply, call);
+    return true;
 }
 
 void
@@ -1142,9 +1162,9 @@ Ftp::Server::writeForwardedReplyAndCall(const HttpReply *reply, AsyncCall::Point
     const HttpHeader &header = reply->header;
 
     // without status, the caller must use the writeForwardedForeign() path
-    Must(header.has(HDR_FTP_STATUS));
-    Must(header.has(HDR_FTP_REASON));
-    const int scode = header.getInt(HDR_FTP_STATUS);
+    Must(header.has(Http::HdrType::FTP_STATUS));
+    Must(header.has(Http::HdrType::FTP_REASON));
+    const int scode = header.getInt(Http::HdrType::FTP_STATUS);
     debugs(33, 7, "scode: " << scode);
 
     // Status 125 or 150 implies upload or data request, but we still check
@@ -1188,23 +1208,23 @@ Ftp::Server::writeForwardedReplyAndCall(const HttpReply *reply, AsyncCall::Point
 }
 
 static void
-Ftp::PrintReply(MemBuf &mb, const HttpReply *reply, const char *const prefix)
+Ftp::PrintReply(MemBuf &mb, const HttpReply *reply, const char *const)
 {
     const HttpHeader &header = reply->header;
 
     HttpHeaderPos pos = HttpHeaderInitPos;
     while (const HttpHeaderEntry *e = header.getEntry(&pos)) {
-        if (e->id == HDR_FTP_PRE) {
+        if (e->id == Http::HdrType::FTP_PRE) {
             String raw;
             if (httpHeaderParseQuotedString(e->value.rawBuf(), e->value.size(), &raw))
-                mb.Printf("%s\r\n", raw.termedBuf());
+                mb.appendf("%s\r\n", raw.termedBuf());
         }
     }
 
-    if (header.has(HDR_FTP_STATUS)) {
-        const char *reason = header.getStr(HDR_FTP_REASON);
-        mb.Printf("%i %s\r\n", header.getInt(HDR_FTP_STATUS),
-                  (reason ? reason : 0));
+    if (header.has(Http::HdrType::FTP_STATUS)) {
+        const char *reason = header.getStr(Http::HdrType::FTP_REASON);
+        mb.appendf("%i %s\r\n", header.getInt(Http::HdrType::FTP_STATUS),
+                   (reason ? reason : 0));
     }
 }
 
@@ -1220,8 +1240,8 @@ Ftp::Server::wroteEarlyReply(const CommIoCbParams &io)
         return;
     }
 
-    ClientSocketContext::Pointer context = getCurrentContext();
-    if (context != NULL && context->http) {
+    Http::StreamPointer context = pipeline.front();
+    if (context != nullptr && context->http) {
         context->http->out.size += io.size;
         context->http->out.headers_sz += io.size;
     }
@@ -1242,7 +1262,7 @@ Ftp::Server::wroteReply(const CommIoCbParams &io)
         return;
     }
 
-    ClientSocketContext::Pointer context = getCurrentContext();
+    Http::StreamPointer context = pipeline.front();
     assert(context->http);
     context->http->out.size += io.size;
     context->http->out.headers_sz += io.size;
@@ -1265,9 +1285,10 @@ Ftp::Server::wroteReply(const CommIoCbParams &io)
     case STREAM_COMPLETE:
         flags.readMore = true;
         changeState(fssConnected, "Ftp::Server::wroteReply");
-        if (in.bodyParser)
+        if (bodyParser)
             finishDechunkingRequest(false);
-        context->keepaliveNextRequest();
+        context->finished();
+        kick();
         return;
     }
 }
@@ -1279,18 +1300,15 @@ Ftp::Server::handleRequest(HttpRequest *request)
     Must(request);
 
     HttpHeader &header = request->header;
-    Must(header.has(HDR_FTP_COMMAND));
-    String &cmd = header.findEntry(HDR_FTP_COMMAND)->value;
-    Must(header.has(HDR_FTP_ARGUMENTS));
-    String &params = header.findEntry(HDR_FTP_ARGUMENTS)->value;
+    Must(header.has(Http::HdrType::FTP_COMMAND));
+    String &cmd = header.findEntry(Http::HdrType::FTP_COMMAND)->value;
+    Must(header.has(Http::HdrType::FTP_ARGUMENTS));
+    String &params = header.findEntry(Http::HdrType::FTP_ARGUMENTS)->value;
 
-    if (do_debug(9, 2)) {
+    if (Debug::Enabled(9, 2)) {
         MemBuf mb;
-        Packer p;
         mb.init();
-        packerToMemInit(&p, &mb);
-        request->pack(&p);
-        packerClean(&p);
+        request->pack(&mb);
 
         debugs(9, 2, "FTP Client " << clientConnection);
         debugs(9, 2, "FTP Client REQUEST:\n---------\n" << mb.buf <<
@@ -1334,16 +1352,16 @@ Ftp::Server::handleRequest(HttpRequest *request)
 
 /// Called to parse USER command, which is required to create an HTTP request
 /// wrapper. W/o request, the errors are handled by returning earlyError().
-ClientSocketContext *
-Ftp::Server::handleUserRequest(const SBuf &cmd, SBuf &params)
+Http::Stream *
+Ftp::Server::handleUserRequest(const SBuf &, SBuf &params)
 {
     if (params.isEmpty())
-        return earlyError(eekMissingUsername);
+        return earlyError(EarlyErrorKind::MissingUsername);
 
     // find the [end of] user name
     const SBuf::size_type eou = params.rfind('@');
     if (eou == SBuf::npos || eou + 1 >= params.length())
-        return earlyError(eekMissingHost);
+        return earlyError(EarlyErrorKind::MissingHost);
 
     // Determine the intended destination.
     host = params.substr(eou + 1, params.length());
@@ -1383,14 +1401,14 @@ Ftp::Server::handleUserRequest(const SBuf &cmd, SBuf &params)
 }
 
 bool
-Ftp::Server::handleFeatRequest(String &cmd, String &params)
+Ftp::Server::handleFeatRequest(String &, String &)
 {
     changeState(fssHandleFeat, "handleFeatRequest");
     return true;
 }
 
 bool
-Ftp::Server::handlePasvRequest(String &cmd, String &params)
+Ftp::Server::handlePasvRequest(String &, String &params)
 {
     if (gotEpsvAll) {
         setReply(500, "Bad PASV command");
@@ -1428,9 +1446,33 @@ Ftp::Server::createDataConnection(Ip::Address cltAddr)
     Comm::ConnectionPointer conn = new Comm::Connection();
     conn->flags |= COMM_DOBIND;
 
-    // Use local IP address of the control connection as the source address
-    // of the active data connection, or some clients will refuse to accept.
-    conn->setAddrs(clientConnection->local, cltAddr);
+    if (clientConnection->flags & COMM_INTERCEPTION) {
+        // In the case of NAT interception conn->local value is not set
+        // because the TCP stack will automatically pick correct source
+        // address for the data connection. We must only ensure that IP
+        // version matches client's address.
+        conn->local.setAnyAddr();
+
+        if (cltAddr.isIPv4())
+            conn->local.setIPv4();
+
+        conn->remote = cltAddr;
+    } else {
+        // In the case of explicit-proxy the local IP of the control connection
+        // is the Squid IP the client is knowingly talking to.
+        //
+        // In the case of TPROXY the IP address of the control connection is
+        // server IP the client is connecting to, it can be spoofed by Squid.
+        //
+        // In both cases some clients may refuse to accept data connections if
+        // these control connectin local-IP's are not used.
+        conn->setAddrs(clientConnection->local, cltAddr);
+
+        // Using non-local addresses in TPROXY mode requires appropriate socket option.
+        if (clientConnection->flags & COMM_TRANSPARENT)
+            conn->flags |= COMM_TRANSPARENT;
+    }
+
     // RFC 959 requires active FTP connections to originate from port 20
     // but that would preclude us from supporting concurrent transfers! (XXX?)
     conn->local.port(0);
@@ -1444,7 +1486,7 @@ Ftp::Server::createDataConnection(Ip::Address cltAddr)
 }
 
 bool
-Ftp::Server::handlePortRequest(String &cmd, String &params)
+Ftp::Server::handlePortRequest(String &, String &params)
 {
     // TODO: Should PORT errors trigger closeDataConnection() cleanup?
 
@@ -1473,27 +1515,32 @@ Ftp::Server::handlePortRequest(String &cmd, String &params)
 }
 
 bool
-Ftp::Server::handleDataRequest(String &cmd, String &params)
+Ftp::Server::handleDataRequest(String &, String &)
 {
     if (!checkDataConnPre())
         return false;
 
+    master->userDataDone = 0;
+    originDataDownloadAbortedOnError = false;
+
     changeState(fssHandleDataRequest, "handleDataRequest");
 
     return true;
 }
 
 bool
-Ftp::Server::handleUploadRequest(String &cmd, String &params)
+Ftp::Server::handleUploadRequest(String &, String &)
 {
     if (!checkDataConnPre())
         return false;
 
     if (Config.accessList.forceRequestBodyContinuation) {
-        ClientHttpRequest *http = getCurrentContext()->http;
+        ClientHttpRequest *http = pipeline.front()->http;
         HttpRequest *request = http->request;
         ACLFilledChecklist bodyContinuationCheck(Config.accessList.forceRequestBodyContinuation, request, NULL);
-        if (bodyContinuationCheck.fastCheck() == ACCESS_ALLOWED) {
+        bodyContinuationCheck.al = http->al;
+        bodyContinuationCheck.syncAle(request, http->log_uri);
+        if (bodyContinuationCheck.fastCheck().allowed()) {
             request->forcedBodyContinuation = true;
             if (checkDataConnPost()) {
                 // Write control Msg
@@ -1515,7 +1562,7 @@ Ftp::Server::handleUploadRequest(String &cmd, String &params)
 }
 
 bool
-Ftp::Server::handleEprtRequest(String &cmd, String &params)
+Ftp::Server::handleEprtRequest(String &, String &params)
 {
     debugs(9, 3, "Process an EPRT " << params);
 
@@ -1544,7 +1591,7 @@ Ftp::Server::handleEprtRequest(String &cmd, String &params)
 }
 
 bool
-Ftp::Server::handleEpsvRequest(String &cmd, String &params)
+Ftp::Server::handleEpsvRequest(String &, String &params)
 {
     debugs(9, 3, "Process an EPSV command with params: " << params);
     if (params.size() <= 0) {
@@ -1569,21 +1616,21 @@ Ftp::Server::handleEpsvRequest(String &cmd, String &params)
 }
 
 bool
-Ftp::Server::handleCwdRequest(String &cmd, String &params)
+Ftp::Server::handleCwdRequest(String &, String &)
 {
     changeState(fssHandleCwd, "handleCwdRequest");
     return true;
 }
 
 bool
-Ftp::Server::handlePassRequest(String &cmd, String &params)
+Ftp::Server::handlePassRequest(String &, String &)
 {
     changeState(fssHandlePass, "handlePassRequest");
     return true;
 }
 
 bool
-Ftp::Server::handleCdupRequest(String &cmd, String &params)
+Ftp::Server::handleCdupRequest(String &, String &)
 {
     changeState(fssHandleCdup, "handleCdupRequest");
     return true;
@@ -1594,15 +1641,15 @@ Ftp::Server::handleCdupRequest(String &cmd, String &params)
 void
 Ftp::Server::setDataCommand()
 {
-    ClientHttpRequest *const http = getCurrentContext()->http;
+    ClientHttpRequest *const http = pipeline.front()->http;
     assert(http != NULL);
     HttpRequest *const request = http->request;
     assert(request != NULL);
     HttpHeader &header = request->header;
-    header.delById(HDR_FTP_COMMAND);
-    header.putStr(HDR_FTP_COMMAND, "PASV");
-    header.delById(HDR_FTP_ARGUMENTS);
-    header.putStr(HDR_FTP_ARGUMENTS, "");
+    header.delById(Http::HdrType::FTP_COMMAND);
+    header.putStr(Http::HdrType::FTP_COMMAND, "PASV");
+    header.delById(Http::HdrType::FTP_ARGUMENTS);
+    header.putStr(Http::HdrType::FTP_ARGUMENTS, "");
     debugs(9, 5, "client data command converted to fake PASV");
 }
 
@@ -1660,7 +1707,7 @@ Ftp::Server::connectedForData(const CommConnectCbParams &params)
         if (params.conn != NULL)
             params.conn->close();
         setReply(425, "Cannot open data connection.");
-        ClientSocketContext::Pointer context = getCurrentContext();
+        Http::StreamPointer context = pipeline.front();
         Must(context->http);
         Must(context->http->storeEntry() != NULL);
     } else {
@@ -1675,15 +1722,13 @@ Ftp::Server::connectedForData(const CommConnectCbParams &params)
 void
 Ftp::Server::setReply(const int code, const char *msg)
 {
-    ClientSocketContext::Pointer context = getCurrentContext();
+    Http::StreamPointer context = pipeline.front();
     ClientHttpRequest *const http = context->http;
     assert(http != NULL);
     assert(http->storeEntry() == NULL);
 
     HttpReply *const reply = Ftp::HttpReplyWrapper(code, msg, Http::scNoContent, 0);
 
-    setLogUri(http, urlCanonicalClean(http->request));
-
     clientStreamNode *const node = context->getClientReplyContext();
     clientReplyContext *const repContext =
         dynamic_cast<clientReplyContext *>(node->data.getRaw());
@@ -1696,6 +1741,97 @@ Ftp::Server::setReply(const int code, const char *msg)
     http->storeEntry()->replaceHttpReply(reply);
 }
 
+void
+Ftp::Server::callException(const std::exception &e)
+{
+    debugs(33, 2, "FTP::Server job caught: " << e.what());
+    closeDataConnection();
+    unpinConnection(true);
+    if (Comm::IsConnOpen(clientConnection))
+        clientConnection->close();
+    AsyncJob::callException(e);
+}
+
+void
+Ftp::Server::startWaitingForOrigin()
+{
+    if (!isOpen()) // if we are closing, nothing to do
+        return;
+
+    debugs(33, 5, "waiting for Ftp::Client data transfer to end");
+    waitingForOrigin = true;
+}
+
+void
+Ftp::Server::stopWaitingForOrigin(int originStatus)
+{
+    Must(waitingForOrigin);
+    waitingForOrigin = false;
+
+    if (!isOpen()) // if we are closing, nothing to do
+        return;
+
+    // if we have already decided how to respond, respond now
+    if (delayedReply) {
+        HttpReply::Pointer reply = delayedReply;
+        delayedReply = nullptr;
+        writeForwardedReply(reply.getRaw());
+        return; // do not completeDataDownload() after an earlier response
+    }
+
+    if (master->serverState != fssHandleDataRequest)
+        return;
+
+    // completeDataDownload() could be waitingForOrigin in fssHandleDataRequest
+    // Depending on which side has finished downloading first, either trust
+    // master->userDataDone status or set originDataDownloadAbortedOnError:
+    if (master->userDataDone) {
+        // We finished downloading before Ftp::Client. Most likely, the
+        // adaptation shortened the origin response or we hit an error.
+        // Our status (stored in master->userDataDone) is more informative.
+        // Use master->userDataDone; avoid originDataDownloadAbortedOnError.
+        completeDataDownload();
+    } else {
+        debugs(33, 5, "too early to write the response");
+        // Ftp::Client naturally finished downloading before us. Set
+        // originDataDownloadAbortedOnError to overwrite future
+        // master->userDataDone and relay Ftp::Client error, if there was
+        // any, to the user.
+        originDataDownloadAbortedOnError = (originStatus >= 400);
+    }
+}
+
+void Ftp::Server::userDataCompletionCheckpoint(int finalStatusCode)
+{
+    Must(!master->userDataDone);
+    master->userDataDone = finalStatusCode;
+
+    if (bodyParser)
+        finishDechunkingRequest(false);
+
+    if (waitingForOrigin) {
+        // The completeDataDownload() is not called here unconditionally
+        // because we want to signal the FTP user that we are not fully
+        // done processing its data stream, even though all data bytes
+        // have been sent or received already.
+        debugs(33, 5, "Transferring from FTP server is not complete");
+        return;
+    }
+
+    // Adjust our reply if the server aborted with an error before we are done.
+    if (master->userDataDone == 226 && originDataDownloadAbortedOnError) {
+        debugs(33, 5, "Transferring from FTP server terminated with an error, adjust status code");
+        master->userDataDone = 451;
+    }
+    completeDataDownload();
+}
+
+void Ftp::Server::completeDataDownload()
+{
+    writeCustomReply(master->userDataDone, master->userDataDone == 226 ? "Transfer complete" : "Server error; transfer aborted");
+    closeDataConnection();
+}
+
 /// Whether Squid FTP Relay supports a named feature (e.g., a command).
 static bool
 Ftp::SupportedCommand(const SBuf &name)