]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/servers/FtpServer.cc
Source Format Enforcement (#532)
[thirdparty/squid.git] / src / servers / FtpServer.cc
index 8ce0bd103386f0c61e65b28e9e4ed267480e5fdf..915d83066f09ff18d127893e70ec0c55b372735c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
@@ -26,6 +26,7 @@
 #include "ftp/Parsing.h"
 #include "globals.h"
 #include "http/one/RequestParser.h"
+#include "http/Stream.h"
 #include "HttpHdrCc.h"
 #include "ip/tools.h"
 #include "ipc/FdNotes.h"
@@ -61,7 +62,9 @@ Ftp::Server::Server(const MasterXaction::Pointer &xact):
     uploadAvailSize(0),
     listener(),
     connector(),
-    reader()
+    reader(),
+    waitingForOrigin(false),
+    originDataDownloadAbortedOnError(false)
 {
     flags.readMore = false; // we need to announce ourselves first
     *uploadBuf = 0;
@@ -124,9 +127,9 @@ void
 Ftp::Server::doProcessRequest()
 {
     // zero pipelinePrefetchMax() ensures that there is only parsed request
-    ClientSocketContext::Pointer context = getCurrentContext();
-    Must(context != NULL);
-    Must(getConcurrentRequestCount() == 1);
+    Must(pipeline.count() == 1);
+    Http::StreamPointer context = pipeline.front();
+    Must(context != nullptr);
 
     ClientHttpRequest *const http = context->http;
     assert(http != NULL);
@@ -149,9 +152,9 @@ Ftp::Server::doProcessRequest()
 }
 
 void
-Ftp::Server::processParsedRequest(ClientSocketContext *)
+Ftp::Server::processParsedRequest(Http::StreamPointer &)
 {
-    Must(getConcurrentRequestCount() == 1);
+    Must(pipeline.count() == 1);
 
     // Process FTP request asynchronously to make sure FTP
     // data connection accept callback is fired first.
@@ -172,7 +175,7 @@ Ftp::Server::readUploadData(const CommIoCbParams &io)
 
     if (io.flag == Comm::OK && bodyPipe != NULL) {
         if (io.size > 0) {
-            kb_incr(&(statCounter.client_http.kbytes_in), io.size);
+            statCounter.client_http.kbytes_in += io.size;
 
             char *const current_buf = uploadBuf + uploadAvailSize;
             if (io.buf != current_buf)
@@ -217,12 +220,18 @@ Ftp::Server::shovelUploadData()
 void
 Ftp::Server::noteMoreBodySpaceAvailable(BodyPipe::Pointer)
 {
+    if (!isOpen()) // if we are closing, nothing to do
+        return;
+
     shovelUploadData();
 }
 
 void
 Ftp::Server::noteBodyConsumerAborted(BodyPipe::Pointer ptr)
 {
+    if (!isOpen()) // if we are closing, nothing to do
+        return;
+
     ConnStateData::noteBodyConsumerAborted(ptr);
     closeDataConnection();
 }
@@ -288,18 +297,14 @@ void
 Ftp::Server::notePeerConnection(Comm::ConnectionPointer conn)
 {
     // find request
-    ClientSocketContext::Pointer context = getCurrentContext();
-    Must(context != NULL);
+    Http::StreamPointer context = pipeline.front();
+    Must(context != nullptr);
     ClientHttpRequest *const http = context->http;
     Must(http != NULL);
     HttpRequest *const request = http->request;
     Must(request != NULL);
-
-    // this is not an idle connection, so we do not want I/O monitoring
-    const bool monitor = false;
-
     // make FTP peer connection exclusive to our request
-    pinConnection(conn, request, conn->getPeer(), false, monitor);
+    pinBusyConnection(conn, request);
 }
 
 void
@@ -307,11 +312,18 @@ Ftp::Server::clientPinnedConnectionClosed(const CommCloseCbParams &io)
 {
     ConnStateData::clientPinnedConnectionClosed(io);
 
-    // if the server control connection is gone, reset state to login again
-    resetLogin("control connection closure");
+    // TODO: Keep the control connection open after fixing the reset
+    // problem below
+    if (Comm::IsConnOpen(clientConnection))
+        clientConnection->close();
 
-    // XXX: Reseting is not enough. FtpRelay::sendCommand() will not re-login
-    // because FtpRelay::serverState() is not going to be fssConnected.
+    // TODO: If the server control connection is gone, reset state to login
+    // again. Reseting login alone is not enough: FtpRelay::sendCommand() will
+    // not re-login because FtpRelay::serverState() is not going to be
+    // fssConnected. Calling resetLogin() alone is also harmful because
+    // it does not reset correctly the client-to-squid control connection (eg
+    // respond if required with an error code, in all cases)
+    // resetLogin("control connection closure");
 }
 
 /// clear client and server login-related state after the old login is gone
@@ -327,6 +339,7 @@ Ftp::Server::resetLogin(const char *reason)
 void
 Ftp::Server::calcUri(const SBuf *file)
 {
+    // TODO: fill a class AnyP::Uri instead of string
     uri = "ftp://";
     uri.append(host);
     if (port->ftp_track_dirs && master->workingDir.length()) {
@@ -548,7 +561,7 @@ Ftp::CommandHasPathParameter(const SBuf &cmd)
 }
 
 /// creates a context filled with an error message for a given early error
-ClientSocketContext *
+Http::Stream *
 Ftp::Server::earlyError(const EarlyErrorKind eek)
 {
     /* Default values, to be updated by the switch statement below */
@@ -602,7 +615,7 @@ Ftp::Server::earlyError(const EarlyErrorKind eek)
         // no default so that a compiler can check that we have covered all cases
     }
 
-    ClientSocketContext *context = abortRequestParsing(errUri);
+    Http::Stream *context = abortRequestParsing(errUri);
     clientStreamNode *node = context->getClientReplyContext();
     Must(node);
     clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
@@ -616,9 +629,9 @@ Ftp::Server::earlyError(const EarlyErrorKind eek)
 }
 
 /// Parses a single FTP request on the control connection.
-/// Returns a new ClientSocketContext on valid requests and all errors.
+/// Returns a new Http::Stream on valid requests and all errors.
 /// Returns NULL on incomplete requests that may still succeed given more data.
-ClientSocketContext *
+Http::Stream *
 Ftp::Server::parseOneRequest()
 {
     flags.readMore = false; // common for all but one case below
@@ -640,7 +653,7 @@ Ftp::Server::parseOneRequest()
     SBuf cmd;
     SBuf params;
 
-    Parser::Tokenizer tok(in.buf);
+    Parser::Tokenizer tok(inBuf);
 
     (void)tok.skipAll(LeadingSpace); // leading OWS and empty commands
     const bool parsed = tok.prefix(cmd, CommandChars); // required command
@@ -667,14 +680,14 @@ Ftp::Server::parseOneRequest()
     // technically, we may skip multiple NLs below, but that is OK
     if (!parsed || !tok.skipAll(CharacterSet::LF)) { // did not find terminating LF yet
         // we need more data, but can we buffer more?
-        if (in.buf.length() >= Config.maxRequestHeaderSize) {
+        if (inBuf.length() >= Config.maxRequestHeaderSize) {
             changeState(fssError, "huge req");
             quitAfterError(NULL);
             return earlyError(EarlyErrorKind::HugeRequest);
         } else {
             flags.readMore = true;
             debugs(33, 5, "Waiting for more, up to " <<
-                   (Config.maxRequestHeaderSize - in.buf.length()));
+                   (Config.maxRequestHeaderSize - inBuf.length()));
             return NULL;
         }
     }
@@ -696,7 +709,7 @@ Ftp::Server::parseOneRequest()
 
         // process USER request now because it sets FTP peer host name
         if (cmd == cmdUser()) {
-            if (ClientSocketContext *errCtx = handleUserRequest(cmd, params))
+            if (Http::Stream *errCtx = handleUserRequest(cmd, params))
                 return errCtx;
         }
     }
@@ -711,14 +724,15 @@ Ftp::Server::parseOneRequest()
     const SBuf *path = (params.length() && CommandHasPathParameter(cmd)) ?
                        &params : NULL;
     calcUri(path);
-    char *newUri = xstrdup(uri.c_str());
-    HttpRequest *const request = HttpRequest::CreateFromUrlAndMethod(newUri, method);
+    MasterXaction::Pointer mx = new MasterXaction(XactionInitiator::initClient);
+    mx->tcpClient = clientConnection;
+    auto * const request = HttpRequest::FromUrl(uri, mx, method);
     if (!request) {
         debugs(33, 5, "Invalid FTP URL: " << uri);
         uri.clear();
-        safe_free(newUri);
         return earlyError(EarlyErrorKind::InvalidUri);
     }
+    char *newUri = xstrdup(uri.c_str());
 
     request->flags.ftpNative = true;
     request->http_ver = Http::ProtocolVersion(Ftp::ProtocolVersion().major, Ftp::ProtocolVersion().minor);
@@ -735,13 +749,12 @@ Ftp::Server::parseOneRequest()
     }
 
     ClientHttpRequest *const http = new ClientHttpRequest(this);
-    http->request = request;
-    HTTPMSGLOCK(http->request);
     http->req_sz = tok.parsedSize();
     http->uri = newUri;
+    http->initRequest(request);
 
-    ClientSocketContext *const result =
-        new ClientSocketContext(clientConnection, http);
+    Http::Stream *const result =
+        new Http::Stream(clientConnection, http);
 
     StoreIOBuffer tempBuffer;
     tempBuffer.data = result->reqbuf;
@@ -761,14 +774,8 @@ void
 Ftp::Server::handleReply(HttpReply *reply, StoreIOBuffer data)
 {
     // the caller guarantees that we are dealing with the current context only
-    ClientSocketContext::Pointer context = getCurrentContext();
-    assert(context != NULL);
-
-    if (context->http && context->http->al != NULL &&
-            !context->http->al->reply && reply) {
-        context->http->al->reply = reply;
-        HTTPMSGLOCK(context->http->al->reply);
-    }
+    Http::StreamPointer context = pipeline.front();
+    assert(context != nullptr);
 
     static ReplyHandler handlers[] = {
         NULL, // fssBegin
@@ -800,7 +807,7 @@ Ftp::Server::handleReply(HttpReply *reply, StoreIOBuffer data)
 void
 Ftp::Server::handleFeatReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support FEAT", reply);
         return;
     }
@@ -869,8 +876,8 @@ Ftp::Server::handleFeatReply(const HttpReply *reply, StoreIOBuffer)
 void
 Ftp::Server::handlePasvReply(const HttpReply *reply, StoreIOBuffer)
 {
-    ClientSocketContext::Pointer context = getCurrentContext();
-    assert(context != NULL);
+    const Http::StreamPointer context(pipeline.front());
+    assert(context != nullptr);
 
     if (context->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support PASV", reply);
@@ -909,7 +916,7 @@ Ftp::Server::handlePasvReply(const HttpReply *reply, StoreIOBuffer)
 void
 Ftp::Server::handlePortReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support PASV (converted from PORT)", reply);
         return;
     }
@@ -967,7 +974,7 @@ Ftp::Server::handleDataReply(const HttpReply *reply, StoreIOBuffer data)
     AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, Ftp::Server::wroteReplyData);
     Comm::Write(dataConn, &mb, call);
 
-    getCurrentContext()->noteSentBodyBytes(data.length);
+    pipeline.front()->noteSentBodyBytes(data.length);
 }
 
 /// called when we are done writing a chunk of the response data
@@ -979,13 +986,12 @@ Ftp::Server::wroteReplyData(const CommIoCbParams &io)
 
     if (io.flag != Comm::OK) {
         debugs(33, 3, "FTP reply data writing failed: " << xstrerr(io.xerrno));
-        closeDataConnection();
-        writeCustomReply(426, "Data connection error; transfer aborted");
+        userDataCompletionCheckpoint(426);
         return;
     }
 
-    assert(getCurrentContext()->http);
-    getCurrentContext()->http->out.size += io.size;
+    assert(pipeline.front()->http);
+    pipeline.front()->http->out.size += io.size;
     replyDataWritingCheckpoint();
 }
 
@@ -993,28 +999,26 @@ Ftp::Server::wroteReplyData(const CommIoCbParams &io)
 void
 Ftp::Server::replyDataWritingCheckpoint()
 {
-    switch (getCurrentContext()->socketState()) {
+    switch (pipeline.front()->socketState()) {
     case STREAM_NONE:
         debugs(33, 3, "Keep going");
-        getCurrentContext()->pullData();
+        pipeline.front()->pullData();
         return;
     case STREAM_COMPLETE:
         debugs(33, 3, "FTP reply data transfer successfully complete");
-        writeCustomReply(226, "Transfer complete");
+        userDataCompletionCheckpoint(226);
         break;
     case STREAM_UNPLANNED_COMPLETE:
         debugs(33, 3, "FTP reply data transfer failed: STREAM_UNPLANNED_COMPLETE");
-        writeCustomReply(451, "Server error; transfer aborted");
+        userDataCompletionCheckpoint(451);
         break;
     case STREAM_FAILED:
+        userDataCompletionCheckpoint(451);
         debugs(33, 3, "FTP reply data transfer failed: STREAM_FAILED");
-        writeCustomReply(451, "Server error; transfer aborted");
         break;
     default:
         fatal("unreachable code");
     }
-
-    closeDataConnection();
 }
 
 void
@@ -1029,6 +1033,12 @@ Ftp::Server::writeForwardedReply(const HttpReply *reply)
 {
     Must(reply);
 
+    if (waitingForOrigin) {
+        Must(delayedReply == NULL);
+        delayedReply = reply;
+        return;
+    }
+
     const HttpHeader &header = reply->header;
     // adaptation and forwarding errors lack Http::HdrType::FTP_STATUS
     if (!header.has(Http::HdrType::FTP_STATUS)) {
@@ -1044,7 +1054,7 @@ Ftp::Server::writeForwardedReply(const HttpReply *reply)
 void
 Ftp::Server::handleEprtReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support PASV (converted from EPRT)", reply);
         return;
     }
@@ -1057,7 +1067,7 @@ Ftp::Server::handleEprtReply(const HttpReply *reply, StoreIOBuffer)
 void
 Ftp::Server::handleEpsvReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Cannot connect to server", reply);
         return;
     }
@@ -1080,7 +1090,7 @@ Ftp::Server::handleEpsvReply(const HttpReply *reply, StoreIOBuffer)
 void
 Ftp::Server::writeErrorReply(const HttpReply *reply, const int scode)
 {
-    const HttpRequest *request = getCurrentContext()->http->request;
+    const HttpRequest *request = pipeline.front()->http->request;
     assert(request);
 
     MemBuf mb;
@@ -1109,10 +1119,12 @@ Ftp::Server::writeErrorReply(const HttpReply *reply, const int scode)
     }
 #endif
 
-    Must(reply);
-    const char *reason = reply->header.has(Http::HdrType::FTP_REASON) ?
-                         reply->header.getStr(Http::HdrType::FTP_REASON):
-                         reply->sline.reason();
+    const char *reason = "Lost Error";
+    if (reply) {
+        reason = reply->header.has(Http::HdrType::FTP_REASON) ?
+                 reply->header.getStr(Http::HdrType::FTP_REASON):
+                 reply->sline.reason();
+    }
 
     mb.appendf("%i %s\r\n", scode, reason); // error terminating line
 
@@ -1134,12 +1146,13 @@ Ftp::Server::writeForwardedForeign(const HttpReply *reply)
     writeErrorReply(reply, 451);
 }
 
-void
-Ftp::Server::writeControlMsgAndCall(ClientSocketContext *, HttpReply *reply, AsyncCall::Pointer &call)
+bool
+Ftp::Server::writeControlMsgAndCall(HttpReply *reply, AsyncCall::Pointer &call)
 {
     // the caller guarantees that we are dealing with the current context only
     // the caller should also make sure reply->header.has(Http::HdrType::FTP_STATUS)
     writeForwardedReplyAndCall(reply, call);
+    return true;
 }
 
 void
@@ -1227,8 +1240,8 @@ Ftp::Server::wroteEarlyReply(const CommIoCbParams &io)
         return;
     }
 
-    ClientSocketContext::Pointer context = getCurrentContext();
-    if (context != NULL && context->http) {
+    Http::StreamPointer context = pipeline.front();
+    if (context != nullptr && context->http) {
         context->http->out.size += io.size;
         context->http->out.headers_sz += io.size;
     }
@@ -1249,7 +1262,7 @@ Ftp::Server::wroteReply(const CommIoCbParams &io)
         return;
     }
 
-    ClientSocketContext::Pointer context = getCurrentContext();
+    Http::StreamPointer context = pipeline.front();
     assert(context->http);
     context->http->out.size += io.size;
     context->http->out.headers_sz += io.size;
@@ -1272,9 +1285,10 @@ Ftp::Server::wroteReply(const CommIoCbParams &io)
     case STREAM_COMPLETE:
         flags.readMore = true;
         changeState(fssConnected, "Ftp::Server::wroteReply");
-        if (in.bodyParser)
+        if (bodyParser)
             finishDechunkingRequest(false);
-        context->keepaliveNextRequest();
+        context->finished();
+        kick();
         return;
     }
 }
@@ -1291,7 +1305,7 @@ Ftp::Server::handleRequest(HttpRequest *request)
     Must(header.has(Http::HdrType::FTP_ARGUMENTS));
     String &params = header.findEntry(Http::HdrType::FTP_ARGUMENTS)->value;
 
-    if (do_debug(9, 2)) {
+    if (Debug::Enabled(9, 2)) {
         MemBuf mb;
         mb.init();
         request->pack(&mb);
@@ -1338,7 +1352,7 @@ Ftp::Server::handleRequest(HttpRequest *request)
 
 /// Called to parse USER command, which is required to create an HTTP request
 /// wrapper. W/o request, the errors are handled by returning earlyError().
-ClientSocketContext *
+Http::Stream *
 Ftp::Server::handleUserRequest(const SBuf &, SBuf &params)
 {
     if (params.isEmpty())
@@ -1432,9 +1446,33 @@ Ftp::Server::createDataConnection(Ip::Address cltAddr)
     Comm::ConnectionPointer conn = new Comm::Connection();
     conn->flags |= COMM_DOBIND;
 
-    // Use local IP address of the control connection as the source address
-    // of the active data connection, or some clients will refuse to accept.
-    conn->setAddrs(clientConnection->local, cltAddr);
+    if (clientConnection->flags & COMM_INTERCEPTION) {
+        // In the case of NAT interception conn->local value is not set
+        // because the TCP stack will automatically pick correct source
+        // address for the data connection. We must only ensure that IP
+        // version matches client's address.
+        conn->local.setAnyAddr();
+
+        if (cltAddr.isIPv4())
+            conn->local.setIPv4();
+
+        conn->remote = cltAddr;
+    } else {
+        // In the case of explicit-proxy the local IP of the control connection
+        // is the Squid IP the client is knowingly talking to.
+        //
+        // In the case of TPROXY the IP address of the control connection is
+        // server IP the client is connecting to, it can be spoofed by Squid.
+        //
+        // In both cases some clients may refuse to accept data connections if
+        // these control connectin local-IP's are not used.
+        conn->setAddrs(clientConnection->local, cltAddr);
+
+        // Using non-local addresses in TPROXY mode requires appropriate socket option.
+        if (clientConnection->flags & COMM_TRANSPARENT)
+            conn->flags |= COMM_TRANSPARENT;
+    }
+
     // RFC 959 requires active FTP connections to originate from port 20
     // but that would preclude us from supporting concurrent transfers! (XXX?)
     conn->local.port(0);
@@ -1482,6 +1520,9 @@ Ftp::Server::handleDataRequest(String &, String &)
     if (!checkDataConnPre())
         return false;
 
+    master->userDataDone = 0;
+    originDataDownloadAbortedOnError = false;
+
     changeState(fssHandleDataRequest, "handleDataRequest");
 
     return true;
@@ -1494,10 +1535,12 @@ Ftp::Server::handleUploadRequest(String &, String &)
         return false;
 
     if (Config.accessList.forceRequestBodyContinuation) {
-        ClientHttpRequest *http = getCurrentContext()->http;
+        ClientHttpRequest *http = pipeline.front()->http;
         HttpRequest *request = http->request;
         ACLFilledChecklist bodyContinuationCheck(Config.accessList.forceRequestBodyContinuation, request, NULL);
-        if (bodyContinuationCheck.fastCheck() == ACCESS_ALLOWED) {
+        bodyContinuationCheck.al = http->al;
+        bodyContinuationCheck.syncAle(request, http->log_uri);
+        if (bodyContinuationCheck.fastCheck().allowed()) {
             request->forcedBodyContinuation = true;
             if (checkDataConnPost()) {
                 // Write control Msg
@@ -1598,7 +1641,7 @@ Ftp::Server::handleCdupRequest(String &, String &)
 void
 Ftp::Server::setDataCommand()
 {
-    ClientHttpRequest *const http = getCurrentContext()->http;
+    ClientHttpRequest *const http = pipeline.front()->http;
     assert(http != NULL);
     HttpRequest *const request = http->request;
     assert(request != NULL);
@@ -1664,7 +1707,7 @@ Ftp::Server::connectedForData(const CommConnectCbParams &params)
         if (params.conn != NULL)
             params.conn->close();
         setReply(425, "Cannot open data connection.");
-        ClientSocketContext::Pointer context = getCurrentContext();
+        Http::StreamPointer context = pipeline.front();
         Must(context->http);
         Must(context->http->storeEntry() != NULL);
     } else {
@@ -1679,15 +1722,13 @@ Ftp::Server::connectedForData(const CommConnectCbParams &params)
 void
 Ftp::Server::setReply(const int code, const char *msg)
 {
-    ClientSocketContext::Pointer context = getCurrentContext();
+    Http::StreamPointer context = pipeline.front();
     ClientHttpRequest *const http = context->http;
     assert(http != NULL);
     assert(http->storeEntry() == NULL);
 
     HttpReply *const reply = Ftp::HttpReplyWrapper(code, msg, Http::scNoContent, 0);
 
-    setLogUri(http, urlCanonicalClean(http->request));
-
     clientStreamNode *const node = context->getClientReplyContext();
     clientReplyContext *const repContext =
         dynamic_cast<clientReplyContext *>(node->data.getRaw());
@@ -1711,6 +1752,86 @@ Ftp::Server::callException(const std::exception &e)
     AsyncJob::callException(e);
 }
 
+void
+Ftp::Server::startWaitingForOrigin()
+{
+    if (!isOpen()) // if we are closing, nothing to do
+        return;
+
+    debugs(33, 5, "waiting for Ftp::Client data transfer to end");
+    waitingForOrigin = true;
+}
+
+void
+Ftp::Server::stopWaitingForOrigin(int originStatus)
+{
+    Must(waitingForOrigin);
+    waitingForOrigin = false;
+
+    if (!isOpen()) // if we are closing, nothing to do
+        return;
+
+    // if we have already decided how to respond, respond now
+    if (delayedReply) {
+        HttpReply::Pointer reply = delayedReply;
+        delayedReply = nullptr;
+        writeForwardedReply(reply.getRaw());
+        return; // do not completeDataDownload() after an earlier response
+    }
+
+    if (master->serverState != fssHandleDataRequest)
+        return;
+
+    // completeDataDownload() could be waitingForOrigin in fssHandleDataRequest
+    // Depending on which side has finished downloading first, either trust
+    // master->userDataDone status or set originDataDownloadAbortedOnError:
+    if (master->userDataDone) {
+        // We finished downloading before Ftp::Client. Most likely, the
+        // adaptation shortened the origin response or we hit an error.
+        // Our status (stored in master->userDataDone) is more informative.
+        // Use master->userDataDone; avoid originDataDownloadAbortedOnError.
+        completeDataDownload();
+    } else {
+        debugs(33, 5, "too early to write the response");
+        // Ftp::Client naturally finished downloading before us. Set
+        // originDataDownloadAbortedOnError to overwrite future
+        // master->userDataDone and relay Ftp::Client error, if there was
+        // any, to the user.
+        originDataDownloadAbortedOnError = (originStatus >= 400);
+    }
+}
+
+void Ftp::Server::userDataCompletionCheckpoint(int finalStatusCode)
+{
+    Must(!master->userDataDone);
+    master->userDataDone = finalStatusCode;
+
+    if (bodyParser)
+        finishDechunkingRequest(false);
+
+    if (waitingForOrigin) {
+        // The completeDataDownload() is not called here unconditionally
+        // because we want to signal the FTP user that we are not fully
+        // done processing its data stream, even though all data bytes
+        // have been sent or received already.
+        debugs(33, 5, "Transferring from FTP server is not complete");
+        return;
+    }
+
+    // Adjust our reply if the server aborted with an error before we are done.
+    if (master->userDataDone == 226 && originDataDownloadAbortedOnError) {
+        debugs(33, 5, "Transferring from FTP server terminated with an error, adjust status code");
+        master->userDataDone = 451;
+    }
+    completeDataDownload();
+}
+
+void Ftp::Server::completeDataDownload()
+{
+    writeCustomReply(master->userDataDone, master->userDataDone == 226 ? "Transfer complete" : "Server error; transfer aborted");
+    closeDataConnection();
+}
+
 /// Whether Squid FTP Relay supports a named feature (e.g., a command).
 static bool
 Ftp::SupportedCommand(const SBuf &name)