]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/client_side.cc
Merged from trunk (r13515).
[thirdparty/squid.git] / src / client_side.cc
index 65344d0a72a9ff27f3c3b53ad07b7fa256482c4a..5ceebc57d9b4835310dfb33e0b05b41af0bf81e2 100644 (file)
@@ -93,6 +93,7 @@
 #include "clientStream.h"
 #include "comm.h"
 #include "comm/Connection.h"
+#include "comm/ConnOpener.h"
 #include "comm/Loops.h"
 #include "comm/Read.h"
 #include "comm/TcpAcceptor.h"
 #include "errorpage.h"
 #include "fd.h"
 #include "fde.h"
+#include "FtpServer.h"
 #include "fqdncache.h"
 #include "FwdState.h"
 #include "globals.h"
 #include "http.h"
+#include "HttpHdrCc.h"
 #include "HttpHdrContRange.h"
 #include "HttpHeaderTools.h"
 #include "HttpReply.h"
 #include "ident/Config.h"
 #include "ident/Ident.h"
 #include "internal.h"
+#include "ip/tools.h"
 #include "ipc/FdNotes.h"
 #include "ipc/StartListening.h"
 #include "log/access_log.h"
 #include <climits>
 #include <cmath>
 #include <limits>
+#include <set>
 
 #if LINGERING_CLOSE
 #define comm_close comm_lingering_close
@@ -195,6 +200,7 @@ static IOACB httpAccept;
 #if USE_OPENSSL
 static IOACB httpsAccept;
 #endif
+static IOACB ftpAccept;
 static CTCB clientLifetimeTimeout;
 static ClientSocketContext *parseHttpRequestAbort(ConnStateData * conn, const char *uri);
 static ClientSocketContext *parseHttpRequest(ConnStateData *, HttpParser *, HttpRequestMethod *, Http::ProtocolVersion *);
@@ -222,6 +228,56 @@ static void clientUpdateSocketStats(LogTags logType, size_t size);
 char *skipLeadingSpace(char *aString);
 static void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount);
 
+static void FtpChangeState(ConnStateData *connState, const ConnStateData::FtpState newState, const char *reason);
+static IOACB FtpAcceptDataConnection;
+static void FtpCloseDataConnection(ConnStateData *conn);
+static ClientSocketContext *FtpParseRequest(ConnStateData *connState, HttpRequestMethod *method_p, Http::ProtocolVersion *http_ver);
+static bool FtpHandleUserRequest(ConnStateData *connState, const String &cmd, String &params);
+static CNCB FtpHandleConnectDone;
+
+static void FtpHandleReply(ClientSocketContext *context, HttpReply *reply, StoreIOBuffer data);
+typedef void FtpReplyHandler(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data);
+static FtpReplyHandler FtpHandleFeatReply;
+static FtpReplyHandler FtpHandlePasvReply;
+static FtpReplyHandler FtpHandlePortReply;
+static FtpReplyHandler FtpHandleErrorReply;
+static FtpReplyHandler FtpHandleDataReply;
+static FtpReplyHandler FtpHandleUploadReply;
+static FtpReplyHandler FtpHandleEprtReply;
+static FtpReplyHandler FtpHandleEpsvReply;
+
+static void FtpWriteEarlyReply(ConnStateData *conn, const int code, const char *msg);
+static void FtpWriteReply(ClientSocketContext *context, MemBuf &mb);
+static void FtpWriteCustomReply(ClientSocketContext *context, const int code, const char *msg, const HttpReply *reply = NULL);
+static void FtpWriteForwardedReply(ClientSocketContext *context, const HttpReply *reply);
+static void FtpWriteForwardedReply(ClientSocketContext *context, const HttpReply *reply, AsyncCall::Pointer call);
+static void FtpWriteErrorReply(ClientSocketContext *context, const HttpReply *reply, const int status);
+
+static void FtpPrintReply(MemBuf &mb, const HttpReply *reply, const char *const prefix = "");
+static IOCB FtpWroteEarlyReply;
+static IOCB FtpWroteReply;
+static IOCB FtpWroteReplyData;
+
+typedef bool FtpRequestHandler(ClientSocketContext *context, String &cmd, String &params);
+static FtpRequestHandler FtpHandleRequest;
+static FtpRequestHandler FtpHandleFeatRequest;
+static FtpRequestHandler FtpHandlePasvRequest;
+static FtpRequestHandler FtpHandlePortRequest;
+static FtpRequestHandler FtpHandleDataRequest;
+static FtpRequestHandler FtpHandleUploadRequest;
+static FtpRequestHandler FtpHandleEprtRequest;
+static FtpRequestHandler FtpHandleEpsvRequest;
+static FtpRequestHandler FtpHandleCwdRequest;
+static FtpRequestHandler FtpHandlePassRequest;
+static FtpRequestHandler FtpHandleCdupRequest;
+
+static bool FtpCheckDataConnPre(ClientSocketContext *context);
+static bool FtpCheckDataConnPost(ClientSocketContext *context);
+static void FtpSetDataCommand(ClientSocketContext *context);
+static void FtpSetReply(ClientSocketContext *context, const int code, const char *msg);
+static bool FtpSupportedCommand(const String &name);
+
+
 clientStreamNode *
 ClientSocketContext::getTail() const
 {
@@ -237,6 +293,12 @@ ClientSocketContext::getClientReplyContext() const
     return (clientStreamNode *)http->client_stream.tail->prev->data;
 }
 
+ConnStateData *
+ClientSocketContext::getConn() const
+{
+    return http->getConn();
+}
+
 /**
  * This routine should be called to grow the in.buf and then
  * call Comm::Read().
@@ -257,6 +319,25 @@ ConnStateData::readSomeData()
     Comm::Read(clientConnection, reader);
 }
 
+void
+ConnStateData::readSomeFtpData()
+{
+    if (ftp.reader != NULL)
+        return;
+
+    const size_t availSpace = sizeof(ftp.uploadBuf) - ftp.uploadAvailSize;
+    if (availSpace <= 0)
+        return;
+
+    debugs(33, 4, HERE << ftp.dataConn << ": reading FTP data...");
+
+    typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
+    ftp.reader = JobCallback(33, 5, Dialer, this,
+                             ConnStateData::clientReadFtpData);
+    comm_read(ftp.dataConn, ftp.uploadBuf + ftp.uploadAvailSize, availSpace,
+              ftp.reader);
+}
+
 void
 ClientSocketContext::removeFromConnectionList(ConnStateData * conn)
 {
@@ -352,22 +433,28 @@ ClientSocketContext::writeControlMsg(HttpControlMsg &msg)
     const HttpReply::Pointer rep(msg.reply);
     Must(rep != NULL);
 
+    // remember the callback
+    cbControlMsgSent = msg.cbSuccess;
+
+    AsyncCall::Pointer call = commCbCall(33, 5, "ClientSocketContext::wroteControlMsg",
+                                         CommIoCbPtrFun(&WroteControlMsg, this));
+
+    if (getConn()->isFtp) {
+        FtpWriteForwardedReply(this, rep.getRaw(), call);
+        return;
+    }
+
     // apply selected clientReplyContext::buildReplyHeader() mods
     // it is not clear what headers are required for control messages
     rep->header.removeHopByHopEntries();
     rep->header.putStr(HDR_CONNECTION, "keep-alive");
     httpHdrMangleList(&rep->header, http->request, ROR_REPLY);
 
-    // remember the callback
-    cbControlMsgSent = msg.cbSuccess;
-
     MemBuf *mb = rep->pack();
 
     debugs(11, 2, "HTTP Client " << clientConnection);
     debugs(11, 2, "HTTP Client CONTROL MSG:\n---------\n" << mb->buf << "\n----------");
 
-    AsyncCall::Pointer call = commCbCall(33, 5, "ClientSocketContext::wroteControlMsg",
-                                         CommIoCbPtrFun(&WroteControlMsg, this));
     Comm::Write(clientConnection, mb, call);
 
     delete mb;
@@ -616,7 +703,7 @@ ClientHttpRequest::logRequest()
 
     debugs(33, 9, "clientLogRequest: http.code='" << al->http.code << "'");
 
-    if (loggingEntry() && loggingEntry()->mem_obj)
+    if (loggingEntry() && loggingEntry()->mem_obj && loggingEntry()->objectLen() >= 0)
         al->cache.objectSize = loggingEntry()->contentLen(); // payload duplicate ?? with or without TE ?
 
     al->http.clientRequestSz.header = req_sz;
@@ -845,7 +932,7 @@ ConnStateData::swanSong()
     assert(areAllContextsForThisConnection());
     freeAllContexts();
 
-    unpinConnection();
+    unpinConnection(true);
 
     if (Comm::IsConnOpen(clientConnection))
         clientConnection->close();
@@ -872,6 +959,8 @@ ConnStateData::~ConnStateData()
     assert(this != NULL);
     debugs(33, 3, HERE << clientConnection);
 
+    FtpCloseDataConnection(this);
+
     if (isOpen())
         debugs(33, DBG_IMPORTANT, "BUG: ConnStateData did not close " << clientConnection);
 
@@ -1474,6 +1563,7 @@ static void
 clientSocketRecipient(clientStreamNode * node, ClientHttpRequest * http,
                       HttpReply * rep, StoreIOBuffer receivedData)
 {
+    debugs(33,7, HERE << "rep->content_length=" << (rep ? rep->content_length : -2) << " receivedData.length=" << receivedData.length);
     /* Test preconditions */
     assert(node != NULL);
     PROF_start(clientSocketRecipient);
@@ -1496,6 +1586,13 @@ clientSocketRecipient(clientStreamNode * node, ClientHttpRequest * http,
         return;
     }
 
+    if (http->getConn()->isFtp) {
+        assert(context->http == http);
+        FtpHandleReply(context.getRaw(), rep, receivedData);
+        PROF_stop(clientSocketRecipient);
+        return;
+    }
+
     // After sending Transfer-Encoding: chunked (at least), always send
     // the last-chunk if there was no error, ignoring responseFinishedOrFailed.
     const bool mustSendLastChunk = http->request->flags.chunkedReply &&
@@ -1563,7 +1660,9 @@ ConnStateData::readNextRequest()
     typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
     AsyncCall::Pointer timeoutCall = JobCallback(33, 5,
                                      TimeoutDialer, this, ConnStateData::requestTimeout);
-    commSetConnTimeout(clientConnection, Config.Timeout.clientIdlePconn, timeoutCall);
+    const int timeout = isFtp ? Config.Timeout.ftpClientIdle :
+                        Config.Timeout.clientIdlePconn;
+    commSetConnTimeout(clientConnection, timeout, timeoutCall);
 
     readSomeData();
     /** Please don't do anything with the FD past here! */
@@ -2594,7 +2693,8 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
 
     /* We have an initial client stream in place should it be needed */
     /* setup our private context */
-    context->registerWithConn();
+    if (!conn->isFtp)
+        context->registerWithConn();
 
     if (context->flags.parsed_ok == 0) {
         clientStreamNode *node = context->getClientReplyContext();
@@ -2621,6 +2721,11 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
         goto finish;
     }
 
+    if (conn->isFtp) {
+        assert(http->request);
+        request = http->request;
+        notedUseOfBuffer = true;
+    } else
     if ((request = HttpRequest::CreateFromUrlAndMethod(http->uri, method)) == NULL) {
         clientStreamNode *node = context->getClientReplyContext();
         debugs(33, 5, "Invalid URL: " << http->uri);
@@ -2657,7 +2762,8 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
     /* compile headers */
     /* we should skip request line! */
     /* XXX should actually know the damned buffer size here */
-    if (http_ver.major >= 1 && !request->parseHeader(HttpParserHdrBuf(hp), HttpParserHdrSz(hp))) {
+    if (!conn->isFtp && http_ver.major >= 1 &&
+        !request->parseHeader(HttpParserHdrBuf(hp), HttpParserHdrSz(hp))) {
         clientStreamNode *node = context->getClientReplyContext();
         debugs(33, 5, "Failed to parse request headers:\n" << HttpParserHdrBuf(hp));
         conn->quitAfterError(request.getRaw());
@@ -2790,8 +2896,10 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
         }
     }
 
-    http->request = request.getRaw();
-    HTTPMSGLOCK(http->request);
+    if (!conn->isFtp) {
+        http->request = request.getRaw();
+        HTTPMSGLOCK(http->request);
+    }
     clientSetKeepaliveFlag(http);
 
     // Let tunneling code be fully responsible for CONNECT requests
@@ -2815,9 +2923,11 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
         request->body_pipe = conn->expectRequestBody(
                                  chunked ? -1 : request->content_length);
 
-        // consume header early so that body pipe gets just the body
-        connNoteUseOfBuffer(conn, http->req_sz);
-        notedUseOfBuffer = true;
+        if (!notedUseOfBuffer) {
+            // consume header early so that body pipe gets just the body
+            connNoteUseOfBuffer(conn, http->req_sz);
+            notedUseOfBuffer = true;
+        }
 
         /* Is it too large? */
         if (!chunked && // if chunked, we will check as we accumulate
@@ -2834,15 +2944,17 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
             goto finish;
         }
 
-        // We may stop producing, comm_close, and/or call setReplyToError()
-        // below, so quit on errors to avoid http->doCallouts()
-        if (!conn->handleRequestBodyData())
-            goto finish;
+        if (!conn->isFtp) {
+            // We may stop producing, comm_close, and/or call setReplyToError()
+            // below, so quit on errors to avoid http->doCallouts()
+            if (!conn->handleRequestBodyData())
+                goto finish;
 
-        if (!request->body_pipe->productionEnded()) {
-            debugs(33, 5, HERE << "need more request body");
-            context->mayUseConnection(true);
-            assert(conn->flags.readMore);
+            if (!request->body_pipe->productionEnded()) {
+                debugs(33, 5, HERE << "need more request body");
+                context->mayUseConnection(true);
+                assert(conn->flags.readMore);
+            }
         }
     }
 
@@ -2868,6 +2980,46 @@ finish:
     }
 }
 
+void
+ConnStateData::processFtpRequest(ClientSocketContext *const context)
+{
+    ClientHttpRequest *const http = context->http;
+    assert(http != NULL);
+    HttpRequest *const request = http->request;
+    assert(request != NULL);
+    debugs(33, 9, request);
+
+    HttpHeader &header = request->header;
+    assert(header.has(HDR_FTP_COMMAND));
+    String &cmd = header.findEntry(HDR_FTP_COMMAND)->value;
+    assert(header.has(HDR_FTP_ARGUMENTS));
+    String &params = header.findEntry(HDR_FTP_ARGUMENTS)->value;
+
+    const bool fwd = !http->storeEntry() &&
+                     FtpHandleRequest(context, cmd, params);
+
+    if (http->storeEntry() != NULL) {
+        debugs(33, 4, "got an immediate response");
+        assert(http->storeEntry() != NULL);
+        clientSetKeepaliveFlag(http);
+        context->pullData();
+    } else if (fwd) {
+        debugs(33, 4, "forwarding request to server side");
+        assert(http->storeEntry() == NULL);
+        clientProcessRequest(this, &parser_, context, request->method,
+                             request->http_ver);
+    } else {
+        debugs(33, 4, "will resume processing later");
+    }
+}
+
+void
+ConnStateData::resumeFtpRequest(ClientSocketContext *const context)
+{
+    debugs(33, 4, "resuming");
+    processFtpRequest(context);
+}
+
 static void
 connStripBufferWhitespace (ConnStateData * conn)
 {
@@ -2889,7 +3041,7 @@ ConnStateData::concurrentRequestQueueFilled() const
 
     // default to the configured pipeline size.
     // add 1 because the head of pipeline is counted in concurrent requests and not prefetch queue
-    const int concurrentRequestLimit = Config.pipeline_max_prefetch + 1;
+    const int concurrentRequestLimit = (isFtp ? 0 : Config.pipeline_max_prefetch) + 1;
 
     // when queue filled already we cant add more.
     if (existingRequestCount >= concurrentRequestLimit) {
@@ -2927,14 +3079,18 @@ ConnStateData::clientParseRequests()
         if (concurrentRequestQueueFilled())
             break;
 
-        /* Begin the parsing */
-        PROF_start(parseHttpRequest);
-        HttpParserInit(&parser_, in.buf.c_str(), in.buf.length());
-
-        /* Process request */
+        ClientSocketContext *context = NULL;
         Http::ProtocolVersion http_ver;
-        ClientSocketContext *context = parseHttpRequest(this, &parser_, &method, &http_ver);
-        PROF_stop(parseHttpRequest);
+        if (!isFtp) {
+            /* Begin the parsing */
+            PROF_start(parseHttpRequest);
+            HttpParserInit(&parser_, in.buf.c_str(), in.buf.length());
+
+            /* Process request */
+            context = parseHttpRequest(this, &parser_, &method, &http_ver);
+            PROF_stop(parseHttpRequest);
+        } else
+            context = FtpParseRequest(this, &method, &http_ver);
 
         /* partial or incomplete request */
         if (!context) {
@@ -2951,7 +3107,14 @@ ConnStateData::clientParseRequests()
                                              CommTimeoutCbPtrFun(clientLifetimeTimeout, context->http));
             commSetConnTimeout(clientConnection, Config.Timeout.lifetime, timeoutCall);
 
-            clientProcessRequest(this, &parser_, context, method, http_ver);
+            if (!isFtp)
+                clientProcessRequest(this, &parser_, context, method, http_ver);
+            else {
+                // Process FTP request asynchronously to make sure FTP
+                // data connection accept callback is fired first.
+                CallJobHere1(33, 4, CbcPointer<ConnStateData>(this),
+                    ConnStateData, ConnStateData::processFtpRequest, context);
+            }
 
             parsed_req = true; // XXX: do we really need to parse everything right NOW ?
 
@@ -3064,6 +3227,59 @@ ConnStateData::clientReadRequest(const CommIoCbParams &io)
     clientAfterReadingRequests();
 }
 
+void
+ConnStateData::clientReadFtpData(const CommIoCbParams &io)
+{
+    debugs(33,5,HERE << io.conn << " size " << io.size);
+    Must(ftp.reader != NULL);
+    ftp.reader = NULL;
+
+    assert(Comm::IsConnOpen(ftp.dataConn));
+    assert(io.conn->fd == ftp.dataConn->fd);
+
+    if (io.flag == Comm::OK && bodyPipe != NULL) {
+        if (io.size > 0) {
+            kb_incr(&(statCounter.client_http.kbytes_in), io.size);
+
+            char *const current_buf = ftp.uploadBuf + ftp.uploadAvailSize;
+            if (io.buf != current_buf)
+                memmove(current_buf, io.buf, io.size);
+            ftp.uploadAvailSize += io.size;
+            handleFtpRequestData();
+        } else if (io.size == 0) {
+            debugs(33, 5, HERE << io.conn << " closed");
+            FtpCloseDataConnection(this);
+            if (ftp.uploadAvailSize <= 0)
+                finishDechunkingRequest(true);
+        }
+    } else { // not Comm::Flags::OK or unexpected read
+        debugs(33, 5, HERE << io.conn << " closed");
+        FtpCloseDataConnection(this);
+        finishDechunkingRequest(false);
+    }
+
+}
+
+void
+ConnStateData::handleFtpRequestData()
+{
+    assert(bodyPipe != NULL);
+
+    debugs(33,5, HERE << "handling FTP request data for " << clientConnection);
+    const size_t putSize = bodyPipe->putMoreData(ftp.uploadBuf,
+                                                 ftp.uploadAvailSize);
+    if (putSize > 0) {
+        ftp.uploadAvailSize -= putSize;
+        if (ftp.uploadAvailSize > 0)
+            memmove(ftp.uploadBuf, ftp.uploadBuf + putSize, ftp.uploadAvailSize);
+    }
+
+    if (Comm::IsConnOpen(ftp.dataConn))
+        readSomeFtpData();
+    else if (ftp.uploadAvailSize <= 0)
+        finishDechunkingRequest(true);
+}
+
 /**
  * called when new request data has been read from the socket
  *
@@ -3210,6 +3426,11 @@ ConnStateData::abortChunkedRequestBody(const err_type error)
 void
 ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer )
 {
+    if (isFtp) {
+        handleFtpRequestData();
+        return;
+    }
+
     if (!handleRequestBodyData())
         return;
 
@@ -3227,6 +3448,11 @@ ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer )
     if (bodyPipe != NULL)
         bodyPipe->enableAutoConsumption();
 
+    if (isFtp) {
+        FtpCloseDataConnection(this);
+        return;
+    }
+
     stopReceiving("virgin request body consumer aborted"); // closes ASAP
 }
 
@@ -3257,8 +3483,9 @@ clientLifetimeTimeout(const CommTimeoutCbParams &io)
         io.conn->close();
 }
 
-ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) :
+ConnStateData::ConnStateData(const MasterXaction::Pointer &xact):
         AsyncJob("ConnStateData"),
+        isFtp(xact->squidPort->transport.protocol == AnyP::PROTO_FTP), // TODO: convert into a method?
 #if USE_OPENSSL
         sslBumpMode(Ssl::bumpEnd),
         switchedToHttps_(false),
@@ -3318,7 +3545,14 @@ ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) :
 
     clientdbEstablished(clientConnection->remote, 1);
 
-    flags.readMore = true;
+    flags.readMore = !isFtp;
+
+    if (isFtp) {
+        ftp.gotEpsvAll = false;
+        ftp.readGreeting = false;
+        ftp.state = FTP_BEGIN;
+        ftp.uploadAvailSize = 0;
+    }
 }
 
 /** Handle a new connection on HTTP socket. */
@@ -3690,6 +3924,46 @@ httpsAccept(const CommAcceptCbParams &params)
     }
 }
 
+/** handle a new FTP connection */
+static void
+ftpAccept(const CommAcceptCbParams &params)
+{
+    MasterXaction::Pointer xact = params.xaction;
+    AnyP::PortCfgPointer s = xact->squidPort;
+
+    // NP: it is possible the port was reconfigured when the call or accept() was queued.
+
+    if (params.flag != Comm::OK) {
+        // Its possible the call was still queued when the client disconnected
+        debugs(33, 2, "ftpAccept: " << s->listenConn << ": accept failure: " << xstrerr(params.xerrno));
+        return;
+    }
+
+    debugs(33, 4, HERE << params.conn << ": accepted");
+    fd_note(params.conn->fd, "client ftp connect");
+
+    if (s->tcp_keepalive.enabled) {
+        commSetTcpKeepalive(params.conn->fd, s->tcp_keepalive.idle, s->tcp_keepalive.interval, s->tcp_keepalive.timeout);
+    }
+
+    ++incoming_sockets_accepted;
+
+    // Socket is ready, setup the connection manager to start using it
+    ConnStateData *connState = new ConnStateData(xact);
+
+    if (connState->transparent()) {
+        char buf[MAX_IPSTRLEN];
+        connState->clientConnection->local.toUrl(buf, MAX_IPSTRLEN);
+        connState->ftp.host = buf;
+        const char *uri = connState->ftpBuildUri();
+        debugs(33, 5, HERE << "FTP transparent URL: " << uri);
+    }
+
+    FtpWriteEarlyReply(connState, 220, "Service ready");
+
+    // TODO: Merge common httpAccept() parts, applying USE_DELAY_POOLS to FTP.
+}
+
 void
 ConnStateData::sslCrtdHandleReplyWrapper(void *data, const HelperReply &reply)
 {
@@ -4127,6 +4401,36 @@ clientHttpsConnectionsOpen(void)
 }
 #endif
 
+static void
+clientFtpConnectionsOpen(void)
+{
+    for (AnyP::PortCfgPointer s = FtpPortList; s != NULL; s = s->next) {
+        if (MAXTCPLISTENPORTS == NHttpSockets) {
+            debugs(1, DBG_IMPORTANT, "Ignoring 'ftp_port' lines exceeding the limit.");
+            debugs(1, DBG_IMPORTANT, "The limit is " << MAXTCPLISTENPORTS << " FTP ports.");
+            continue;
+        }
+
+        // Fill out a Comm::Connection which IPC will open as a listener for us
+        s->listenConn = new Comm::Connection;
+        s->listenConn->local = s->s;
+        s->listenConn->flags = COMM_NONBLOCKING | (s->flags.tproxyIntercept ? COMM_TRANSPARENT : 0) |
+                               (s->flags.natIntercept ? COMM_INTERCEPTION : 0);
+
+        // setup the subscriptions such that new connections accepted by listenConn are handled by FTP
+        typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+        RefCount<AcceptCall> subCall = commCbCall(5, 5, "ftpAccept", CommAcceptCbPtrFun(ftpAccept, CommAcceptCbParams(NULL)));
+        Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+
+        AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened",
+                                        ListeningStartedDialer(&clientListenerConnectionOpened,
+                                                               s, Ipc::fdnFtpSocket, sub));
+        Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->listenConn, Ipc::fdnFtpSocket, listenCall);
+        HttpSockets[NHttpSockets] = -1;
+        ++NHttpSockets;
+    }
+}
+
 /// process clientHttpConnectionsOpen result
 static void
 clientListenerConnectionOpened(AnyP::PortCfgPointer &s, const Ipc::FdNoteId portTypeNote, const Subscription::Pointer &sub)
@@ -4159,13 +4463,14 @@ clientOpenListenSockets(void)
 #if USE_OPENSSL
     clientHttpsConnectionsOpen();
 #endif
+    clientFtpConnectionsOpen();
 
     if (NHttpSockets < 1)
-        fatal("No HTTP or HTTPS ports configured");
+        fatal("No HTTP, HTTPS or FTP ports configured");
 }
 
 void
-clientHttpConnectionsClose(void)
+clientConnectionsClose(void)
 {
     for (AnyP::PortCfgPointer s = HttpPortList; s != NULL; s = s->next) {
         if (s->listenConn != NULL) {
@@ -4185,6 +4490,14 @@ clientHttpConnectionsClose(void)
     }
 #endif
 
+    for (AnyP::PortCfgPointer s = HttpPortList; s != NULL; s = s->next) {
+        if (s->listenConn != NULL) {
+            debugs(1, DBG_IMPORTANT, "Closing FTP port " << s->listenConn->local);
+            s->listenConn->close();
+            s->listenConn = NULL;
+        }
+    }
+
     // TODO see if we can drop HttpSockets array entirely */
     for (int i = 0; i < NHttpSockets; ++i) {
         HttpSockets[i] = -1;
@@ -4422,31 +4735,47 @@ ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io)
     assert(pinning.serverConnection == io.conn);
     pinning.closeHandler = NULL; // Comm unregisters handlers before calling
     const bool sawZeroReply = pinning.zeroReply; // reset when unpinning
-    unpinConnection();
+    unpinConnection(false);
+
+    if (isFtp) {
+        // if the server control connection is gone, reset state to login again
+        // TODO: merge with similar code in FtpHandleUserRequest()
+        debugs(33, 5, "will need to re-login due to FTP server closure");
+        ftp.readGreeting = false;
+        FtpChangeState(this, ConnStateData::FTP_BEGIN, "server closure");
+        // XXX: Not enough. Gateway::ServerStateData::sendCommand() will not
+        // re-login because clientState() is not ConnStateData::FTP_CONNECTED.
+    }
+
     if (sawZeroReply && clientConnection != NULL) {
         debugs(33, 3, "Closing client connection on pinned zero reply.");
         clientConnection->close();
     }
+
 }
 
 void
-ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServer, HttpRequest *request, CachePeer *aPeer, bool auth)
+ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServer, HttpRequest *request, CachePeer *aPeer, bool auth, bool monitor)
 {
-    char desc[FD_DESC_SZ];
+    if (!Comm::IsConnOpen(pinning.serverConnection) || 
+        pinning.serverConnection->fd != pinServer->fd)
+        pinNewConnection(pinServer, request, aPeer, auth);
 
-    if (Comm::IsConnOpen(pinning.serverConnection)) {
-        if (pinning.serverConnection->fd == pinServer->fd) {
-            startPinnedConnectionMonitoring();
-            return;
-        }
-    }
+    if (monitor)
+        startPinnedConnectionMonitoring();
+}
 
-    unpinConnection(); // closes pinned connection, if any, and resets fields
+void
+ConnStateData::pinNewConnection(const Comm::ConnectionPointer &pinServer, HttpRequest *request, CachePeer *aPeer, bool auth)
+{
+    unpinConnection(true); // closes pinned connection, if any, and resets fields
 
     pinning.serverConnection = pinServer;
 
     debugs(33, 3, HERE << pinning.serverConnection);
 
+    Must(pinning.serverConnection != NULL);
+
     // when pinning an SSL bumped connection, the request may be NULL
     const char *pinnedHost = "[unknown]";
     if (request) {
@@ -4461,6 +4790,7 @@ ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServer, HttpReque
         pinning.peer = cbdataReference(aPeer);
     pinning.auth = auth;
     char stmp[MAX_IPSTRLEN];
+    char desc[FD_DESC_SZ];
     snprintf(desc, FD_DESC_SZ, "%s pinned connection for %s (%d)",
              (auth || !aPeer) ? pinnedHost : aPeer->name,
              clientConnection->remote.toUrl(stmp,MAX_IPSTRLEN),
@@ -4475,11 +4805,10 @@ ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServer, HttpReque
     Params &params = GetCommParams<Params>(pinning.closeHandler);
     params.conn = pinning.serverConnection;
     comm_add_close_handler(pinning.serverConnection->fd, pinning.closeHandler);
-
-    startPinnedConnectionMonitoring();
 }
 
-/// Assign a read handler to an idle pinned connection so that we can detect connection closures.
+/// [re]start monitoring pinned connection for server closures so that we can
+/// propagate them to an _idle_ client pinned to the server
 void
 ConnStateData::startPinnedConnectionMonitoring()
 {
@@ -4546,14 +4875,24 @@ ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *a
 
     if (!valid) {
         /* The pinning info is not safe, remove any pinning info */
-        unpinConnection();
+        unpinConnection(true);
     }
 
     return pinning.serverConnection;
 }
 
+Comm::ConnectionPointer
+ConnStateData::borrowPinnedConnection(HttpRequest *request, const CachePeer *aPeer)
+{
+    debugs(33, 7, pinning.serverConnection);
+    if (validatePinnedConnection(request, aPeer) != NULL)
+        stopPinnedConnectionMonitoring();
+
+    return pinning.serverConnection; // closed if validation failed
+}
+
 void
-ConnStateData::unpinConnection()
+ConnStateData::unpinConnection(const bool andClose)
 {
     debugs(33, 3, HERE << pinning.serverConnection);
 
@@ -4565,9 +4904,13 @@ ConnStateData::unpinConnection()
             comm_remove_close_handler(pinning.serverConnection->fd, pinning.closeHandler);
             pinning.closeHandler = NULL;
         }
-        /// also close the server side socket, we should not use it for any future requests...
-        // TODO: do not close if called from our close handler?
-        pinning.serverConnection->close();
+
+        stopPinnedConnectionMonitoring();
+
+        // close the server side socket if requested
+        if (andClose)
+            pinning.serverConnection->close();
+        pinning.serverConnection = NULL;
     }
 
     safe_free(pinning.host);
@@ -4577,3 +4920,1314 @@ ConnStateData::unpinConnection()
     /* NOTE: pinning.pinned should be kept. This combined with fd == -1 at the end of a request indicates that the host
      * connection has gone away */
 }
+
+const char *
+ConnStateData::ftpBuildUri(const char *file)
+{
+    ftp.uri = "ftp://";
+    ftp.uri.append(ftp.host);
+    if (port->ftp_track_dirs && ftp.workingDir.size()) {
+        if (ftp.workingDir[0] != '/')
+            ftp.uri.append("/");
+        ftp.uri.append(ftp.workingDir);
+    }
+
+    if (ftp.uri[ftp.uri.size() - 1] != '/')
+        ftp.uri.append("/");
+
+    if (port->ftp_track_dirs && file) {
+        //remove any '/' from the beginning of path
+        while (*file == '/')
+            ++file;
+        ftp.uri.append(file);
+    }
+
+    return ftp.uri.termedBuf();
+}
+
+void
+ConnStateData::ftpSetWorkingDir(const char *dir)
+{
+    ftp.workingDir = dir;
+}
+
+static void
+FtpAcceptDataConnection(const CommAcceptCbParams &params)
+{
+    ConnStateData *connState = static_cast<ConnStateData *>(params.data);
+
+    if (params.flag != Comm::OK) {
+        // Its possible the call was still queued when the client disconnected
+        debugs(33, 2, HERE << connState->ftp.dataListenConn << ": accept "
+               "failure: " << xstrerr(params.xerrno));
+        return;
+    }
+
+    debugs(33, 4, "accepted " << params.conn);
+    fd_note(params.conn->fd, "passive client ftp data");
+    ++incoming_sockets_accepted;
+
+    if (!connState->clientConnection) {
+        debugs(33, 5, "late data connection?");
+        FtpCloseDataConnection(connState); // in case we are still listening
+        params.conn->close();
+    } else
+    if (params.conn->remote != connState->clientConnection->remote) {
+        debugs(33, 2, "rogue data conn? ctrl: " << connState->clientConnection->remote);
+        params.conn->close();
+        // Some FTP servers close control connection here, but it may make
+        // things worse from DoS p.o.v. and no better from data stealing p.o.v.
+    } else {
+        FtpCloseDataConnection(connState);
+        connState->ftp.dataConn = params.conn;
+        connState->ftp.uploadAvailSize = 0;
+        debugs(33, 7, "ready for data");
+        if (connState->ftp.onDataAcceptCall != NULL) {
+            AsyncCall::Pointer call = connState->ftp.onDataAcceptCall;
+            connState->ftp.onDataAcceptCall = NULL;
+            // If we got an upload request, start reading data from the client.
+            if (connState->ftp.state == ConnStateData::FTP_HANDLE_UPLOAD_REQUEST)
+                connState->readSomeFtpData();
+            else
+                Must(connState->ftp.state == ConnStateData::FTP_HANDLE_DATA_REQUEST);
+            MemBuf mb;
+            mb.init();
+            mb.Printf("150 Data connection opened.\r\n");
+            Comm::Write(connState->clientConnection, &mb, call);
+        }
+    }
+}
+
+static void
+FtpCloseDataConnection(ConnStateData *conn)
+{
+    if (conn->ftp.listener != NULL) {
+        conn->ftp.listener->cancel("no longer needed");
+        conn->ftp.listener = NULL;
+    }
+
+    if (Comm::IsConnOpen(conn->ftp.dataListenConn)) {
+        debugs(33, 5, HERE << "FTP closing client data listen socket: " <<
+               *conn->ftp.dataListenConn);
+        conn->ftp.dataListenConn->close();
+    }
+    conn->ftp.dataListenConn = NULL;
+
+    if (conn->ftp.reader != NULL) {
+        // Comm::ReadCancel can deal with negative FDs
+        Comm::ReadCancel(conn->ftp.dataConn->fd, conn->ftp.reader);
+        conn->ftp.reader = NULL;
+    }
+
+    if (Comm::IsConnOpen(conn->ftp.dataConn)) {
+        debugs(33, 5, HERE << "FTP closing client data connection: " <<
+               *conn->ftp.dataConn);
+        conn->ftp.dataConn->close();
+    }
+    conn->ftp.dataConn = NULL;
+}
+
+/// Writes FTP [error] response before we fully parsed the FTP request and
+/// created the corresponding HTTP request wrapper for that FTP request.
+static void
+FtpWriteEarlyReply(ConnStateData *connState, const int code, const char *msg)
+{
+    debugs(33, 7, HERE << code << ' ' << msg);
+    assert(99 < code && code < 1000);
+
+    MemBuf mb;
+    mb.init();
+    mb.Printf("%i %s\r\n", code, msg);
+
+    AsyncCall::Pointer call = commCbCall(33, 5, "FtpWroteEarlyReply",
+        CommIoCbPtrFun(&FtpWroteEarlyReply, connState));
+    Comm::Write(connState->clientConnection, &mb, call);
+
+    connState->flags.readMore = false;
+
+    // TODO: Create master transaction. Log it in FtpWroteEarlyReply.
+}
+
+static void
+FtpWriteReply(ClientSocketContext *context, MemBuf &mb)
+{
+    debugs(11, 2, "FTP Client " << context->clientConnection);
+    debugs(11, 2, "FTP Client REPLY:\n---------\n" << mb.buf <<
+           "\n----------");
+
+    AsyncCall::Pointer call = commCbCall(33, 5, "FtpWroteReply",
+        CommIoCbPtrFun(&FtpWroteReply, context));
+    Comm::Write(context->clientConnection, &mb, call);
+}
+
+static void
+FtpWriteCustomReply(ClientSocketContext *context, const int code, const char *msg, const HttpReply *reply)
+{
+    debugs(33, 7, HERE << code << ' ' << msg);
+    assert(99 < code && code < 1000);
+
+    const bool sendDetails = reply != NULL &&
+        reply->header.has(HDR_FTP_STATUS) && reply->header.has(HDR_FTP_REASON);
+
+    MemBuf mb;
+    mb.init();
+    if (sendDetails) {
+        mb.Printf("%i-%s\r\n", code, msg);
+        mb.Printf(" Server reply:\r\n");
+        FtpPrintReply(mb, reply, " ");
+        mb.Printf("%i \r\n", code);
+    } else
+        mb.Printf("%i %s\r\n", code, msg);
+
+    FtpWriteReply(context, mb);
+}
+
+static void 
+FtpChangeState(ConnStateData *connState, const ConnStateData::FtpState newState, const char *reason)
+{
+    assert(connState);
+    if (connState->ftp.state == newState) {
+        debugs(33, 3, "client state unchanged at " << connState->ftp.state <<
+               " because " << reason);
+        connState->ftp.state = newState;
+    } else {
+        debugs(33, 3, "client state was " << connState->ftp.state <<
+               ", now " << newState << " because " << reason);
+        connState->ftp.state = newState;
+    }
+}
+
+/** Parse an FTP request
+ *
+ *  \note Sets result->flags.parsed_ok to 0 if failed to parse the request,
+ *          to 1 if the request was correctly parsed.
+ *  \param[in] connState a ConnStateData. The caller must make sure it is not null
+ *  \param[out] mehtod_p will be set as a side-effect of the parsing.
+ *          Pointed-to value will be set to Http::METHOD_NONE in case of
+ *          parsing failure
+ *  \param[out] http_ver will be set as a side-effect of the parsing
+ *  \return NULL on incomplete requests,
+ *          a ClientSocketContext structure on success or failure.
+ */
+static ClientSocketContext *
+FtpParseRequest(ConnStateData *connState, HttpRequestMethod *method_p, Http::ProtocolVersion *http_ver)
+{
+    *http_ver = Http::ProtocolVersion(1, 1);
+
+    // TODO: Use tokenizer for parsing instead of raw pointer manipulation.
+    const char *inBuf = connState->in.buf.rawContent();
+
+    const char *const eor =
+        static_cast<const char *>(memchr(inBuf, '\n',
+            min(static_cast<size_t>(connState->in.buf.length()), Config.maxRequestHeaderSize)));
+
+    if (eor == NULL && connState->in.buf.length() >= Config.maxRequestHeaderSize) {
+        FtpChangeState(connState, ConnStateData::FTP_ERROR, "huge req");
+        FtpWriteEarlyReply(connState, 421, "Too large request");
+        return NULL;
+    }
+
+    if (eor == NULL) {
+        debugs(33, 5, HERE << "Incomplete request, waiting for end of request");
+        return NULL;
+    }
+
+    const size_t req_sz = eor + 1 - inBuf;
+
+    // skip leading whitespaces
+    const char *boc = inBuf; // beginning of command
+    while (boc < eor && isspace(*boc)) ++boc;
+    if (boc >= eor) {
+        debugs(33, 5, HERE << "Empty request, ignoring");
+        connNoteUseOfBuffer(connState, req_sz);
+        return NULL;
+    }
+
+    const char *eoc = boc; // end of command
+    while (eoc < eor && !isspace(*eoc)) ++eoc;
+    connState->in.buf.setAt(eoc - inBuf, '\0');
+
+    const char *bop = eoc + 1; // beginning of parameter
+    while (bop < eor && isspace(*bop)) ++bop;
+    if (bop < eor) {
+        const char *eop = eor - 1;
+        while (isspace(*eop)) --eop;
+        assert(eop >= bop);
+        connState->in.buf.setAt(eop + 1 - inBuf, '\0');
+    } else
+        bop = NULL;
+
+    debugs(33, 7, HERE << "Parsed FTP command " << boc << " with " <<
+           (bop == NULL ? "no " : "") << "parameters" <<
+           (bop != NULL ? ": " : "") << bop);
+
+    // TODO: Use SBuf instead of String
+    const String cmd = boc;
+    String params = bop;
+
+    connNoteUseOfBuffer(connState, req_sz);
+
+    if (!connState->ftp.readGreeting) {
+        // the first command must be USER
+        if (!connState->pinning.pinned && cmd.caseCmp("USER") != 0) {
+            FtpWriteEarlyReply(connState, 530, "Must login first");
+            return NULL;
+        }
+    }
+
+    // We need to process USER request now because it sets ftp server Hostname.
+    if (cmd.caseCmp("USER") == 0 &&
+        !FtpHandleUserRequest(connState, cmd, params))
+        return NULL;
+
+    if (!FtpSupportedCommand(cmd)) {
+        FtpWriteEarlyReply(connState, 502, "Unknown or unsupported command");
+        return NULL;
+    }
+
+    *method_p = !cmd.caseCmp("APPE") || !cmd.caseCmp("STOR") ||
+        !cmd.caseCmp("STOU") ? Http::METHOD_PUT : Http::METHOD_GET;
+
+    char *uri;
+    const char *aPath = params.size() > 0 && Ftp::hasPathParameter(cmd)?
+        params.termedBuf() : NULL;
+    uri = xstrdup(connState->ftpBuildUri(aPath));
+    HttpRequest *const request =
+        HttpRequest::CreateFromUrlAndMethod(uri, *method_p);
+    if (request == NULL) {
+        debugs(33, 5, HERE << "Invalid FTP URL: " << connState->ftp.uri);
+        FtpWriteEarlyReply(connState, 501, "Invalid host");
+        connState->ftp.uri.clean();
+        safe_free(uri);
+        return NULL;
+    }
+
+    request->http_ver = *http_ver;
+
+    // Our fake Request-URIs are not distinctive enough for caching to work
+    request->flags.cachable = false; // XXX: reset later by maybeCacheable()
+    request->flags.noCache = true;
+
+    request->header.putStr(HDR_FTP_COMMAND, cmd.termedBuf());
+    request->header.putStr(HDR_FTP_ARGUMENTS, params.termedBuf() != NULL ?
+                           params.termedBuf() : "");
+    if (*method_p == Http::METHOD_PUT) {
+        request->header.putStr(HDR_EXPECT, "100-continue");
+        request->header.putStr(HDR_TRANSFER_ENCODING, "chunked");
+    }
+
+    ClientHttpRequest *const http = new ClientHttpRequest(connState);
+    http->request = request;
+    HTTPMSGLOCK(http->request);
+    http->req_sz = req_sz;
+    http->uri = uri;
+
+    ClientSocketContext *const result =
+        new ClientSocketContext(connState->clientConnection, http);
+
+    StoreIOBuffer tempBuffer;
+    tempBuffer.data = result->reqbuf;
+    tempBuffer.length = HTTP_REQBUF_SZ;
+
+    ClientStreamData newServer = new clientReplyContext(http);
+    ClientStreamData newClient = result;
+    clientStreamInit(&http->client_stream, clientGetMoreData, clientReplyDetach,
+                     clientReplyStatus, newServer, clientSocketRecipient,
+                     clientSocketDetach, newClient, tempBuffer);
+
+    result->registerWithConn();
+    result->flags.parsed_ok = 1;
+    connState->flags.readMore = false;
+    return result;
+}
+
+static void
+FtpHandleReply(ClientSocketContext *context, HttpReply *reply, StoreIOBuffer data)
+{
+    if (context->http && context->http->al != NULL &&
+        !context->http->al->reply && reply) {
+        context->http->al->reply = reply;
+        HTTPMSGLOCK(context->http->al->reply);
+    }
+
+    static FtpReplyHandler *handlers[] = {
+        NULL, // FTP_BEGIN
+        NULL, // FTP_CONNECTED
+        FtpHandleFeatReply, // FTP_HANDLE_FEAT
+        FtpHandlePasvReply, // FTP_HANDLE_PASV
+        FtpHandlePortReply, // FTP_HANDLE_PORT
+        FtpHandleDataReply, // FTP_HANDLE_DATA_REQUEST
+        FtpHandleUploadReply, // FTP_HANDLE_UPLOAD_REQUEST
+        FtpHandleEprtReply,// FTP_HANDLE_EPRT
+        FtpHandleEpsvReply,// FTP_HANDLE_EPSV
+        NULL, // FTP_HANDLE_CWD
+        NULL, //FTP_HANDLE_PASS
+        NULL, // FTP_HANDLE_CDUP
+        FtpHandleErrorReply // FTP_ERROR
+    };
+    const ConnStateData::FtpState state = context->getConn()->ftp.state;
+    FtpReplyHandler *const handler = handlers[state];
+    if (handler)
+        (*handler)(context, reply, data);
+    else
+        FtpWriteForwardedReply(context, reply);
+}
+
+static void
+FtpHandleFeatReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    if (context->http->request->errType != ERR_NONE) {
+        FtpWriteCustomReply(context, 502, "Server does not support FEAT", reply);
+        return;
+    }
+
+    HttpReply *filteredReply = reply->clone();
+    HttpHeader &filteredHeader = filteredReply->header;
+
+    // Remove all unsupported commands from the response wrapper.
+    int deletedCount = 0;
+    HttpHeaderPos pos = HttpHeaderInitPos;
+    bool hasEPRT = false;
+    bool hasEPSV = false;
+    int prependSpaces = 1;
+    while (const HttpHeaderEntry *e = filteredHeader.getEntry(&pos)) {
+        if (e->id == HDR_FTP_PRE) {
+            // assume RFC 2389 FEAT response format, quoted by Squid:
+            // <"> SP NAME [SP PARAMS] <">
+            // but accommodate MS servers sending four SPs before NAME
+            if (e->value.size() < 4)
+                continue;
+            const char *raw = e->value.termedBuf();
+            if (raw[0] != '"' || raw[1] != ' ')
+                continue;
+            const char *beg = raw + 1 + strspn(raw + 1, " "); // after quote and spaces
+            // command name ends with (SP parameter) or quote
+            const char *end = beg + strcspn(beg, " \"");
+
+            if (end <= beg)
+                continue;
+
+            // compute the number of spaces before the command
+            prependSpaces = beg - raw - 1;
+
+            const String cmd = e->value.substr(beg-raw, end-raw);
+
+            if (!FtpSupportedCommand(cmd))
+                filteredHeader.delAt(pos, deletedCount);
+
+            if (cmd == "EPRT")
+                hasEPRT = true;
+            else if (cmd == "EPSV")
+                hasEPSV = true;
+        }
+    }
+
+    char buf[256];
+    int insertedCount = 0;
+    if (!hasEPRT) {
+        snprintf(buf, sizeof(buf), "\"%*s\"", prependSpaces + 4, "EPRT");
+        filteredHeader.putStr(HDR_FTP_PRE, buf);
+        ++insertedCount;
+    }
+    if (!hasEPSV) {
+        snprintf(buf, sizeof(buf), "\"%*s\"", prependSpaces + 4, "EPSV");
+        filteredHeader.putStr(HDR_FTP_PRE, buf);
+        ++insertedCount;
+    }
+
+    if (deletedCount || insertedCount) {
+        filteredHeader.refreshMask();
+        debugs(33, 5, "deleted " << deletedCount << " inserted " << insertedCount);
+    }
+
+    FtpWriteForwardedReply(context, filteredReply);
+}
+
+static void
+FtpHandlePasvReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    if (context->http->request->errType != ERR_NONE) {
+        FtpWriteCustomReply(context, 502, "Server does not support PASV", reply);
+        return;
+    }
+
+    FtpCloseDataConnection(context->getConn());
+
+    Comm::ConnectionPointer conn = new Comm::Connection;
+    ConnStateData * const connState = context->getConn();
+    conn->flags = COMM_NONBLOCKING;
+    conn->local = connState->transparent() ?
+                  connState->port->s : context->clientConnection->local;
+    conn->local.port(0);
+    const char *const note = connState->ftp.uri.termedBuf();
+    comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn, note);
+    if (!Comm::IsConnOpen(conn)) {
+            debugs(5, DBG_CRITICAL, HERE << "comm_open_listener failed:" <<
+                   conn->local << " error: " << errno);
+            FtpWriteCustomReply(context, 451, "Internal error");
+            return;
+    }
+
+    typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+    RefCount<AcceptCall> subCall = commCbCall(5, 5, "FtpAcceptDataConnection",
+        CommAcceptCbPtrFun(FtpAcceptDataConnection, connState));
+    Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+    connState->ftp.listener = subCall.getRaw();
+    connState->ftp.dataListenConn = conn;
+    AsyncJob::Start(new Comm::TcpAcceptor(conn, note, sub));
+
+    char addr[MAX_IPSTRLEN];
+    // remote server in interception setups and local address otherwise
+    const Ip::Address &server = connState->transparent() ?
+                                context->clientConnection->local : conn->local;
+    server.toStr(addr, MAX_IPSTRLEN, AF_INET);
+    addr[MAX_IPSTRLEN - 1] = '\0';
+    for (char *c = addr; *c != '\0'; ++c) {
+        if (*c == '.')
+            *c = ',';
+    }
+
+    // conn->fd is the client data connection (and its local port)
+    const unsigned short port = comm_local_port(conn->fd);
+    conn->local.port(port);
+
+    // In interception setups, we combine remote server address with a
+    // local port number and hope that traffic will be redirected to us.
+    MemBuf mb;
+    mb.init();
+
+    // Do not use "227 =a,b,c,d,p1,p2" format or omit parens: some nf_ct_ftp
+    // versions block responses that use those alternative syntax rules!
+    mb.Printf("227 Entering Passive Mode (%s,%i,%i).\r\n",
+              addr,
+              static_cast<int>(port / 256),
+              static_cast<int>(port % 256));
+
+    debugs(11, 3, Raw("writing", mb.buf, mb.size));
+    FtpWriteReply(context, mb);
+}
+
+static void
+FtpHandlePortReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    if (context->http->request->errType != ERR_NONE) {
+        FtpWriteCustomReply(context, 502, "Server does not support PASV (converted from PORT)", reply);
+        return;
+    }
+
+    FtpWriteCustomReply(context, 200, "PORT successfully converted to PASV.");
+
+    // and wait for RETR
+}
+
+static void
+FtpHandleErrorReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    ConnStateData *const connState = context->getConn();
+    if (!connState->pinning.pinned) // we failed to connect to server
+        connState->ftp.uri.clean();
+    // 421: we will close due to FTP_ERROR
+    FtpWriteErrorReply(context, reply, 421);
+}
+
+static void
+FtpHandleDataReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    ConnStateData *const conn = context->getConn();
+
+    if (reply != NULL && reply->sline.status() != Http::scOkay) {
+        FtpWriteForwardedReply(context, reply);
+        if (conn && Comm::IsConnOpen(conn->ftp.dataConn)) {
+            debugs(33, 3, "closing " << conn->ftp.dataConn << " on KO reply");
+            FtpCloseDataConnection(conn);
+        }
+        return;
+    }
+
+    if (!conn->ftp.dataConn) {
+        // We got STREAM_COMPLETE (or error) and closed the client data conn.
+        debugs(33, 3, "ignoring FTP srv data response after clt data closure");
+        return;
+    }
+
+    if (!FtpCheckDataConnPost(context)) {
+        FtpWriteCustomReply(context, 425, "Data connection is not established.");
+        FtpCloseDataConnection(conn);
+        return;
+    }
+
+    debugs(33, 7, HERE << data.length);
+
+    if (data.length <= 0) {
+        FtpWroteReplyData(conn->clientConnection, NULL, 0, Comm::OK, 0, context);
+        return;
+    }
+
+    MemBuf mb;
+    mb.init(data.length + 1, data.length + 1);
+    mb.append(data.data, data.length);
+
+    AsyncCall::Pointer call = commCbCall(33, 5, "FtpWroteReplyData",
+        CommIoCbPtrFun(&FtpWroteReplyData, context));
+    Comm::Write(conn->ftp.dataConn, &mb, call);
+
+    context->noteSentBodyBytes(data.length);
+}
+
+static void
+FtpWroteReplyData(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag, int xerrno, void *data)
+{
+    if (errflag == Comm::ERR_CLOSING)
+        return;
+
+    ClientSocketContext *const context = static_cast<ClientSocketContext*>(data);
+    ConnStateData *const connState = context->getConn();
+
+    if (errflag != Comm::OK) {
+        debugs(33, 3, HERE << "FTP reply data writing failed: " <<
+               xstrerr(xerrno));
+        FtpCloseDataConnection(connState);
+        FtpWriteCustomReply(context, 426, "Data connection error; transfer aborted");
+        return;
+    }
+
+    assert(context->http);
+    context->http->out.size += size;
+
+    switch (context->socketState()) {
+    case STREAM_NONE:
+        debugs(33, 3, "Keep going");
+        context->pullData();
+        return;
+    case STREAM_COMPLETE:
+        debugs(33, 3, HERE << "FTP reply data transfer successfully complete");
+        FtpWriteCustomReply(context, 226, "Transfer complete");
+        break;
+    case STREAM_UNPLANNED_COMPLETE:
+        debugs(33, 3, HERE << "FTP reply data transfer failed: STREAM_UNPLANNED_COMPLETE");
+        FtpWriteCustomReply(context, 451, "Server error; transfer aborted");
+        break;
+    case STREAM_FAILED:
+        debugs(33, 3, HERE << "FTP reply data transfer failed: STREAM_FAILED");
+        FtpWriteCustomReply(context, 451, "Server error; transfer aborted");
+        break;
+    default:
+        fatal("unreachable code");
+    }
+
+    FtpCloseDataConnection(connState);
+}
+
+static void
+FtpHandleUploadReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    FtpWriteForwardedReply(context, reply);
+    // note that the client data connection may already be closed by now
+}
+
+static void
+FtpWriteForwardedReply(ClientSocketContext *context, const HttpReply *reply)
+{
+    const AsyncCall::Pointer call = commCbCall(33, 5, "FtpWroteReply",
+        CommIoCbPtrFun(&FtpWroteReply, context));
+    FtpWriteForwardedReply(context, reply, call);
+}
+
+static void
+FtpHandleEprtReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    if (context->http->request->errType != ERR_NONE) {
+        FtpWriteCustomReply(context, 502, "Server does not support PASV (converted from EPRT)", reply);
+        return;
+    }
+
+    FtpWriteCustomReply(context, 200, "EPRT successfully converted to PASV.");
+
+    // and wait for RETR
+}
+
+static void
+FtpHandleEpsvReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    if (context->http->request->errType != ERR_NONE) {
+        FtpWriteCustomReply(context, 502, "Cannot connect to server", reply);
+        return;
+    }
+
+    FtpCloseDataConnection(context->getConn());
+
+    Comm::ConnectionPointer conn = new Comm::Connection;
+    ConnStateData * const connState = context->getConn();
+    conn->flags = COMM_NONBLOCKING;
+    conn->local = connState->transparent() ?
+                  connState->port->s : context->clientConnection->local;
+    conn->local.port(0);
+    const char *const note = connState->ftp.uri.termedBuf();
+    comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn, note);
+    if (!Comm::IsConnOpen(conn)) {
+            debugs(5, DBG_CRITICAL, "comm_open_listener failed: " <<
+                   conn->local << " error: " << errno);
+            FtpWriteCustomReply(context, 451, "Internal error");
+            return;
+    }
+
+    typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+    RefCount<AcceptCall> subCall = commCbCall(5, 5, "FtpAcceptDataConnection",
+        CommAcceptCbPtrFun(FtpAcceptDataConnection, connState));
+    Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+    connState->ftp.listener = subCall.getRaw();
+    connState->ftp.dataListenConn = conn;
+    AsyncJob::Start(new Comm::TcpAcceptor(conn, note, sub));
+
+    // conn->fd is the client data connection (and its local port)
+    const unsigned int port = comm_local_port(conn->fd);
+    conn->local.port(port);
+
+    // In interception setups, we combine remote server address with a
+    // local port number and hope that traffic will be redirected to us.
+    MemBuf mb;
+    mb.init();
+    mb.Printf("229 Entering Extended Passive Mode (|||%u|)\r\n", port);
+
+    debugs(11, 3, Raw("writing", mb.buf, mb.size));
+    FtpWriteReply(context, mb);
+}
+
+/// writes FTP error response with given status and reply-derived error details
+static void
+FtpWriteErrorReply(ClientSocketContext *context, const HttpReply *reply, const int status)
+{
+    MemBuf mb;
+    mb.init();
+
+    assert(context->http);
+    const HttpRequest *request = context->http->request;
+    assert(request);
+    if (request->errType != ERR_NONE)
+        mb.Printf("%i-%s\r\n", status, errorPageName(request->errType));
+
+    if (request->errDetail > 0) {
+        // XXX: > 0 may not always mean that this is an errno
+        mb.Printf("%i-Error: (%d) %s\r\n", status,
+                  request->errDetail,
+                  strerror(request->errDetail));
+    }
+
+    // XXX: Remove hard coded names. Use an error page template instead.
+    const Adaptation::History::Pointer ah = request->adaptHistory();
+    if (ah != NULL) { // XXX: add adapt::<all_h but use lastMeta here
+        const String info = ah->allMeta.getByName("X-Response-Info");
+        const String desc = ah->allMeta.getByName("X-Response-Desc");
+        if (info.size())
+            mb.Printf("%i-Information: %s\r\n", status, info.termedBuf());
+        if (desc.size())
+            mb.Printf("%i-Description: %s\r\n", status, desc.termedBuf());
+    }
+
+    assert(reply != NULL);
+    const char *reason = reply->header.has(HDR_FTP_REASON) ?
+                         reply->header.getStr(HDR_FTP_REASON):
+                         reply->sline.reason();
+
+    mb.Printf("%i %s\r\n", status, reason); // error terminating line
+
+    // TODO: errorpage.cc should detect FTP client and use
+    // configurable FTP-friendly error templates which we should
+    // write to the client "as is" instead of hiding most of the info
+
+    FtpWriteReply(context, mb);
+}
+
+/// writes FTP response based on HTTP reply that is not an FTP-response wrapper
+static void 
+FtpWriteForwardedForeign(ClientSocketContext *context, const HttpReply *reply)
+{
+    ConnStateData *const connState = context->getConn();
+    FtpChangeState(connState, ConnStateData::FTP_CONNECTED, "foreign reply");
+    //Close the data connection.
+    FtpCloseDataConnection(connState);
+    // 451: We intend to keep the control connection open.
+    FtpWriteErrorReply(context, reply, 451);
+}
+
+static void
+FtpWriteForwardedReply(ClientSocketContext *context, const HttpReply *reply, AsyncCall::Pointer call)
+{
+    assert(reply != NULL);
+    const HttpHeader &header = reply->header;
+    ConnStateData *const connState = context->getConn();
+
+    // adaptation and forwarding errors lack HDR_FTP_STATUS
+    if (!header.has(HDR_FTP_STATUS)) {
+        FtpWriteForwardedForeign(context, reply);
+        return;
+    }
+
+    assert(header.has(HDR_FTP_REASON));
+
+    const int status = header.getInt(HDR_FTP_STATUS);
+    debugs(33, 7, HERE << "status: " << status);
+
+    // Status 125 or 150 implies upload or data request, but we still check
+    // the state in case the server is buggy.
+    if ((status == 125 || status == 150) &&
+        (connState->ftp.state == ConnStateData::FTP_HANDLE_UPLOAD_REQUEST ||
+         connState->ftp.state == ConnStateData::FTP_HANDLE_DATA_REQUEST)) {
+        if (FtpCheckDataConnPost(context)) {
+            // If the data connection is ready, start reading data (here)
+            // and forward the response to client (further below).
+            debugs(33, 7, "data connection established, start data transfer");
+            if (connState->ftp.state == ConnStateData::FTP_HANDLE_UPLOAD_REQUEST)
+                connState->readSomeFtpData();
+        } else {
+            // If we are waiting to accept the data connection, keep waiting.
+            if (Comm::IsConnOpen(connState->ftp.dataListenConn)) {
+                debugs(33, 7, "wait for the client to establish a data connection");
+                connState->ftp.onDataAcceptCall = call;
+                // TODO: Add connect timeout for passive connections listener?
+                // TODO: Remember server response so that we can forward it?
+            } else {
+                // Either the connection was establised and closed after the
+                // data was transferred OR we failed to establish an active
+                // data connection and already sent the error to the client.
+                // In either case, there is nothing more to do.
+                debugs(33, 7, "done with data OR active connection failed");
+            }
+            return;
+        }
+    }
+
+    MemBuf mb;
+    mb.init();
+    FtpPrintReply(mb, reply);
+
+    debugs(11, 2, "FTP Client " << context->clientConnection);
+    debugs(11, 2, "FTP Client REPLY:\n---------\n" << mb.buf <<
+           "\n----------");
+
+    Comm::Write(context->clientConnection, &mb, call);
+}
+
+static void
+FtpPrintReply(MemBuf &mb, const HttpReply *reply, const char *const prefix)
+{
+    const HttpHeader &header = reply->header;
+
+    HttpHeaderPos pos = HttpHeaderInitPos;
+    while (const HttpHeaderEntry *e = header.getEntry(&pos)) {
+        if (e->id == HDR_FTP_PRE) {
+            String raw;
+            if (httpHeaderParseQuotedString(e->value.rawBuf(), e->value.size(), &raw))
+                mb.Printf("%s\r\n", raw.termedBuf());
+        }
+    }
+
+    if (header.has(HDR_FTP_STATUS)) {
+        const char *reason = header.getStr(HDR_FTP_REASON);
+        mb.Printf("%i %s\r\n", header.getInt(HDR_FTP_STATUS),
+                  (reason ? reason : 0));
+    }
+}
+
+static void
+FtpWroteEarlyReply(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag, int xerrno, void *data)
+{
+    if (errflag == Comm::ERR_CLOSING)
+        return;
+
+    if (errflag != Comm::OK) {
+        debugs(33, 3, HERE << "FTP reply writing failed: " << xstrerr(xerrno));
+        conn->close();
+        return;
+    }
+
+    ConnStateData *const connState = static_cast<ConnStateData*>(data);
+    ClientSocketContext::Pointer context = connState->getCurrentContext();
+    if (context != NULL && context->http) {
+        context->http->out.size += size;
+        context->http->out.headers_sz += size;
+    }
+
+    connState->flags.readMore = true;
+    connState->readSomeData();
+}
+
+static void
+FtpWroteReply(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag, int xerrno, void *data)
+{
+    if (errflag == Comm::ERR_CLOSING)
+        return;
+
+    if (errflag != Comm::OK) {
+        debugs(33, 3, HERE << "FTP reply writing failed: " <<
+               xstrerr(xerrno));
+        conn->close();
+        return;
+    }
+
+    ClientSocketContext *const context =
+        static_cast<ClientSocketContext*>(data);
+    ConnStateData *const connState = context->getConn();
+
+    assert(context->http);
+    context->http->out.size += size;
+    context->http->out.headers_sz += size;
+
+    if (connState->ftp.state == ConnStateData::FTP_ERROR) {
+        debugs(33, 5, "closing on FTP server error");
+        conn->close();
+        return;
+    }
+
+    const clientStream_status_t socketState = context->socketState();
+    debugs(33, 5, "FTP client stream state " << socketState);
+    switch (socketState) {
+    case STREAM_UNPLANNED_COMPLETE:
+    case STREAM_FAILED:
+         conn->close();
+         return;
+
+    case STREAM_NONE:
+    case STREAM_COMPLETE:
+        connState->flags.readMore = true;
+        FtpChangeState(connState, ConnStateData::FTP_CONNECTED, "FtpWroteReply");
+        if (connState->in.bodyParser)
+            connState->finishDechunkingRequest(false);
+        context->keepaliveNextRequest();
+        return;
+    }
+}
+
+bool
+FtpHandleRequest(ClientSocketContext *context, String &cmd, String &params) {
+    if (HttpRequest *request = context->http->request) {
+        MemBuf *mb = new MemBuf;
+        Packer p;
+        mb->init();
+        packerToMemInit(&p, mb);
+        request->pack(&p);
+        packerClean(&p);
+
+        debugs(11, 2, "FTP Client " << context->clientConnection);
+        debugs(11, 2, "FTP Client REQUEST:\n---------\n" << mb->buf <<
+               "\n----------");
+        delete mb;
+    }
+
+    static std::pair<const char *, FtpRequestHandler *> handlers[] = {
+        std::make_pair("LIST", FtpHandleDataRequest),
+        std::make_pair("NLST", FtpHandleDataRequest),
+        std::make_pair("MLSD", FtpHandleDataRequest),
+        std::make_pair("FEAT", FtpHandleFeatRequest),
+        std::make_pair("PASV", FtpHandlePasvRequest),
+        std::make_pair("PORT", FtpHandlePortRequest),
+        std::make_pair("RETR", FtpHandleDataRequest),
+        std::make_pair("EPRT", FtpHandleEprtRequest),
+        std::make_pair("EPSV", FtpHandleEpsvRequest),
+        std::make_pair("CWD", FtpHandleCwdRequest),
+        std::make_pair("PASS", FtpHandlePassRequest),
+        std::make_pair("CDUP", FtpHandleCdupRequest),
+    };
+
+    FtpRequestHandler *handler = NULL;
+    if (context->http->request->method == Http::METHOD_PUT)
+        handler = FtpHandleUploadRequest;
+    else {
+        for (size_t i = 0; i < sizeof(handlers) / sizeof(*handlers); ++i) {
+            if (cmd.caseCmp(handlers[i].first) == 0) {
+                handler = handlers[i].second;
+                break;
+            }
+        }
+    }
+
+    return handler != NULL ? (*handler)(context, cmd, params) : true;
+}
+
+/// Called to parse USER command, which is required to create an HTTP request
+/// wrapper. Thus, errors are handled with FtpWriteEarlyReply() here.
+bool
+FtpHandleUserRequest(ConnStateData *connState, const String &cmd, String &params)
+{
+    if (params.size() == 0) {
+        FtpWriteEarlyReply(connState, 501, "Missing username");
+        return false;
+    }
+
+    const String::size_type eou = params.rfind('@');
+    if (eou == String::npos || eou + 1 >= params.size()) {
+        FtpWriteEarlyReply(connState, 501, "Missing host");
+        return false;
+    }
+
+    const String login = params.substr(0, eou);
+    String host = params.substr(eou + 1, params.size());
+    // If we can parse it as raw IPv6 address, then surround with "[]".
+    // Otherwise (domain, IPv4, [bracketed] IPv6, garbage, etc), use as is.
+    if (host.pos(":")) {
+        char ipBuf[MAX_IPSTRLEN];
+        Ip::Address ipa;
+        ipa = host.termedBuf();
+        if (!ipa.isAnyAddr()) {
+            ipa.toHostStr(ipBuf, MAX_IPSTRLEN);
+            host = ipBuf;
+        }
+    }
+    connState->ftp.host = host;
+
+    String oldUri;
+    if (connState->ftp.readGreeting)
+        oldUri = connState->ftp.uri;
+
+    connState->ftpSetWorkingDir(NULL);
+    connState->ftpBuildUri();
+
+    if (!connState->ftp.readGreeting) {
+        debugs(11, 3, "set URI to " << connState->ftp.uri);
+    } else if (oldUri.caseCmp(connState->ftp.uri) == 0) {
+        debugs(11, 5, "keep URI as " << oldUri);
+    } else {
+        debugs(11, 3, "reset URI from " << oldUri << " to " << connState->ftp.uri);
+        FtpCloseDataConnection(connState);
+        connState->ftp.readGreeting = false;
+        connState->unpinConnection(true); // close control connection to the server
+        FtpChangeState(connState, ConnStateData::FTP_BEGIN, "URI reset");
+    }
+
+    params.cut(eou);
+
+    return true;
+}
+
+bool
+FtpHandleFeatRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_FEAT, "FtpHandleFeatRequest");
+
+    return true;
+}
+
+bool
+FtpHandlePasvRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    ConnStateData *const connState = context->getConn();
+    assert(connState);
+    if (connState->ftp.gotEpsvAll) {
+        FtpSetReply(context, 500, "Bad PASV command");
+        return false;
+    }
+
+    if (params.size() > 0) {
+        FtpSetReply(context, 501, "Unexpected parameter");
+        return false;
+    }
+
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_PASV, "FtpHandlePasvRequest");
+    // no need to fake PASV request via FtpSetDataCommand() in true PASV case
+    return true;
+}
+
+/// [Re]initializes dataConn for active data transfers. Does not connect.
+static
+bool FtpCreateDataConnection(ClientSocketContext *context, Ip::Address cltAddr)
+{
+    ConnStateData *const connState = context->getConn();
+    assert(connState);
+    assert(connState->clientConnection != NULL);
+    assert(!connState->clientConnection->remote.isAnyAddr());
+
+    if (cltAddr != connState->clientConnection->remote) {
+        debugs(33, 2, "rogue PORT " << cltAddr << " request? ctrl: " << connState->clientConnection->remote);
+        // Closing the control connection would not help with attacks because
+        // the client is evidently able to connect to us. Besides, closing
+        // makes retrials easier for the client and more damaging to us.
+        FtpSetReply(context, 501, "Prohibited parameter value");
+        return false;
+    }
+
+    FtpCloseDataConnection(context->getConn());
+
+    Comm::ConnectionPointer conn = new Comm::Connection();
+    conn->remote = cltAddr;
+
+    // Use local IP address of the control connection as the source address
+    // of the active data connection, or some clients will refuse to accept.
+    conn->flags |= COMM_DOBIND;
+    conn->local = connState->clientConnection->local;
+    // RFC 959 requires active FTP connections to originate from port 20
+    // but that would preclude us from supporting concurrent transfers! (XXX?)
+    conn->local.port(0);
+
+    debugs(11, 3, "will actively connect from " << conn->local << " to " <<
+           conn->remote);
+
+    context->getConn()->ftp.dataConn = conn;
+    context->getConn()->ftp.uploadAvailSize = 0;
+    return true;
+}
+
+bool
+FtpHandlePortRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    // TODO: Should PORT errors trigger FtpCloseDataConnection() cleanup?
+
+    const ConnStateData *connState = context->getConn();
+    if (connState->ftp.gotEpsvAll) {
+        FtpSetReply(context, 500, "Rejecting PORT after EPSV ALL");
+        return false;
+    }
+
+    if (!params.size()) {
+        FtpSetReply(context, 501, "Missing parameter");
+        return false;
+    }
+
+    Ip::Address cltAddr;
+    if (!Ftp::ParseIpPort(params.termedBuf(), NULL, cltAddr)) {
+        FtpSetReply(context, 501, "Invalid parameter");
+        return false;
+    }
+
+    if (!FtpCreateDataConnection(context, cltAddr))
+        return false;
+
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_PORT, "FtpHandlePortRequest");
+    FtpSetDataCommand(context);
+    return true; // forward our fake PASV request
+}
+
+bool
+FtpHandleDataRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    if (!FtpCheckDataConnPre(context))
+        return false;
+
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_DATA_REQUEST, "FtpHandleDataRequest");
+
+    return true;
+}
+
+bool
+FtpHandleUploadRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    if (!FtpCheckDataConnPre(context))
+        return false;
+
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_UPLOAD_REQUEST, "FtpHandleDataRequest");
+
+    return true;
+}
+
+bool
+FtpHandleEprtRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    debugs(11, 3, "Process an EPRT " << params);
+
+    const ConnStateData *connState = context->getConn();
+    if (connState->ftp.gotEpsvAll) {
+        FtpSetReply(context, 500, "Rejecting EPRT after EPSV ALL");
+        return false;
+    }
+
+    if (!params.size()) {
+        FtpSetReply(context, 501, "Missing parameter");
+        return false;
+    }
+
+    Ip::Address cltAddr;
+    if (!Ftp::ParseProtoIpPort(params.termedBuf(), cltAddr)) {
+        FtpSetReply(context, 501, "Invalid parameter");
+        return false;
+    }
+
+    if (!FtpCreateDataConnection(context, cltAddr))
+        return false;
+
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_EPRT, "FtpHandleEprtRequest");
+    FtpSetDataCommand(context);
+    return true; // forward our fake PASV request
+}
+
+bool
+FtpHandleEpsvRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    debugs(11, 3, "Process an EPSV command with params: " << params);
+    if (params.size() <= 0) {
+        // treat parameterless EPSV as "use the protocol of the ctrl conn"
+    } else if (params.caseCmp("ALL") == 0) {
+        ConnStateData *connState = context->getConn();
+        FtpSetReply(context, 200, "EPSV ALL ok");
+        connState->ftp.gotEpsvAll = true;
+        return false;
+    } else if (params.cmp("2") == 0) {
+        if (!Ip::EnableIpv6) {
+            FtpSetReply(context, 522, "Network protocol not supported, use (1)");
+            return false;
+        }
+    } else if (params.cmp("1") != 0) {
+        FtpSetReply(context, 501, "Unsupported EPSV parameter");
+        return false;
+    }
+
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_EPSV, "FtpHandleEpsvRequest");
+    FtpSetDataCommand(context);
+    return true; // forward our fake PASV request
+}
+
+bool
+FtpHandleCwdRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_CWD, "FtpHandleCwdRequest");
+    return true;
+}
+
+bool
+FtpHandlePassRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_PASS, "FtpHandlePassRequest");
+    return true;
+}
+
+bool
+FtpHandleCdupRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_CDUP, "FtpHandleCdupRequest");
+    return true;
+}
+
+// Convert client PORT, EPRT, PASV, or EPSV data command to Squid PASV command.
+// Squid server-side decides what data command to use on that side.
+void
+FtpSetDataCommand(ClientSocketContext *context)
+{
+    ClientHttpRequest *const http = context->http;
+    assert(http != NULL);
+    HttpRequest *const request = http->request;
+    assert(request != NULL);
+    HttpHeader &header = request->header;
+    header.delById(HDR_FTP_COMMAND);
+    header.putStr(HDR_FTP_COMMAND, "PASV");
+    header.delById(HDR_FTP_ARGUMENTS);
+    header.putStr(HDR_FTP_ARGUMENTS, "");
+    debugs(11, 5, "client data command converted to fake PASV");
+}
+
+/// check that client data connection is ready for future I/O or at least
+/// has a chance of becoming ready soon.
+bool
+FtpCheckDataConnPre(ClientSocketContext *context)
+{
+    ConnStateData *const connState = context->getConn();
+    if (Comm::IsConnOpen(connState->ftp.dataConn))
+        return true;
+
+    if (Comm::IsConnOpen(connState->ftp.dataListenConn)) {
+        // We are still waiting for a client to connect to us after PASV.
+        // Perhaps client's data conn handshake has not reached us yet.
+        // After we talk to the server, FtpCheckDataConnPost() will recheck.
+        debugs(33, 3, "expecting clt data conn " << connState->ftp.dataListenConn);
+        return true;
+    }
+
+    if (!connState->ftp.dataConn || connState->ftp.dataConn->remote.isAnyAddr()) {
+        debugs(33, 5, "missing " << connState->ftp.dataConn);
+        // TODO: use client address and default port instead.
+        FtpSetReply(context, 425, "Use PORT or PASV first");
+        return false;
+    }
+
+    // active transfer: open a connection from Squid to client
+    AsyncCall::Pointer connector = context->getConn()->ftp.connector =
+        commCbCall(17, 3, "FtpConnectDoneWrapper", 
+                   CommConnectCbPtrFun(FtpHandleConnectDone, context));
+
+    Comm::ConnOpener *cs = new Comm::ConnOpener(connState->ftp.dataConn,
+                                                connector,
+                                                Config.Timeout.connect);
+    AsyncJob::Start(cs);
+    return false; // ConnStateData::processFtpRequest waits FtpHandleConnectDone
+}
+
+/// Check that client data connection is ready for immediate I/O.
+static bool
+FtpCheckDataConnPost(ClientSocketContext *context)
+{
+    ConnStateData *connState = context->getConn();
+    assert(connState);
+    const Comm::ConnectionPointer &dataConn = connState->ftp.dataConn;
+    if (!Comm::IsConnOpen(dataConn)) {
+        debugs(33, 3, "missing client data conn: " << dataConn);
+        return false;
+    }
+    return true;
+}
+
+void
+FtpHandleConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int xerrno, void *data)
+{
+    ClientSocketContext *context = static_cast<ClientSocketContext*>(data);
+    context->getConn()->ftp.connector = NULL;
+
+    if (status != Comm::OK) {
+        conn->close();
+        FtpSetReply(context, 425, "Cannot open data connection.");
+        assert(context->http && context->http->storeEntry() != NULL);
+    } else {
+        assert(context->getConn()->ftp.dataConn == conn);
+        assert(Comm::IsConnOpen(conn));
+        fd_note(conn->fd, "active client ftp data");
+    }
+    context->getConn()->resumeFtpRequest(context);
+}
+
+void
+FtpSetReply(ClientSocketContext *context, const int code, const char *msg)
+{
+    ClientHttpRequest *const http = context->http;
+    assert(http != NULL);
+    assert(http->storeEntry() == NULL);
+
+    HttpReply *const reply = new HttpReply;
+    reply->sline.set(Http::ProtocolVersion(1, 1), Http::scNoContent);
+    HttpHeader &header = reply->header;
+    header.putTime(HDR_DATE, squid_curtime);
+    {
+        HttpHdrCc cc;
+        cc.Private();
+        header.putCc(&cc);
+    }
+    header.putInt64(HDR_CONTENT_LENGTH, 0);
+    header.putInt(HDR_FTP_STATUS, code);
+    header.putStr(HDR_FTP_REASON, msg);
+    reply->hdrCacheInit();
+
+    setLogUri(http, urlCanonicalClean(http->request));
+
+    clientStreamNode *const node = context->getClientReplyContext();
+    clientReplyContext *const repContext =
+        dynamic_cast<clientReplyContext *>(node->data.getRaw());
+    assert(repContext != NULL);
+
+    RequestFlags flags;
+    flags.cachable = false; // force releaseRequest() in storeCreateEntry()
+    flags.noCache = true;
+    repContext->createStoreEntry(http->request->method, flags);
+    http->storeEntry()->replaceHttpReply(reply);
+}
+
+/// Whether Squid FTP gateway supports a given feature (e.g., a command).
+static bool
+FtpSupportedCommand(const String &name)
+{
+    static std::set<std::string> BlackList;
+    if (BlackList.empty()) {
+        /* Add FTP commands that Squid cannot gateway correctly */
+
+        // we probably do not support AUTH TLS.* and AUTH SSL,
+        // but let's disclaim all AUTH support to KISS, for now
+        BlackList.insert("AUTH");
+    }
+
+    // we claim support for all commands that we do not know about
+    return BlackList.find(name.termedBuf()) == BlackList.end();
+}