]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/client_side.cc
Merged from trunk (r13356).
[thirdparty/squid.git] / src / client_side.cc
index 3efa6661b4920cc120dde181fc61b77e9c450b75..2f45c16af4d072c7f58d037cd038a7d18407402f 100644 (file)
 #include "CachePeer.h"
 #include "ChunkedCodingParser.h"
 #include "client_db.h"
+#include "client_side.h"
 #include "client_side_reply.h"
 #include "client_side_request.h"
-#include "client_side.h"
 #include "ClientRequestContext.h"
 #include "clientStream.h"
 #include "comm.h"
 #include "comm/Connection.h"
+#include "comm/ConnOpener.h"
 #include "comm/Loops.h"
 #include "comm/TcpAcceptor.h"
 #include "comm/Write.h"
 #include "CommCalls.h"
 #include "errorpage.h"
-#include "eui/Config.h"
 #include "fd.h"
 #include "fde.h"
-#include "forward.h"
+#include "FtpServer.h"
 #include "fqdncache.h"
+#include "FwdState.h"
 #include "globals.h"
 #include "http.h"
 #include "HttpHdrCc.h"
 #include "ident/Config.h"
 #include "ident/Ident.h"
 #include "internal.h"
+#include "ip/tools.h"
 #include "ipc/FdNotes.h"
 #include "ipc/StartListening.h"
 #include "log/access_log.h"
 #if USE_DELAY_POOLS
 #include "ClientInfo.h"
 #endif
-#if USE_SSL
-#include "ssl/ProxyCerts.h"
+#if USE_OPENSSL
 #include "ssl/context_storage.h"
+#include "ssl/gadgets.h"
 #include "ssl/helper.h"
+#include "ssl/ProxyCerts.h"
 #include "ssl/ServerBump.h"
 #include "ssl/support.h"
-#include "ssl/gadgets.h"
 #endif
 #if USE_SSL_CRTD
-#include "ssl/crtd_message.h"
 #include "ssl/certificate_db.h"
+#include "ssl/crtd_message.h"
 #endif
 
-#if HAVE_LIMITS_H
-#include <limits.h>
-#endif
-#if HAVE_MATH_H
-#include <math.h>
-#endif
-#if HAVE_LIMITS
+#include <climits>
+#include <cmath>
 #include <limits>
-#endif
+#include <set>
 
 #if LINGERING_CLOSE
 #define comm_close comm_lingering_close
@@ -195,29 +192,11 @@ static void clientListenerConnectionOpened(AnyP::PortCfg *s, const Ipc::FdNoteId
 
 CBDATA_CLASS_INIT(ClientSocketContext);
 
-void *
-ClientSocketContext::operator new (size_t byteCount)
-{
-    /* derived classes with different sizes must implement their own new */
-    assert (byteCount == sizeof (ClientSocketContext));
-    CBDATA_INIT_TYPE(ClientSocketContext);
-    return cbdataAlloc(ClientSocketContext);
-}
-
-void
-ClientSocketContext::operator delete (void *address)
-{
-    cbdataFree (address);
-}
-
 /* Local functions */
-/* ClientSocketContext */
-static ClientSocketContext *ClientSocketContextNew(const Comm::ConnectionPointer &clientConn, ClientHttpRequest *);
-/* other */
 static IOCB clientWriteComplete;
 static IOCB clientWriteBodyComplete;
 static IOACB httpAccept;
-#if USE_SSL
+#if USE_OPENSSL
 static IOACB httpsAccept;
 #endif
 static IOACB ftpAccept;
@@ -248,26 +227,31 @@ static void clientUpdateSocketStats(LogTags logType, size_t size);
 char *skipLeadingSpace(char *aString);
 static void connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount);
 
-static ConnStateData *connStateCreate(const Comm::ConnectionPointer &client, AnyP::PortCfg *port);
-
+static void FtpChangeState(ConnStateData *connState, const ConnStateData::FtpState newState, const char *reason);
 static IOACB FtpAcceptDataConnection;
 static void FtpCloseDataConnection(ConnStateData *conn);
-static void FtpWriteGreeting(ConnStateData *conn);
 static ClientSocketContext *FtpParseRequest(ConnStateData *connState, HttpRequestMethod *method_p, Http::ProtocolVersion *http_ver);
 static bool FtpHandleUserRequest(ConnStateData *connState, const String &cmd, String &params);
+static CNCB FtpHandleConnectDone;
 
 static void FtpHandleReply(ClientSocketContext *context, HttpReply *reply, StoreIOBuffer data);
 typedef void FtpReplyHandler(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data);
+static FtpReplyHandler FtpHandleFeatReply;
 static FtpReplyHandler FtpHandlePasvReply;
+static FtpReplyHandler FtpHandlePortReply;
 static FtpReplyHandler FtpHandleErrorReply;
 static FtpReplyHandler FtpHandleDataReply;
 static FtpReplyHandler FtpHandleUploadReply;
+static FtpReplyHandler FtpHandleEprtReply;
+static FtpReplyHandler FtpHandleEpsvReply;
 
 static void FtpWriteEarlyReply(ConnStateData *conn, const int code, const char *msg);
 static void FtpWriteReply(ClientSocketContext *context, MemBuf &mb);
 static void FtpWriteCustomReply(ClientSocketContext *context, const int code, const char *msg, const HttpReply *reply = NULL);
 static void FtpWriteForwardedReply(ClientSocketContext *context, const HttpReply *reply);
 static void FtpWriteForwardedReply(ClientSocketContext *context, const HttpReply *reply, AsyncCall::Pointer call);
+static void FtpWriteErrorReply(ClientSocketContext *context, const HttpReply *reply, const int status);
+
 static void FtpPrintReply(MemBuf &mb, const HttpReply *reply, const char *const prefix = "");
 static IOCB FtpWroteEarlyReply;
 static IOCB FtpWroteReply;
@@ -275,13 +259,23 @@ static IOCB FtpWroteReplyData;
 
 typedef bool FtpRequestHandler(ClientSocketContext *context, String &cmd, String &params);
 static FtpRequestHandler FtpHandleRequest;
+static FtpRequestHandler FtpHandleFeatRequest;
 static FtpRequestHandler FtpHandlePasvRequest;
 static FtpRequestHandler FtpHandlePortRequest;
 static FtpRequestHandler FtpHandleDataRequest;
 static FtpRequestHandler FtpHandleUploadRequest;
-
-static bool FtpCheckDataConnection(ClientSocketContext *context);
+static FtpRequestHandler FtpHandleEprtRequest;
+static FtpRequestHandler FtpHandleEpsvRequest;
+static FtpRequestHandler FtpHandleCwdRequest;
+static FtpRequestHandler FtpHandlePassRequest;
+static FtpRequestHandler FtpHandleCdupRequest;
+
+static bool FtpCheckDataConnPre(ClientSocketContext *context);
+static bool FtpCheckDataConnPost(ClientSocketContext *context);
+static void FtpSetDataCommand(ClientSocketContext *context);
 static void FtpSetReply(ClientSocketContext *context, const int code, const char *msg);
+static bool FtpSupportedCommand(const String &name);
+
 
 clientStreamNode *
 ClientSocketContext::getTail() const
@@ -316,12 +310,12 @@ ConnStateData::readSomeData()
 
     debugs(33, 4, HERE << clientConnection << ": reading request...");
 
-    if (!maybeMakeSpaceAvailable())
+    if (!in.maybeMakeSpaceAvailable())
         return;
 
     typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
     reader = JobCallback(33, 5, Dialer, this, ConnStateData::clientReadRequest);
-    comm_read(clientConnection, in.addressToReadInto(), getAvailableBufferLength(), reader);
+    comm_read(clientConnection, in.buf, reader);
 }
 
 void
@@ -415,11 +409,16 @@ ClientSocketContext::connIsFinished()
     clientStreamDetach(getTail(), http);
 }
 
-ClientSocketContext::ClientSocketContext() : http(NULL), reply(NULL), next(NULL),
+ClientSocketContext::ClientSocketContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
+        clientConnection(aConn),
+        http(aReq),
+        reply(NULL),
+        next(NULL),
         writtenToSocket(0),
         mayUseConnection_ (false),
         connRegistered_ (false)
 {
+    assert(http != NULL);
     memset (reqbuf, '\0', sizeof (reqbuf));
     flags.deferred = 0;
     flags.parsed_ok = 0;
@@ -427,17 +426,6 @@ ClientSocketContext::ClientSocketContext() : http(NULL), reply(NULL), next(NULL)
     deferredparams.rep = NULL;
 }
 
-ClientSocketContext *
-ClientSocketContextNew(const Comm::ConnectionPointer &client, ClientHttpRequest * http)
-{
-    ClientSocketContext *newContext;
-    assert(http != NULL);
-    newContext = new ClientSocketContext;
-    newContext->http = http;
-    newContext->clientConnection = client;
-    return newContext;
-}
-
 void
 ClientSocketContext::writeControlMsg(HttpControlMsg &msg)
 {
@@ -627,7 +615,7 @@ ClientHttpRequest::updateCounters()
         ++ statCounter.client_http.errors;
 
     clientUpdateStatHistCounters(logType,
-                                 tvSubMsec(start_time, current_time));
+                                 tvSubMsec(al->cache.start_time, current_time));
 
     clientUpdateHierCounters(&request->hier);
 }
@@ -664,7 +652,6 @@ prepareLogWithRequestDetails(HttpRequest * request, AccessLogEntry::Pointer &aLo
             packerToMemInit(&p, &mb);
             ah->lastMeta.packInto(&p);
             aLogEntry->adapt.last_meta = xstrdup(mb.buf);
-            aLogEntry->notes.append(&ah->metaHeaders);
         }
 #endif
 
@@ -681,19 +668,10 @@ prepareLogWithRequestDetails(HttpRequest * request, AccessLogEntry::Pointer &aLo
     aLogEntry->http.method = request->method;
     aLogEntry->http.version = request->http_ver;
     aLogEntry->hier = request->hier;
-    if (request->helperNotes)
-        aLogEntry->notes.append(request->helperNotes->notes);
     if (request->content_length > 0) // negative when no body or unknown length
-        aLogEntry->cache.requestSize += request->content_length;
+        aLogEntry->http.clientRequestSz.payloadData += request->content_length; // XXX: actually adaptedRequest payload size ??
     aLogEntry->cache.extuser = request->extacl_user.termedBuf();
 
-#if USE_AUTH
-    if (request->auth_user_request != NULL) {
-        if (request->auth_user_request->username())
-            aLogEntry->cache.authuser = xstrdup(request->auth_user_request->username());
-    }
-#endif
-
     // Adapted request, if any, inherits and then collects all the stats, but
     // the virgin request gets logged instead; copy the stats to log them.
     // TODO: avoid losses by keeping these stats in a shared history object?
@@ -724,27 +702,19 @@ ClientHttpRequest::logRequest()
 
     debugs(33, 9, "clientLogRequest: http.code='" << al->http.code << "'");
 
-    if (loggingEntry() && loggingEntry()->mem_obj)
-        al->cache.objectSize = loggingEntry()->contentLen();
-
-    al->cache.caddr.SetNoAddr();
+    if (loggingEntry() && loggingEntry()->mem_obj && loggingEntry()->objectLen() >= 0)
+        al->cache.objectSize = loggingEntry()->contentLen(); // payload duplicate ?? with or without TE ?
 
-    if (getConn() != NULL) {
-        al->cache.caddr = getConn()->log_addr;
-        al->cache.port =  cbdataReference(getConn()->port);
-    }
-
-    al->cache.requestSize = req_sz;
-    al->cache.requestHeadersSize = req_sz;
-
-    al->cache.replySize = out.size;
-    al->cache.replyHeadersSize = out.headers_sz;
+    al->http.clientRequestSz.header = req_sz;
+    al->http.clientReplySz.header = out.headers_sz;
+    // XXX: calculate without payload encoding or headers !!
+    al->http.clientReplySz.payloadData = out.size - out.headers_sz; // pretend its all un-encoded data for now.
 
     al->cache.highOffset = out.offset;
 
     al->cache.code = logType;
 
-    al->cache.msec = tvSubMsec(start_time, current_time);
+    al->cache.msec = tvSubMsec(al->cache.start_time, current_time);
 
     if (request)
         prepareLogWithRequestDetails(request, al);
@@ -752,7 +722,7 @@ ClientHttpRequest::logRequest()
     if (getConn() != NULL && getConn()->clientConnection != NULL && getConn()->clientConnection->rfc931[0])
         al->cache.rfc931 = getConn()->clientConnection->rfc931;
 
-#if USE_SSL && 0
+#if USE_OPENSSL && 0
 
     /* This is broken. Fails if the connection has been closed. Needs
      * to snarf the ssl details some place earlier..
@@ -762,36 +732,47 @@ ClientHttpRequest::logRequest()
 
 #endif
 
-    /*Add meta headers*/
+    /*Add notes*/
+    // The al->notes and request->notes must point to the same object.
+    (void)SyncNotes(*al, *request);
     typedef Notes::iterator ACAMLI;
     for (ACAMLI i = Config.notes.begin(); i != Config.notes.end(); ++i) {
-        if (const char *value = (*i)->match(request, al->reply)) {
-            al->notes.addEntry(new HttpHeaderEntry(HDR_OTHER, (*i)->key.termedBuf(), value));
+        if (const char *value = (*i)->match(request, al->reply, NULL)) {
+            NotePairs &notes = SyncNotes(*al, *request);
+            notes.add((*i)->key.termedBuf(), value);
             debugs(33, 3, HERE << (*i)->key.termedBuf() << " " << value);
         }
     }
 
-    ACLFilledChecklist *checklist = clientAclChecklistCreate(Config.accessList.log, this);
-
+    ACLFilledChecklist checklist(NULL, request, NULL);
     if (al->reply) {
-        checklist->reply = al->reply;
-        HTTPMSGLOCK(checklist->reply);
+        checklist.reply = al->reply;
+        HTTPMSGLOCK(checklist.reply);
     }
 
-    if (!Config.accessList.log || checklist->fastCheck() == ACCESS_ALLOWED) {
-        if (request) {
-            al->adapted_request = request;
-            HTTPMSGLOCK(al->adapted_request);
+    if (request) {
+        al->adapted_request = request;
+        HTTPMSGLOCK(al->adapted_request);
+    }
+    accessLogLog(al, &checklist);
+
+    bool updatePerformanceCounters = true;
+    if (Config.accessList.stats_collection) {
+        ACLFilledChecklist statsCheck(Config.accessList.stats_collection, request, NULL);
+        if (al->reply) {
+            statsCheck.reply = al->reply;
+            HTTPMSGLOCK(statsCheck.reply);
         }
-        accessLogLog(al, checklist);
+        updatePerformanceCounters = (statsCheck.fastCheck() == ACCESS_ALLOWED);
+    }
+
+    if (updatePerformanceCounters) {
         if (request)
             updateCounters();
 
         if (getConn() != NULL && getConn()->clientConnection != NULL)
             clientdbUpdate(getConn()->clientConnection->remote, logType, AnyP::PROTO_HTTP, out.size);
     }
-
-    delete checklist;
 }
 
 void
@@ -859,6 +840,87 @@ void ConnStateData::connStateClosed(const CommCloseCbParams &io)
     deleteThis("ConnStateData::connStateClosed");
 }
 
+#if USE_AUTH
+void
+ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *by)
+{
+    if (auth_ == NULL) {
+        if (aur != NULL) {
+            debugs(33, 2, "Adding connection-auth to " << clientConnection << " from " << by);
+            auth_ = aur;
+        }
+        return;
+    }
+
+    // clobered with self-pointer
+    // NP: something nasty is going on in Squid, but harmless.
+    if (aur == auth_) {
+        debugs(33, 2, "WARNING: Ignoring duplicate connection-auth for " << clientConnection << " from " << by);
+        return;
+    }
+
+    /*
+     * Connection-auth relies on a single set of credentials being preserved
+     * for all requests on a connection once they have been setup.
+     * There are several things which need to happen to preserve security
+     * when connection-auth credentials change unexpectedly or are unset.
+     *
+     * 1) auth helper released from any active state
+     *
+     * They can only be reserved by a handshake process which this
+     * connection can now never complete.
+     * This prevents helpers hanging when their connections close.
+     *
+     * 2) pinning is expected to be removed and server conn closed
+     *
+     * The upstream link is authenticated with the same credentials.
+     * Expecting the same level of consistency we should have received.
+     * This prevents upstream being faced with multiple or missing
+     * credentials after authentication.
+     * NP: un-pin is left to the cleanup in ConnStateData::swanSong()
+     *     we just trigger that cleanup here via comm_reset_close() or
+     *     ConnStateData::stopReceiving()
+     *
+     * 3) the connection needs to close.
+     *
+     * This prevents attackers injecting requests into a connection,
+     * or gateways wrongly multiplexing users into a single connection.
+     *
+     * When credentials are missing closure needs to follow an auth
+     * challenge for best recovery by the client.
+     *
+     * When credentials change there is nothing we can do but abort as
+     * fast as possible. Sending TCP RST instead of an HTTP response
+     * is the best-case action.
+     */
+
+    // clobbered with nul-pointer
+    if (aur == NULL) {
+        debugs(33, 2, "WARNING: Graceful closure on " << clientConnection << " due to connection-auth erase from " << by);
+        auth_->releaseAuthServer();
+        auth_ = NULL;
+        // XXX: need to test whether the connection re-auth challenge is sent. If not, how to trigger it from here.
+        // NP: the current situation seems to fix challenge loops in Safari without visible issues in others.
+        // we stop receiving more traffic but can leave the Job running to terminate after the error or challenge is delivered.
+        stopReceiving("connection-auth removed");
+        return;
+    }
+
+    // clobbered with alternative credentials
+    if (aur != auth_) {
+        debugs(33, 2, "ERROR: Closing " << clientConnection << " due to change of connection-auth from " << by);
+        auth_->releaseAuthServer();
+        auth_ = NULL;
+        // this is a fatal type of problem.
+        // Close the connection immediately with TCP RST to abort all traffic flow
+        comm_reset_close(clientConnection);
+        return;
+    }
+
+    /* NOT REACHABLE */
+}
+#endif
+
 // cleans up before destructor is called
 void
 ConnStateData::swanSong()
@@ -868,20 +930,16 @@ ConnStateData::swanSong()
     clientdbEstablished(clientConnection->remote, -1); /* decrement */
     assert(areAllContextsForThisConnection());
     freeAllContexts();
-#if USE_AUTH
-    if (auth_user_request != NULL) {
-        debugs(33, 4, "ConnStateData::swanSong: freeing auth_user_request '" << auth_user_request << "' (this is '" << this << "')");
-        auth_user_request->onConnectionClose(this);
-    }
-#endif
 
-    if (Comm::IsConnOpen(pinning.serverConnection))
-        pinning.serverConnection->close();
-    pinning.serverConnection = NULL;
+    unpinConnection(true);
 
     if (Comm::IsConnOpen(clientConnection))
         clientConnection->close();
-    clientConnection = NULL;
+
+#if USE_AUTH
+    // NP: do this bit after closing the connections to avoid side effects from unwanted TCP RST
+    setAuth(NULL, "ConnStateData::SwanSong cleanup");
+#endif
 
     BodyProducer::swanSong();
     flags.swanSang = true;
@@ -913,7 +971,7 @@ ConnStateData::~ConnStateData()
     if (bodyPipe != NULL)
         stopProducingFor(bodyPipe, false);
 
-#if USE_SSL
+#if USE_OPENSSL
     delete sslServerBump;
 #endif
 }
@@ -1374,9 +1432,7 @@ ClientSocketContext::buildRangeHeader(HttpReply * rep)
          * offset data, but we won't be requesting it.
          * So, we can either re-request, or generate an error
          */
-        debugs(33, 3, "clientBuildRangeHeader: will not do ranges: " << range_err << ".");
-        delete http->request->range;
-        http->request->range = NULL;
+        http->request->ignoreRange(range_err);
     } else {
         /* XXX: TODO: Review, this unconditional set may be wrong. */
         rep->sline.set(rep->sline.version, Http::scPartialContent);
@@ -1386,7 +1442,7 @@ ClientSocketContext::buildRangeHeader(HttpReply * rep)
         bool replyMatchRequest = rep->content_range != NULL ?
                                  request->range->contains(rep->content_range->spec) :
                                  true;
-        const int spec_count = http->request->range->specs.count;
+        const int spec_count = http->request->range->specs.size();
         int64_t actual_clen = -1;
 
         debugs(33, 3, "clientBuildRangeHeader: range spec count: " <<
@@ -1602,12 +1658,14 @@ ConnStateData::readNextRequest()
 
     fd_note(clientConnection->fd, "Idle client: Waiting for next request");
     /**
-     * Set the timeout BEFORE calling clientReadRequest().
+     * Set the timeout BEFORE calling readSomeData().
      */
     typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
     AsyncCall::Pointer timeoutCall = JobCallback(33, 5,
                                      TimeoutDialer, this, ConnStateData::requestTimeout);
-    commSetConnTimeout(clientConnection, Config.Timeout.clientIdlePconn, timeoutCall);
+    const int timeout = isFtp ? Config.Timeout.ftpClientIdle :
+                        Config.Timeout.clientIdlePconn;
+    commSetConnTimeout(clientConnection, timeout, timeoutCall);
 
     readSomeData();
     /** Please don't do anything with the FD past here! */
@@ -1746,7 +1804,7 @@ ClientSocketContext::canPackMoreRanges() const
     if (!http->range_iter.debt()) {
         debugs(33, 5, HERE << "At end of current range spec for " << clientConnection);
 
-        if (http->range_iter.pos.incrementable())
+        if (http->range_iter.pos != http->range_iter.end)
             ++http->range_iter.pos;
 
         http->range_iter.updateSpec();
@@ -1763,9 +1821,16 @@ ClientSocketContext::canPackMoreRanges() const
 int64_t
 ClientSocketContext::getNextRangeOffset() const
 {
+    debugs (33, 5, "range: " << http->request->range <<
+            "; http offset " << http->out.offset <<
+            "; reply " << reply);
+
+    // XXX: This method is called from many places, including pullData() which
+    // may be called before prepareReply() [on some Squid-generated errors].
+    // Hence, we may not even know yet whether we should honor/do ranges.
+
     if (http->request->range) {
         /* offset in range specs does not count the prefix of an http msg */
-        debugs (33, 5, "ClientSocketContext::getNextRangeOffset: http offset " << http->out.offset);
         /* check: reply was parsed and range iterator was initialized */
         assert(http->range_iter.valid);
         /* filter out data according to range specs */
@@ -1802,7 +1867,7 @@ ClientSocketContext::getNextRangeOffset() const
 void
 ClientSocketContext::pullData()
 {
-    debugs(33, 5, HERE << clientConnection << " attempting to pull upstream data");
+    debugs(33, 5, reply << " written " << http->out.size << " into " << clientConnection);
 
     /* More data will be coming from the stream. */
     StoreIOBuffer readBuffer;
@@ -1815,6 +1880,9 @@ ClientSocketContext::pullData()
     clientStreamRead(getTail(), http, readBuffer);
 }
 
+/** Adapt stream status to account for Range cases
+ *
+ */
 clientStream_status_t
 ClientSocketContext::socketState()
 {
@@ -1831,11 +1899,8 @@ ClientSocketContext::socketState()
             if (!canPackMoreRanges()) {
                 debugs(33, 5, HERE << "Range request at end of returnable " <<
                        "range sequence on " << clientConnection);
-
-                if (http->request->flags.proxyKeepalive)
-                    return STREAM_COMPLETE;
-                else
-                    return STREAM_UNPLANNED_COMPLETE;
+                // we got everything we wanted from the store
+                return STREAM_COMPLETE;
             }
         } else if (reply && reply->content_range) {
             /* reply has content-range, but Squid is not managing ranges */
@@ -1848,24 +1913,11 @@ ClientSocketContext::socketState()
 
             // did we get at least what we expected, based on range specs?
 
-            if (bytesSent == bytesExpected) { // got everything
-                if (http->request->flags.proxyKeepalive)
-                    return STREAM_COMPLETE;
-                else
-                    return STREAM_UNPLANNED_COMPLETE;
-            }
-
-            // The logic below is not clear: If we got more than we
-            // expected why would persistency matter? Should not this
-            // always be an error?
-            if (bytesSent > bytesExpected) { // got extra
-                if (http->request->flags.proxyKeepalive)
-                    return STREAM_COMPLETE;
-                else
-                    return STREAM_UNPLANNED_COMPLETE;
-            }
+            if (bytesSent == bytesExpected) // got everything
+                return STREAM_COMPLETE;
 
-            // did not get enough yet, expecting more
+            if (bytesSent > bytesExpected) // Error: Sent more than expected
+                return STREAM_UNPLANNED_COMPLETE;
         }
 
         return STREAM_NONE;
@@ -1936,7 +1988,7 @@ ConnStateData::stopSending(const char *error)
     if (!stoppedReceiving()) {
         if (const int64_t expecting = mayNeedToReadMoreBody()) {
             debugs(33, 5, HERE << "must still read " << expecting <<
-                   " request body bytes with " << in.notYetUsed << " unused");
+                   " request body bytes with " << in.buf.length() << " unused");
             return; // wait for the request receiver to finish reading
         }
     }
@@ -1972,8 +2024,11 @@ ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *bu
         break;
 
     case STREAM_COMPLETE:
-        debugs(33, 5, HERE << conn << " Keeping Alive");
-        keepaliveNextRequest();
+        debugs(33, 5, conn << "Stream complete, keepalive is " << http->request->flags.proxyKeepalive);
+        if (http->request->flags.proxyKeepalive)
+            keepaliveNextRequest();
+        else
+            initiateClose("STREAM_COMPLETE NOKEEPALIVE");
         return;
 
     case STREAM_UNPLANNED_COMPLETE:
@@ -2000,10 +2055,10 @@ parseHttpRequestAbort(ConnStateData * csd, const char *uri)
     ClientSocketContext *context;
     StoreIOBuffer tempBuffer;
     http = new ClientHttpRequest(csd);
-    http->req_sz = csd->in.notYetUsed;
+    http->req_sz = csd->in.buf.length();
     http->uri = xstrdup(uri);
     setLogUri (http, uri);
-    context = ClientSocketContextNew(csd->clientConnection, http);
+    context = new ClientSocketContext(csd->clientConnection, http);
     tempBuffer.data = context->reqbuf;
     tempBuffer.length = HTTP_REQBUF_SZ;
     clientStreamInit(&http->client_stream, clientGetMoreData, clientReplyDetach,
@@ -2140,7 +2195,7 @@ prepareAcceleratedURL(ConnStateData * conn, ClientHttpRequest *http, char *url,
     }
 
     if (vport < 0)
-        vport = http->getConn()->clientConnection->local.GetPort();
+        vport = http->getConn()->clientConnection->local.port();
 
     const bool switchedToHttps = conn->switchedToHttps();
     const bool tryHostHeader = vhost || switchedToHttps;
@@ -2163,7 +2218,7 @@ prepareAcceleratedURL(ConnStateData * conn, ClientHttpRequest *http, char *url,
                      strlen(host);
         http->uri = (char *)xcalloc(url_sz, 1);
         const char *protocol = switchedToHttps ?
-                               "https" : conn->port->protocol;
+                               "https" : AnyP::UriScheme(conn->port->transport.protocol).c_str();
         snprintf(http->uri, url_sz, "%s://%s%s", protocol, host, url);
         debugs(33, 5, "ACCEL VHOST REWRITE: '" << http->uri << "'");
     } else if (conn->port->defaultsite /* && !vhost */) {
@@ -2177,16 +2232,16 @@ prepareAcceleratedURL(ConnStateData * conn, ClientHttpRequest *http, char *url,
             snprintf(vportStr, sizeof(vportStr),":%d",vport);
         }
         snprintf(http->uri, url_sz, "%s://%s%s%s",
-                 conn->port->protocol, conn->port->defaultsite, vportStr, url);
+                 AnyP::UriScheme(conn->port->transport.protocol).c_str(), conn->port->defaultsite, vportStr, url);
         debugs(33, 5, "ACCEL DEFAULTSITE REWRITE: '" << http->uri <<"'");
     } else if (vport > 0 /* && (!vhost || no Host:) */) {
         debugs(33, 5, "ACCEL VPORT REWRITE: http_port IP + vport=" << vport);
         /* Put the local socket IP address as the hostname, with whatever vport we found  */
         int url_sz = strlen(url) + 32 + Config.appendDomainLen;
         http->uri = (char *)xcalloc(url_sz, 1);
-        http->getConn()->clientConnection->local.ToHostname(ipbuf,MAX_IPSTRLEN);
+        http->getConn()->clientConnection->local.toHostStr(ipbuf,MAX_IPSTRLEN);
         snprintf(http->uri, url_sz, "%s://%s:%d%s",
-                 http->getConn()->port->protocol,
+                 AnyP::UriScheme(conn->port->transport.protocol).c_str(),
                  ipbuf, vport, url);
         debugs(33, 5, "ACCEL VPORT REWRITE: '" << http->uri << "'");
     }
@@ -2207,16 +2262,16 @@ prepareTransparentURL(ConnStateData * conn, ClientHttpRequest *http, char *url,
         int url_sz = strlen(url) + 32 + Config.appendDomainLen +
                      strlen(host);
         http->uri = (char *)xcalloc(url_sz, 1);
-        snprintf(http->uri, url_sz, "%s://%s%s", conn->port->protocol, host, url);
+        snprintf(http->uri, url_sz, "%s://%s%s", AnyP::UriScheme(conn->port->transport.protocol).c_str(), host, url);
         debugs(33, 5, "TRANSPARENT HOST REWRITE: '" << http->uri <<"'");
     } else {
         /* Put the local socket IP address as the hostname.  */
         int url_sz = strlen(url) + 32 + Config.appendDomainLen;
         http->uri = (char *)xcalloc(url_sz, 1);
-        http->getConn()->clientConnection->local.ToHostname(ipbuf,MAX_IPSTRLEN);
+        http->getConn()->clientConnection->local.toHostStr(ipbuf,MAX_IPSTRLEN);
         snprintf(http->uri, url_sz, "%s://%s:%d%s",
-                 http->getConn()->port->protocol,
-                 ipbuf, http->getConn()->clientConnection->local.GetPort(), url);
+                 AnyP::UriScheme(http->getConn()->port->transport.protocol).c_str(),
+                 ipbuf, http->getConn()->clientConnection->local.port(), url);
         debugs(33, 5, "TRANSPARENT REWRITE: '" << http->uri << "'");
     }
 }
@@ -2309,7 +2364,7 @@ parseHttpRequest(ConnStateData *csd, HttpParser *hp, HttpRequestMethod * method_
 
     /* deny CONNECT via accelerated ports */
     if (*method_p == Http::METHOD_CONNECT && csd->port && csd->port->flags.accelSurrogate) {
-        debugs(33, DBG_IMPORTANT, "WARNING: CONNECT method received on " << csd->port->protocol << " Accelerator port " << csd->port->s.GetPort() );
+        debugs(33, DBG_IMPORTANT, "WARNING: CONNECT method received on " << csd->port->transport.protocol << " Accelerator port " << csd->port->s.port());
         /* XXX need a way to say "this many character length string" */
         debugs(33, DBG_IMPORTANT, "WARNING: for request: " << hp->buf);
         hp->request_parse_status = Http::scMethodNotAllowed;
@@ -2344,7 +2399,7 @@ parseHttpRequest(ConnStateData *csd, HttpParser *hp, HttpRequestMethod * method_
     http = new ClientHttpRequest(csd);
 
     http->req_sz = HttpParserRequestLen(hp);
-    result = ClientSocketContextNew(csd->clientConnection, http);
+    result = new ClientSocketContext(csd->clientConnection, http);
     tempBuffer.data = result->reqbuf;
     tempBuffer.length = HTTP_REQBUF_SZ;
 
@@ -2423,32 +2478,20 @@ parseHttpRequest(ConnStateData *csd, HttpParser *hp, HttpRequestMethod * method_
     return result;
 }
 
-int
-ConnStateData::getAvailableBufferLength() const
-{
-    assert (in.allocatedSize > in.notYetUsed); // allocated more than used
-    const size_t result = in.allocatedSize - in.notYetUsed - 1;
-    // huge request_header_max_size may lead to more than INT_MAX unused space
-    assert (static_cast<ssize_t>(result) <= INT_MAX);
-    return result;
-}
-
 bool
-ConnStateData::maybeMakeSpaceAvailable()
+ConnStateData::In::maybeMakeSpaceAvailable()
 {
-    if (getAvailableBufferLength() < 2) {
-        size_t newSize;
-        if (in.allocatedSize >= Config.maxRequestBufferSize) {
+    if (buf.spaceSize() < 2) {
+        const SBuf::size_type haveCapacity = buf.length() + buf.spaceSize();
+        if (haveCapacity >= Config.maxRequestBufferSize) {
             debugs(33, 4, "request buffer full: client_request_buffer_max_size=" << Config.maxRequestBufferSize);
             return false;
         }
-        if ((newSize=in.allocatedSize * 2) > Config.maxRequestBufferSize) {
-            newSize=Config.maxRequestBufferSize;
-        }
-        in.buf = (char *)memReallocBuf(in.buf, newSize, &in.allocatedSize);
-        debugs(33, 2, "growing request buffer: notYetUsed=" << in.notYetUsed << " size=" << in.allocatedSize);
+        const SBuf::size_type wantCapacity = min(static_cast<SBuf::size_type>(Config.maxRequestBufferSize), haveCapacity*2);
+        buf.reserveCapacity(wantCapacity);
+        debugs(33, 2, "growing request buffer: available=" << buf.spaceSize() << " used=" << buf.length());
     }
-    return true;
+    return (buf.spaceSize() >= 2);
 }
 
 void
@@ -2486,7 +2529,7 @@ ConnStateData::connReadWasError(comm_err_t flag, int size, int xerrno)
         if (!ignoreErrno(xerrno)) {
             debugs(33, 2, "connReadWasError: FD " << clientConnection << ": " << xstrerr(xerrno));
             return 1;
-        } else if (in.notYetUsed == 0) {
+        } else if (in.buf.isEmpty()) {
             debugs(33, 2, "connReadWasError: FD " << clientConnection << ": no data to process (" << xstrerr(xerrno) << ")");
         }
     }
@@ -2498,7 +2541,7 @@ int
 ConnStateData::connFinishedWithConn(int size)
 {
     if (size == 0) {
-        if (getConcurrentRequestCount() == 0 && in.notYetUsed == 0) {
+        if (getConcurrentRequestCount() == 0 && in.buf.isEmpty()) {
             /* no current or pending requests */
             debugs(33, 4, HERE << clientConnection << " closed");
             return 1;
@@ -2516,26 +2559,19 @@ ConnStateData::connFinishedWithConn(int size)
 void
 connNoteUseOfBuffer(ConnStateData* conn, size_t byteCount)
 {
-    assert(byteCount > 0 && byteCount <= conn->in.notYetUsed);
-    conn->in.notYetUsed -= byteCount;
-    debugs(33, 5, HERE << "conn->in.notYetUsed = " << conn->in.notYetUsed);
-    /*
-     * If there is still data that will be used,
-     * move it to the beginning.
-     */
-
-    if (conn->in.notYetUsed > 0)
-        memmove(conn->in.buf, conn->in.buf + byteCount, conn->in.notYetUsed);
+    assert(byteCount > 0 && byteCount <= conn->in.buf.length());
+    conn->in.buf.consume(byteCount);
+    debugs(33, 5, "conn->in.buf has " << conn->in.buf.length() << " bytes unused.");
 }
 
 /// respond with ERR_TOO_BIG if request header exceeds request_header_max_size
 void
 ConnStateData::checkHeaderLimits()
 {
-    if (in.notYetUsed < Config.maxRequestHeaderSize)
+    if (in.buf.length() < Config.maxRequestHeaderSize)
         return; // can accumulte more header data
 
-    debugs(33, 3, "Request header is too large (" << in.notYetUsed << " > " <<
+    debugs(33, 3, "Request header is too large (" << in.buf.length() << " > " <<
            Config.maxRequestHeaderSize << " bytes)");
 
     ClientSocketContext *context = parseHttpRequestAbort(this, "error:request-too-large");
@@ -2575,7 +2611,7 @@ ConnStateData::quitAfterError(HttpRequest *request)
     debugs(33,4, HERE << "Will close after error: " << clientConnection);
 }
 
-#if USE_SSL
+#if USE_OPENSSL
 bool ConnStateData::serveDelayedError(ClientSocketContext *context)
 {
     ClientHttpRequest *http = context->http;
@@ -2594,7 +2630,7 @@ bool ConnStateData::serveDelayedError(ClientSocketContext *context)
         clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
         assert(repContext);
         debugs(33, 5, "Responding with delated error for " << http->uri);
-        repContext->setReplyToStoreEntry(sslServerBump->entry);
+        repContext->setReplyToStoreEntry(sslServerBump->entry, "delayed SslBump error");
 
         // save the original request for logging purposes
         if (!context->http->al->request) {
@@ -2611,18 +2647,20 @@ bool ConnStateData::serveDelayedError(ClientSocketContext *context)
     // In bump-server-first mode, we have not necessarily seen the intended
     // server name at certificate-peeking time. Check for domain mismatch now,
     // when we can extract the intended name from the bumped HTTP request.
-    if (sslServerBump->serverCert.get()) {
+    if (X509 *srvCert = sslServerBump->serverCert.get()) {
         HttpRequest *request = http->request;
-        if (!Ssl::checkX509ServerValidity(sslServerBump->serverCert.get(), request->GetHost())) {
+        if (!Ssl::checkX509ServerValidity(srvCert, request->GetHost())) {
             debugs(33, 2, "SQUID_X509_V_ERR_DOMAIN_MISMATCH: Certificate " <<
                    "does not match domainname " << request->GetHost());
 
-            ACLFilledChecklist check(Config.ssl_client.cert_error, request, dash_str);
-            check.sslErrors = new Ssl::Errors(SQUID_X509_V_ERR_DOMAIN_MISMATCH);
-            const bool allowDomainMismatch =
-                check.fastCheck() == ACCESS_ALLOWED;
-            delete check.sslErrors;
-            check.sslErrors = NULL;
+            bool allowDomainMismatch = false;
+            if (Config.ssl_client.cert_error) {
+                ACLFilledChecklist check(Config.ssl_client.cert_error, request, dash_str);
+                check.sslErrors = new Ssl::CertErrors(Ssl::CertError(SQUID_X509_V_ERR_DOMAIN_MISMATCH, srvCert));
+                allowDomainMismatch = (check.fastCheck() == ACCESS_ALLOWED);
+                delete check.sslErrors;
+                check.sslErrors = NULL;
+            }
 
             if (!allowDomainMismatch) {
                 quitAfterError(request);
@@ -2640,7 +2678,7 @@ bool ConnStateData::serveDelayedError(ClientSocketContext *context)
                 err->src_addr = clientConnection->remote;
                 Ssl::ErrorDetail *errDetail = new Ssl::ErrorDetail(
                     SQUID_X509_V_ERR_DOMAIN_MISMATCH,
-                    sslServerBump->serverCert.get(), NULL);
+                    srvCert, NULL);
                 err->detail = errDetail;
                 // Save the original request for logging purposes.
                 if (!context->http->al->request) {
@@ -2657,7 +2695,7 @@ bool ConnStateData::serveDelayedError(ClientSocketContext *context)
 
     return false;
 }
-#endif // USE_SSL
+#endif // USE_OPENSSL
 
 static void
 clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *context, const HttpRequestMethod& method, Http::ProtocolVersion http_ver)
@@ -2685,15 +2723,15 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
         assert (repContext);
         switch (hp->request_parse_status) {
         case Http::scHeaderTooLarge:
-            repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, method, http->uri, conn->clientConnection->remote, NULL, conn->in.buf, NULL);
+            repContext->setReplyToError(ERR_TOO_BIG, Http::scBadRequest, method, http->uri, conn->clientConnection->remote, NULL, conn->in.buf.c_str(), NULL);
             break;
         case Http::scMethodNotAllowed:
             repContext->setReplyToError(ERR_UNSUP_REQ, Http::scMethodNotAllowed, method, http->uri,
-                                        conn->clientConnection->remote, NULL, conn->in.buf, NULL);
+                                        conn->clientConnection->remote, NULL, conn->in.buf.c_str(), NULL);
             break;
         default:
             repContext->setReplyToError(ERR_INVALID_REQ, hp->request_parse_status, method, http->uri,
-                                        conn->clientConnection->remote, NULL, conn->in.buf, NULL);
+                                        conn->clientConnection->remote, NULL, conn->in.buf.c_str(), NULL);
         }
         assert(context->http->out.offset == 0);
         context->pullData();
@@ -2719,10 +2757,9 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
         goto finish;
     }
 
-    /* RFC 2616 section 10.5.6 : handle unsupported HTTP versions cleanly. */
-    /* We currently only accept 0.9, 1.0, 1.1 */
+    /* RFC 2616 section 10.5.6 : handle unsupported HTTP major versions cleanly. */
+    /* We currently only support 0.9, 1.0, 1.1 properly */
     if ( (http_ver.major == 0 && http_ver.minor != 9) ||
-            (http_ver.major == 1 && http_ver.minor > 1 ) ||
             (http_ver.major > 1) ) {
 
         clientStreamNode *node = context->getClientReplyContext();
@@ -2767,8 +2804,8 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
                               !conn->port->allow_direct : 0;
 #if USE_AUTH
     if (request->flags.sslBumped) {
-        if (conn->auth_user_request != NULL)
-            request->auth_user_request = conn->auth_user_request;
+        if (conn->getAuth() != NULL)
+            request->auth_user_request = conn->getAuth();
     }
 #endif
 
@@ -2778,7 +2815,16 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
      */
     if (http->clientConnection != NULL) {
         request->flags.intercepted = ((http->clientConnection->flags & COMM_INTERCEPTION) != 0);
-        request->flags.spoofClientIp = ((http->clientConnection->flags & COMM_TRANSPARENT) != 0 ) ;
+        request->flags.interceptTproxy = ((http->clientConnection->flags & COMM_TRANSPARENT) != 0 ) ;
+        if (request->flags.interceptTproxy) {
+            if (Config.accessList.spoof_client_ip) {
+                ACLFilledChecklist *checklist = clientAclChecklistCreate(Config.accessList.spoof_client_ip, http);
+                request->flags.spoofClientIp = (checklist->fastCheck() == ACCESS_ALLOWED);
+                delete checklist;
+            } else
+                request->flags.spoofClientIp = true;
+        } else
+            request->flags.spoofClientIp = false;
     }
 
     if (internalCheck(request->urlpath.termedBuf())) {
@@ -2876,7 +2922,7 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
         conn->flags.readMore = false;
     }
 
-#if USE_SSL
+#if USE_OPENSSL
     if (conn->switchedToHttps() && conn->serveDelayedError(context))
         goto finish;
 #endif
@@ -2901,7 +2947,7 @@ clientProcessRequest(ConnStateData *conn, HttpParser *hp, ClientSocketContext *c
             assert (repContext);
             conn->quitAfterError(request.getRaw());
             repContext->setReplyToError(ERR_TOO_BIG,
-                                        Http::scRequestEntityTooLarge, Http::METHOD_NONE, NULL,
+                                        Http::scPayloadTooLarge, Http::METHOD_NONE, NULL,
                                         conn->clientConnection->remote, http->request, NULL, NULL);
             assert(context->http->out.offset == 0);
             context->pullData();
@@ -2951,45 +2997,70 @@ ConnStateData::processFtpRequest(ClientSocketContext *const context)
     assert(http != NULL);
     HttpRequest *const request = http->request;
     assert(request != NULL);
+    debugs(33, 9, request);
+
     HttpHeader &header = request->header;
     assert(header.has(HDR_FTP_COMMAND));
     String &cmd = header.findEntry(HDR_FTP_COMMAND)->value;
     assert(header.has(HDR_FTP_ARGUMENTS));
     String &params = header.findEntry(HDR_FTP_ARGUMENTS)->value;
 
-    if (!FtpHandleRequest(context, cmd, params)) {
+    const bool fwd = !http->storeEntry() &&
+                     FtpHandleRequest(context, cmd, params);
+
+    if (http->storeEntry() != NULL) {
+        debugs(33, 4, "got an immediate response");
         assert(http->storeEntry() != NULL);
         clientSetKeepaliveFlag(http);
         context->pullData();
-        return;
+    } else if (fwd) {
+        debugs(33, 4, "forwarding request to server side");
+        assert(http->storeEntry() == NULL);
+        clientProcessRequest(this, &parser_, context, request->method,
+                             request->http_ver);
+    } else {
+        debugs(33, 4, "will resume processing later");
     }
+}
 
-    assert(http->storeEntry() == NULL);
-    clientProcessRequest(this, &parser_, context, request->method,
-                         request->http_ver);
+void
+ConnStateData::resumeFtpRequest(ClientSocketContext *const context)
+{
+    debugs(33, 4, "resuming");
+    processFtpRequest(context);
 }
 
 static void
 connStripBufferWhitespace (ConnStateData * conn)
 {
-    while (conn->in.notYetUsed > 0 && xisspace(conn->in.buf[0])) {
-        memmove(conn->in.buf, conn->in.buf + 1, conn->in.notYetUsed - 1);
-        -- conn->in.notYetUsed;
+    // XXX: kill this whole function.
+    while (!conn->in.buf.isEmpty() && xisspace(conn->in.buf.at(0))) {
+        conn->in.buf.consume(1);
     }
 }
 
-static int
-connOkToAddRequest(ConnStateData * conn)
+/**
+ * Limit the number of concurrent requests.
+ * \return true  when there are available position(s) in the pipeline queue for another request.
+ * \return false when the pipeline queue is full or disabled.
+ */
+bool
+ConnStateData::concurrentRequestQueueFilled() const
 {
-    const int limit = !conn->isFtp && Config.onoff.pipeline_prefetch ? 2 : 1;
-    const int result = conn->getConcurrentRequestCount() < limit;
+    const int existingRequestCount = getConcurrentRequestCount();
+
+    // default to the configured pipeline size.
+    // add 1 because the head of pipeline is counted in concurrent requests and not prefetch queue
+    const int concurrentRequestLimit = (isFtp ? 0 : Config.pipeline_max_prefetch) + 1;
 
-    if (!result) {
-        debugs(33, 3, HERE << conn->clientConnection << " max concurrent requests reached");
-        debugs(33, 5, HERE << conn->clientConnection << " defering new request until one is done");
+    // when queue filled already we cant add more.
+    if (existingRequestCount >= concurrentRequestLimit) {
+        debugs(33, 3, clientConnection << " max concurrent requests reached (" << concurrentRequestLimit << ")");
+        debugs(33, 5, clientConnection << " deferring new request until one is done");
+        return true;
     }
 
-    return result;
+    return false;
 }
 
 /**
@@ -3007,28 +3078,23 @@ ConnStateData::clientParseRequests()
 
     // Loop while we have read bytes that are not needed for producing the body
     // On errors, bodyPipe may become nil, but readMore will be cleared
-    while (in.notYetUsed > 0 && !bodyPipe && flags.readMore) {
+    while (!in.buf.isEmpty() && !bodyPipe && flags.readMore) {
         connStripBufferWhitespace(this);
 
         /* Don't try to parse if the buffer is empty */
-        if (in.notYetUsed == 0)
+        if (in.buf.isEmpty())
             break;
 
-        /* Limit the number of concurrent requests to 2 */
-        if (!connOkToAddRequest(this)) {
+        /* Limit the number of concurrent requests */
+        if (concurrentRequestQueueFilled())
             break;
-        }
 
-        /* Should not be needed anymore */
-        /* Terminate the string */
-        in.buf[in.notYetUsed] = '\0';
-
-        Http::ProtocolVersion http_ver;
         ClientSocketContext *context = NULL;
+        Http::ProtocolVersion http_ver;
         if (!isFtp) {
             /* Begin the parsing */
             PROF_start(parseHttpRequest);
-            HttpParserInit(&parser_, in.buf, in.notYetUsed);
+            HttpParserInit(&parser_, in.buf.c_str(), in.buf.length());
 
             /* Process request */
             context = parseHttpRequest(this, &parser_, &method, &http_ver);
@@ -3108,7 +3174,7 @@ ConnStateData::clientReadRequest(const CommIoCbParams &io)
             kb_incr(&(statCounter.client_http.kbytes_in), io.size);
 
             // may comm_close or setReplyToError
-            if (!handleReadData(io.buf, io.size))
+            if (!handleReadData(io.buf2))
                 return;
 
         } else if (io.size == 0) {
@@ -3222,16 +3288,9 @@ ConnStateData::handleFtpRequestData()
  * \retval true  we did not call comm_close or setReplyToError
  */
 bool
-ConnStateData::handleReadData(char *buf, size_t size)
+ConnStateData::handleReadData(SBuf *buf)
 {
-    char *current_buf = in.addressToReadInto();
-
-    if (buf != current_buf)
-        memmove(current_buf, buf, size);
-
-    in.notYetUsed += size;
-
-    in.buf[in.notYetUsed] = '\0'; /* Terminate the string */
+    assert(buf == &in.buf); // XXX: make this abort the transaction if this fails
 
     // if we are reading a body, stuff data into the body pipe
     if (bodyPipe != NULL)
@@ -3242,7 +3301,7 @@ ConnStateData::handleReadData(char *buf, size_t size)
 /**
  * called when new request body data has been buffered in in.buf
  * may close the connection if we were closing and piped everything out
-e *
+ *
  * \retval false called comm_close or setReplyToError (the caller should bail)
  * \retval true  we did not call comm_close or setReplyToError
  */
@@ -3260,7 +3319,7 @@ ConnStateData::handleRequestBodyData()
         }
     } else { // identity encoding
         debugs(33,5, HERE << "handling plain request body for " << clientConnection);
-        putSize = bodyPipe->putMoreData(in.buf, in.notYetUsed);
+        putSize = bodyPipe->putMoreData(in.buf.c_str(), in.buf.length());
         if (!bodyPipe->mayNeedMoreData()) {
             // BodyPipe will clear us automagically when we produced everything
             bodyPipe = NULL;
@@ -3290,17 +3349,17 @@ ConnStateData::handleRequestBodyData()
 err_type
 ConnStateData::handleChunkedRequestBody(size_t &putSize)
 {
-    debugs(33,7, HERE << "chunked from " << clientConnection << ": " << in.notYetUsed);
+    debugs(33, 7, "chunked from " << clientConnection << ": " << in.buf.length());
 
     try { // the parser will throw on errors
 
-        if (!in.notYetUsed) // nothing to do (MemBuf::init requires this check)
+        if (in.buf.isEmpty()) // nothing to do
             return ERR_NONE;
 
         MemBuf raw; // ChunkedCodingParser only works with MemBufs
         // add one because MemBuf will assert if it cannot 0-terminate
-        raw.init(in.notYetUsed, in.notYetUsed+1);
-        raw.append(in.buf, in.notYetUsed);
+        raw.init(in.buf.length(), in.buf.length()+1);
+        raw.append(in.buf.c_str(), in.buf.length());
 
         const mb_size_t wasContentSize = raw.contentSize();
         BodyPipeCheckout bpc(*bodyPipe);
@@ -3348,7 +3407,7 @@ ConnStateData::abortChunkedRequestBody(const err_type error)
         clientReplyContext *repContext = dynamic_cast<clientReplyContext*>(node->data.getRaw());
         assert(repContext);
         const Http::StatusCode scode = (error == ERR_TOO_BIG) ?
-                                       Http::scRequestEntityTooLarge : HTTP_BAD_REQUEST;
+                                       Http::scPayloadTooLarge : HTTP_BAD_REQUEST;
         repContext->setReplyToError(error, scode,
                                     repContext->http->request->method,
                                     repContext->http->uri,
@@ -3404,71 +3463,16 @@ ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer )
 void
 ConnStateData::requestTimeout(const CommTimeoutCbParams &io)
 {
-#if THIS_CONFUSES_PERSISTENT_CONNECTION_AWARE_BROWSERS_AND_USERS
-    debugs(33, 3, "requestTimeout: FD " << io.fd << ": lifetime is expired.");
-
-    if (COMMIO_FD_WRITECB(io.fd)->active) {
-        /* FIXME: If this code is reinstated, check the conn counters,
-         * not the fd table state
-         */
-        /*
-         * Some data has been sent to the client, just close the FD
-         */
-        clientConnection->close();
-    } else if (nrequests) {
-        /*
-         * assume its a persistent connection; just close it
-         */
-        clientConnection->close();
-    } else {
-        /*
-         * Generate an error
-         */
-        ClientHttpRequest **H;
-        clientStreamNode *node;
-        ClientHttpRequest *http = parseHttpRequestAbort(this, "error:Connection%20lifetime%20expired");
-        node = http->client_stream.tail->prev->data;
-        clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
-        assert (repContext);
-        repContext->setReplyToError(ERR_LIFETIME_EXP,
-                                    Http::scRequestTimeout, Http::METHOD_NONE, "N/A", &CachePeer.sin_addr,
-                                    NULL, NULL, NULL);
-        /* No requests can be outstanded */
-        assert(chr == NULL);
-        /* add to the client request queue */
-
-        for (H = &chr; *H; H = &(*H)->next);
-        *H = http;
-
-        clientStreamRead(http->client_stream.tail->data, http, 0,
-                         HTTP_REQBUF_SZ, context->reqbuf);
-
-        /*
-         * if we don't close() here, we still need a timeout handler!
-         */
-        typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
-        AsyncCall::Pointer timeoutCall =  JobCallback(33, 5,
-                                          TimeoutDialer, this, ConnStateData::requestTimeout);
-        commSetConnTimeout(io.conn, 30, timeoutCall);
-
-        /*
-         * Aha, but we don't want a read handler!
-         */
-        Comm::SetSelect(io.fd, COMM_SELECT_READ, NULL, NULL, 0);
-    }
-
-#else
     /*
     * Just close the connection to not confuse browsers
-    * using persistent connections. Some browsers opens
-    * an connection and then does not use it until much
+    * using persistent connections. Some browsers open
+    * a connection and then do not use it until much
     * later (presumeably because the request triggering
     * the open has already been completed on another
     * connection)
     */
     debugs(33, 3, "requestTimeout: FD " << io.fd << ": lifetime is expired.");
     io.conn->close();
-#endif
 }
 
 static void
@@ -3482,23 +3486,38 @@ clientLifetimeTimeout(const CommTimeoutCbParams &io)
         io.conn->close();
 }
 
-ConnStateData *
-connStateCreate(const Comm::ConnectionPointer &client, AnyP::PortCfg *port)
+ConnStateData::ConnStateData(const MasterXaction::Pointer &xact):
+        AsyncJob("ConnStateData"),
+        isFtp(xact->squidPort->transport.protocol == AnyP::PROTO_FTP), // TODO: convert into a method?
+#if USE_OPENSSL
+        sslBumpMode(Ssl::bumpEnd),
+        switchedToHttps_(false),
+        sslServerBump(NULL),
+#endif
+        stoppedSending_(NULL),
+        stoppedReceiving_(NULL)
 {
-    ConnStateData *result = new ConnStateData(port->protocol);
+    pinning.host = NULL;
+    pinning.port = -1;
+    pinning.pinned = false;
+    pinning.auth = false;
+    pinning.zeroReply = false;
+    pinning.peer = NULL;
+
+    // store the details required for creating more MasterXaction objects as new requests come in
+    clientConnection = xact->tcpClient;
+    port = cbdataReference(xact->squidPort.get());
+    log_addr = xact->tcpClient->remote;
+    log_addr.applyMask(Config.Addrs.client_netmask);
 
-    result->clientConnection = client;
-    result->log_addr = client->remote;
-    result->log_addr.ApplyMask(Config.Addrs.client_netmask);
-    result->in.buf = (char *)memAllocBuf(CLIENT_REQ_BUF_SZ, &result->in.allocatedSize);
-    result->port = cbdataReference(port);
+    in.buf.reserveCapacity(CLIENT_REQ_BUF_SZ);
 
     if (port->disable_pmtu_discovery != DISABLE_PMTU_OFF &&
-            (result->transparent() || port->disable_pmtu_discovery == DISABLE_PMTU_ALWAYS)) {
+            (transparent() || port->disable_pmtu_discovery == DISABLE_PMTU_ALWAYS)) {
 #if defined(IP_MTU_DISCOVER) && defined(IP_PMTUDISC_DONT)
         int i = IP_PMTUDISC_DONT;
-        if (setsockopt(client->fd, SOL_IP, IP_MTU_DISCOVER, &i, sizeof(i)) < 0)
-            debugs(33, 2, "WARNING: Path MTU discovery disabling failed on " << client << " : " << xstrerror());
+        if (setsockopt(clientConnection->fd, SOL_IP, IP_MTU_DISCOVER, &i, sizeof(i)) < 0)
+            debugs(33, 2, "WARNING: Path MTU discovery disabling failed on " << clientConnection << " : " << xstrerror());
 #else
         static bool reported = false;
 
@@ -3510,45 +3529,46 @@ connStateCreate(const Comm::ConnectionPointer &client, AnyP::PortCfg *port)
     }
 
     typedef CommCbMemFunT<ConnStateData, CommCloseCbParams> Dialer;
-    AsyncCall::Pointer call = JobCallback(33, 5, Dialer, result, ConnStateData::connStateClosed);
-    comm_add_close_handler(client->fd, call);
+    AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, ConnStateData::connStateClosed);
+    comm_add_close_handler(clientConnection->fd, call);
 
     if (Config.onoff.log_fqdn)
-        fqdncache_gethostbyaddr(client->remote, FQDN_LOOKUP_IF_MISS);
+        fqdncache_gethostbyaddr(clientConnection->remote, FQDN_LOOKUP_IF_MISS);
 
 #if USE_IDENT
     if (Ident::TheConfig.identLookup) {
         ACLFilledChecklist identChecklist(Ident::TheConfig.identLookup, NULL, NULL);
-        identChecklist.src_addr = client->remote;
-        identChecklist.my_addr = client->local;
+        identChecklist.src_addr = xact->tcpClient->remote;
+        identChecklist.my_addr = xact->tcpClient->local;
         if (identChecklist.fastCheck() == ACCESS_ALLOWED)
-            Ident::Start(client, clientIdentDone, result);
-    }
-#endif
-
-#if USE_SQUID_EUI
-    if (Eui::TheConfig.euiLookup) {
-        if (client->remote.IsIPv4()) {
-            result->clientConnection->remoteEui48.lookup(client->remote);
-        } else if (client->remote.IsIPv6()) {
-            result->clientConnection->remoteEui64.lookup(client->remote);
-        }
+            Ident::Start(xact->tcpClient, clientIdentDone, this);
     }
 #endif
 
-    clientdbEstablished(client->remote, 1);
+    clientdbEstablished(clientConnection->remote, 1);
 
-    if (!result->isFtp)
-        result->flags.readMore = true;
+    flags.readMore = !isFtp;
 
-    return result;
+    if (isFtp) {
+        ftp.gotEpsvAll = false;
+        ftp.readGreeting = false;
+        ftp.state = FTP_BEGIN;
+        ftp.uploadAvailSize = 0;
+    }
 }
 
 /** Handle a new connection on HTTP socket. */
 void
 httpAccept(const CommAcceptCbParams &params)
 {
-    AnyP::PortCfg *s = static_cast<AnyP::PortCfg *>(params.data);
+    MasterXaction::Pointer xact = params.xaction;
+    AnyP::PortCfgPointer s = xact->squidPort;
+
+    if (!s.valid()) {
+        // it is possible the call or accept() was still queued when the port was reconfigured
+        debugs(33, 2, "HTTP accept failure: port reconfigured.");
+        return;
+    }
 
     if (params.flag != COMM_OK) {
         // Its possible the call was still queued when the client disconnected
@@ -3566,7 +3586,7 @@ httpAccept(const CommAcceptCbParams &params)
     ++ incoming_sockets_accepted;
 
     // Socket is ready, setup the connection manager to start using it
-    ConnStateData *connState = connStateCreate(params.conn, s);
+    ConnStateData *connState = new ConnStateData(xact);
 
     typedef CommCbMemFunT<ConnStateData, CommTimeoutCbParams> TimeoutDialer;
     AsyncCall::Pointer timeoutCall =  JobCallback(33, 5,
@@ -3620,7 +3640,7 @@ httpAccept(const CommAcceptCbParams &params)
 #endif
 }
 
-#if USE_SSL
+#if USE_OPENSSL
 
 /** Create SSL connection structure and update fd_table */
 static SSL *
@@ -3788,25 +3808,34 @@ httpsEstablish(ConnStateData *connState,  SSL_CTX *sslContext, Ssl::BumpMode bum
     else {
         char buf[MAX_IPSTRLEN];
         assert(bumpMode != Ssl::bumpNone && bumpMode != Ssl::bumpEnd);
-        HttpRequest *fakeRequest = new HttpRequest;
-        fakeRequest->SetHost(details->local.NtoA(buf, sizeof(buf)));
-        fakeRequest->port = details->local.GetPort();
+        HttpRequest::Pointer fakeRequest(new HttpRequest);
+        fakeRequest->SetHost(details->local.toStr(buf, sizeof(buf)));
+        fakeRequest->port = details->local.port();
         fakeRequest->clientConnectionManager = connState;
         fakeRequest->client_addr = connState->clientConnection->remote;
 #if FOLLOW_X_FORWARDED_FOR
         fakeRequest->indirect_client_addr = connState->clientConnection->remote;
 #endif
         fakeRequest->my_addr = connState->clientConnection->local;
-        fakeRequest->flags.spoofClientIp = ((connState->clientConnection->flags & COMM_TRANSPARENT) != 0 ) ;
+        fakeRequest->flags.interceptTproxy = ((connState->clientConnection->flags & COMM_TRANSPARENT) != 0 ) ;
         fakeRequest->flags.intercepted = ((connState->clientConnection->flags & COMM_INTERCEPTION) != 0);
+        fakeRequest->myportname = connState->port->name;
+        if (fakeRequest->flags.interceptTproxy) {
+            if (Config.accessList.spoof_client_ip) {
+                ACLFilledChecklist checklist(Config.accessList.spoof_client_ip, fakeRequest.getRaw(), NULL);
+                fakeRequest->flags.spoofClientIp = (checklist.fastCheck() == ACCESS_ALLOWED);
+            } else
+                fakeRequest->flags.spoofClientIp = true;
+        } else
+            fakeRequest->flags.spoofClientIp = false;
         debugs(33, 4, HERE << details << " try to generate a Dynamic SSL CTX");
-        connState->switchToHttps(fakeRequest, bumpMode);
+        connState->switchToHttps(fakeRequest.getRaw(), bumpMode);
     }
 }
 
 /**
  * A callback function to use with the ACLFilledChecklist callback.
- * In the case of ACCES_ALLOWED answer initializes a bumped SSL connection,
+ * In the case of ACCESS_ALLOWED answer initializes a bumped SSL connection,
  * else reverts the connection to tunnel mode.
  */
 static void
@@ -3830,10 +3859,10 @@ httpsSslBumpAccessCheckDone(allow_t answer, void *data)
 
         // fake a CONNECT request to force connState to tunnel
         static char ip[MAX_IPSTRLEN];
-        static char reqStr[MAX_IPSTRLEN + 80];
-        connState->clientConnection->local.ToURL(ip, sizeof(ip));
-        snprintf(reqStr, sizeof(reqStr), "CONNECT %s HTTP/1.1\r\nHost: %s\r\n\r\n", ip, ip);
-        bool ret = connState->handleReadData(reqStr, strlen(reqStr));
+        connState->clientConnection->local.toUrl(ip, sizeof(ip));
+        SBuf reqStr;
+        reqStr.append("CONNECT ").append(ip).append(" HTTP/1.1\r\nHost: ").append(ip).append("\r\n\r\n");
+        bool ret = connState->handleReadData(&reqStr);
         if (ret)
             ret = connState->clientParseRequests();
 
@@ -3848,7 +3877,14 @@ httpsSslBumpAccessCheckDone(allow_t answer, void *data)
 static void
 httpsAccept(const CommAcceptCbParams &params)
 {
-    AnyP::PortCfg *s = static_cast<AnyP::PortCfg *>(params.data);
+    MasterXaction::Pointer xact = params.xaction;
+    const AnyP::PortCfgPointer s = xact->squidPort;
+
+    if (!s.valid()) {
+        // it is possible the call or accept() was still queued when the port was reconfigured
+        debugs(33, 2, "HTTPS accept failure: port reconfigured.");
+        return;
+    }
 
     if (params.flag != COMM_OK) {
         // Its possible the call was still queued when the client disconnected
@@ -3866,7 +3902,7 @@ httpsAccept(const CommAcceptCbParams &params)
     ++incoming_sockets_accepted;
 
     // Socket is ready, setup the connection manager to start using it
-    ConnStateData *connState = connStateCreate(params.conn, s);
+    ConnStateData *connState = new ConnStateData(xact);
 
     if (s->flags.tunnelSslBumping) {
         debugs(33, 5, "httpsAccept: accept transparent connection: " << params.conn);
@@ -3881,8 +3917,8 @@ httpsAccept(const CommAcceptCbParams &params)
         HttpRequest *request = new HttpRequest();
         static char ip[MAX_IPSTRLEN];
         assert(params.conn->flags & (COMM_TRANSPARENT | COMM_INTERCEPTION));
-        request->SetHost(params.conn->local.NtoA(ip, sizeof(ip)));
-        request->port = params.conn->local.GetPort();
+        request->SetHost(params.conn->local.toStr(ip, sizeof(ip)));
+        request->port = params.conn->local.port();
         request->myportname = s->name;
 
         ACLFilledChecklist *acl_checklist = new ACLFilledChecklist(Config.accessList.ssl_bump, request, NULL);
@@ -3900,7 +3936,14 @@ httpsAccept(const CommAcceptCbParams &params)
 static void
 ftpAccept(const CommAcceptCbParams &params)
 {
-    AnyP::PortCfg *s = static_cast<AnyP::PortCfg *>(params.data);
+    MasterXaction::Pointer xact = params.xaction;
+    AnyP::PortCfgPointer s = xact->squidPort;
+
+    if (!s.valid()) {
+        // it is possible the call or accept() was still queued when the port was reconfigured
+        debugs(33, 2, "FTP accept failure: port reconfigured.");
+        return;
+       }
 
     if (params.flag != COMM_OK) {
         // Its possible the call was still queued when the client disconnected
@@ -3918,18 +3961,19 @@ ftpAccept(const CommAcceptCbParams &params)
     ++incoming_sockets_accepted;
 
     // Socket is ready, setup the connection manager to start using it
-    ConnStateData *connState = connStateCreate(params.conn, s);
+    ConnStateData *connState = new ConnStateData(xact);
 
     if (connState->transparent()) {
         char buf[MAX_IPSTRLEN];
-        connState->clientConnection->local.ToURL(buf,MAX_IPSTRLEN);
-        connState->ftp.uri = "ftp://";
-        connState->ftp.uri.append(buf);
-        connState->ftp.uri.append("/");
-        debugs(33, 5, HERE << "FTP transparent URL: " << connState->ftp.uri);
+        connState->clientConnection->local.toUrl(buf, MAX_IPSTRLEN);
+        connState->ftp.host = buf;
+        const char *uri = connState->ftpBuildUri();
+        debugs(33, 5, HERE << "FTP transparent URL: " << uri);
     }
 
-    FtpWriteGreeting(connState);
+    FtpWriteEarlyReply(connState, 220, "Service ready");
+
+    // TODO: Merge common httpAccept() parts, applying USE_DELAY_POOLS to FTP.
 }
 
 void
@@ -3966,7 +4010,7 @@ ConnStateData::sslCrtdHandleReply(const HelperReply &reply)
 
 void ConnStateData::buildSslCertGenerationParams(Ssl::CertificateProperties &certProperties)
 {
-    certProperties.commonName =  sslCommonName.defined() ? sslCommonName.termedBuf() : sslConnectHostOrIp.termedBuf();
+    certProperties.commonName =  sslCommonName.size() > 0 ? sslCommonName.termedBuf() : sslConnectHostOrIp.termedBuf();
 
     // fake certificate adaptation requires bump-server-first mode
     if (!sslServerBump) {
@@ -4061,12 +4105,12 @@ ConnStateData::getSslContextStart()
         Ssl::CertificateProperties certProperties;
         buildSslCertGenerationParams(certProperties);
         sslBumpCertKey = certProperties.dbKey().c_str();
-        assert(sslBumpCertKey.defined() && sslBumpCertKey[0] != '\0');
+        assert(sslBumpCertKey.size() > 0 && sslBumpCertKey[0] != '\0');
 
         debugs(33, 5, HERE << "Finding SSL certificate for " << sslBumpCertKey << " in cache");
-        Ssl::LocalContextStorage & ssl_ctx_cache(Ssl::TheGlobalContextStorage.getLocalStorage(port->s));
+        Ssl::LocalContextStorage *ssl_ctx_cache = Ssl::TheGlobalContextStorage.getLocalStorage(port->s);
         SSL_CTX * dynCtx = NULL;
-        Ssl::SSL_CTX_Pointer *cachedCtx = ssl_ctx_cache.get(sslBumpCertKey.termedBuf());
+        Ssl::SSL_CTX_Pointer *cachedCtx = ssl_ctx_cache ? ssl_ctx_cache->get(sslBumpCertKey.termedBuf()) : NULL;
         if (cachedCtx && (dynCtx = cachedCtx->get())) {
             debugs(33, 5, HERE << "SSL certificate for " << sslBumpCertKey << " have found in cache");
             if (Ssl::verifySslCertificate(dynCtx, certProperties)) {
@@ -4075,7 +4119,8 @@ ConnStateData::getSslContextStart()
                 return;
             } else {
                 debugs(33, 5, HERE << "Cached SSL certificate for " << sslBumpCertKey << " is out of date. Delete this certificate from cache");
-                ssl_ctx_cache.del(sslBumpCertKey.termedBuf());
+                if (ssl_ctx_cache)
+                    ssl_ctx_cache->del(sslBumpCertKey.termedBuf());
             }
         } else {
             debugs(33, 5, HERE << "SSL certificate for " << sslBumpCertKey << " haven't found in cache");
@@ -4113,14 +4158,24 @@ ConnStateData::getSslContextDone(SSL_CTX * sslContext, bool isNew)
     // Try to add generated ssl context to storage.
     if (port->generateHostCertificates && isNew) {
 
-        if (signAlgorithm == Ssl::algSignTrusted)
+        if (signAlgorithm == Ssl::algSignTrusted) {
+            // Add signing certificate to the certificates chain
+            X509 *cert = port->signingCert.get();
+            if (SSL_CTX_add_extra_chain_cert(sslContext, cert)) {
+                // increase the certificate lock
+                CRYPTO_add(&(cert->references),1,CRYPTO_LOCK_X509);
+            } else {
+                const int ssl_error = ERR_get_error();
+                debugs(33, DBG_IMPORTANT, "WARNING: can not add signing certificate to SSL context chain: " << ERR_error_string(ssl_error, NULL));
+            }
             Ssl::addChainToSslContext(sslContext, port->certsToChain.get());
+        }
         //else it is self-signed or untrusted do not attrach any certificate
 
-        Ssl::LocalContextStorage & ssl_ctx_cache(Ssl::TheGlobalContextStorage.getLocalStorage(port->s));
-        assert(sslBumpCertKey.defined() && sslBumpCertKey[0] != '\0');
+        Ssl::LocalContextStorage *ssl_ctx_cache = Ssl::TheGlobalContextStorage.getLocalStorage(port->s);
+        assert(sslBumpCertKey.size() > 0 && sslBumpCertKey[0] != '\0');
         if (sslContext) {
-            if (!ssl_ctx_cache.add(sslBumpCertKey.termedBuf(), new Ssl::SSL_CTX_Pointer(sslContext))) {
+            if (!ssl_ctx_cache || !ssl_ctx_cache->add(sslBumpCertKey.termedBuf(), new Ssl::SSL_CTX_Pointer(sslContext))) {
                 // If it is not in storage delete after using. Else storage deleted it.
                 fd_table[clientConnection->fd].dynamicSslContext = sslContext;
             }
@@ -4206,7 +4261,7 @@ ConnStateData::httpsPeeked(Comm::ConnectionPointer serverConnection)
         // Squid serves its own error page and closes, so we want
         // a CN that causes no additional browser errors. Possible
         // only when bumping CONNECT with a user-typed address.
-        if (intendedDest.IsAnyAddr() || isConnectRequest)
+        if (intendedDest.isAnyAddr() || isConnectRequest)
             sslCommonName = sslConnectHostOrIp;
         else if (sslServerBump->serverCert.get())
             sslCommonName = Ssl::CommonHostName(sslServerBump->serverCert.get());
@@ -4219,7 +4274,7 @@ ConnStateData::httpsPeeked(Comm::ConnectionPointer serverConnection)
     getSslContextStart();
 }
 
-#endif /* USE_SSL */
+#endif /* USE_OPENSSL */
 
 /// check FD after clientHttp[s]ConnectionOpened, adjust HttpSockets as needed
 static bool
@@ -4262,9 +4317,9 @@ clientHttpConnectionsOpen(void)
             continue;
         }
 
-#if USE_SSL
+#if USE_OPENSSL
         if (s->flags.tunnelSslBumping && !Config.accessList.ssl_bump) {
-            debugs(33, DBG_IMPORTANT, "WARNING: No ssl_bump configured. Disabling ssl-bump on " << s->protocol << "_port " << s->s);
+            debugs(33, DBG_IMPORTANT, "WARNING: No ssl_bump configured. Disabling ssl-bump on " << AnyP::UriScheme(s->transport.protocol) << "_port " << s->s);
             s->flags.tunnelSslBumping = false;
         }
 
@@ -4300,7 +4355,7 @@ clientHttpConnectionsOpen(void)
     }
 }
 
-#if USE_SSL
+#if USE_OPENSSL
 static void
 clientHttpsConnectionsOpen(void)
 {
@@ -4321,7 +4376,7 @@ clientHttpsConnectionsOpen(void)
 
         // TODO: merge with similar code in clientHttpConnectionsOpen()
         if (s->flags.tunnelSslBumping && !Config.accessList.ssl_bump) {
-            debugs(33, DBG_IMPORTANT, "WARNING: No ssl_bump configured. Disabling ssl-bump on " << s->protocol << "_port " << s->s);
+            debugs(33, DBG_IMPORTANT, "WARNING: No ssl_bump configured. Disabling ssl-bump on " << AnyP::UriScheme(s->transport.protocol) << "_port " << s->s);
             s->flags.tunnelSslBumping = false;
         }
 
@@ -4403,7 +4458,7 @@ clientListenerConnectionOpened(AnyP::PortCfg *s, const Ipc::FdNoteId portTypeNot
 
     debugs(1, DBG_IMPORTANT, "Accepting " <<
            (s->flags.natIntercept ? "NAT intercepted " : "") <<
-           (s->flags.tproxyIntercept ? "TPROXY spoofing " : "") <<
+           (s->flags.tproxyIntercept ? "TPROXY intercepted " : "") <<
            (s->flags.tunnelSslBumping ? "SSL bumped " : "") <<
            (s->flags.accelSurrogate ? "reverse-proxy " : "")
            << FdNote(portTypeNote) << " connections at "
@@ -4416,7 +4471,7 @@ void
 clientOpenListenSockets(void)
 {
     clientHttpConnectionsOpen();
-#if USE_SSL
+#if USE_OPENSSL
     clientHttpsConnectionsOpen();
 #endif
     clientFtpConnectionsOpen();
@@ -4436,7 +4491,7 @@ clientConnectionsClose(void)
         }
     }
 
-#if USE_SSL
+#if USE_OPENSSL
     for (AnyP::PortCfg *s = Config.Sockaddr.https; s; s = s->next) {
         if (s->listenConn != NULL) {
             debugs(1, DBG_IMPORTANT, "Closing HTTPS port " << s->listenConn->local);
@@ -4477,7 +4532,7 @@ varyEvaluateMatch(StoreEntry * entry, HttpRequest * request)
         if (vary) {
             /* Oops... something odd is going on here.. */
             debugs(33, DBG_IMPORTANT, "varyEvaluateMatch: Oops. Not a Vary object on second attempt, '" <<
-                   entry->mem_obj->url << "' '" << vary << "'");
+                   entry->mem_obj->urlXXX() << "' '" << vary << "'");
             safe_free(request->vary_headers);
             return VARY_CANCEL;
         }
@@ -4519,7 +4574,7 @@ varyEvaluateMatch(StoreEntry * entry, HttpRequest * request)
              * found the requested variant. Bail out
              */
             debugs(33, DBG_IMPORTANT, "varyEvaluateMatch: Oops. Not a Vary match on second attempt, '" <<
-                   entry->mem_obj->url << "' '" << vary << "'");
+                   entry->mem_obj->urlXXX() << "' '" << vary << "'");
             return VARY_CANCEL;
         }
     }
@@ -4531,7 +4586,7 @@ clientAclChecklistCreate(const acl_access * acl, ClientHttpRequest * http)
     ConnStateData * conn = http->getConn();
     ACLFilledChecklist *ch = new ACLFilledChecklist(acl, http->request,
             cbdataReferenceValid(conn) && conn != NULL && conn->clientConnection != NULL ? conn->clientConnection->rfc931 : dash_str);
-
+    ch->al = http->al;
     /*
      * hack for ident ACL. It needs to get full addresses, and a place to store
      * the ident result on persistent connections...
@@ -4542,25 +4597,6 @@ clientAclChecklistCreate(const acl_access * acl, ClientHttpRequest * http)
 
 CBDATA_CLASS_INIT(ConnStateData);
 
-ConnStateData::ConnStateData(const char *protocol) :
-        AsyncJob("ConnStateData"),
-        isFtp(strcmp(protocol, "ftp") == 0),
-#if USE_SSL
-        sslBumpMode(Ssl::bumpEnd),
-        switchedToHttps_(false),
-        sslServerBump(NULL),
-#endif
-        stoppedSending_(NULL),
-        stoppedReceiving_(NULL)
-{
-    pinning.host = NULL;
-    pinning.port = -1;
-    pinning.pinned = false;
-    pinning.auth = false;
-    pinning.zeroReply = false;
-    pinning.peer = NULL;
-}
-
 bool
 ConnStateData::transparent() const
 {
@@ -4603,7 +4639,7 @@ ConnStateData::mayNeedToReadMoreBody() const
         return -1; // probably need to read more, but we cannot be sure
 
     const int64_t needToProduce = bodyPipe->unproducedSize();
-    const int64_t haveAvailable = static_cast<int64_t>(in.notYetUsed);
+    const int64_t haveAvailable = static_cast<int64_t>(in.buf.length());
 
     if (needToProduce <= haveAvailable)
         return 0; // we have read what we need (but are waiting for pipe space)
@@ -4673,20 +4709,13 @@ ConnStateData::finishDechunkingRequest(bool withSuccess)
     in.bodyParser = NULL;
 }
 
-char *
-ConnStateData::In::addressToReadInto() const
-{
-    return buf + notYetUsed;
-}
-
-ConnStateData::In::In() : bodyParser(NULL),
-        buf (NULL), notYetUsed (0), allocatedSize (0)
+ConnStateData::In::In() :
+        bodyParser(NULL),
+        buf()
 {}
 
 ConnStateData::In::~In()
 {
-    if (allocatedSize)
-        memFreeBuf(allocatedSize, buf);
     delete bodyParser; // TODO: pool
 }
 
@@ -4717,37 +4746,47 @@ ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io)
     assert(pinning.serverConnection == io.conn);
     pinning.closeHandler = NULL; // Comm unregisters handlers before calling
     const bool sawZeroReply = pinning.zeroReply; // reset when unpinning
-    unpinConnection();
-    if (sawZeroReply) {
-        debugs(33, 3, "Closing client connection on pinned zero reply.");
-        clientConnection->close();
-    }
+    unpinConnection(false);
+
     if (isFtp) {
-        // XXX
-        /*
-        debugs(33, 5, HERE << "FTP server connection closed, closing client "
-               "connection.");
+        // if the server control connection is gone, reset state to login again
+        // TODO: merge with similar code in FtpHandleUserRequest()
+        debugs(33, 5, "will need to re-login due to FTP server closure");
+        ftp.readGreeting = false;
+        FtpChangeState(this, ConnStateData::FTP_BEGIN, "server closure");
+        // XXX: Not enough. Gateway::ServerStateData::sendCommand() will not
+        // re-login because clientState() is not ConnStateData::FTP_CONNECTED.
+    }
+
+    if (sawZeroReply && clientConnection != NULL) {
+        debugs(33, 3, "Closing client connection on pinned zero reply.");
         clientConnection->close();
-        */
     }
+
 }
 
 void
-ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServer, HttpRequest *request, CachePeer *aPeer, bool auth)
+ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServer, HttpRequest *request, CachePeer *aPeer, bool auth, bool monitor)
 {
-    char desc[FD_DESC_SZ];
+    if (!Comm::IsConnOpen(pinning.serverConnection) || 
+        pinning.serverConnection->fd != pinServer->fd)
+        pinNewConnection(pinServer, request, aPeer, auth);
 
-    if (Comm::IsConnOpen(pinning.serverConnection)) {
-        if (pinning.serverConnection->fd == pinServer->fd)
-            return;
-    }
+    if (monitor)
+        startPinnedConnectionMonitoring();
+}
 
-    unpinConnection(); // closes pinned connection, if any, and resets fields
+void
+ConnStateData::pinNewConnection(const Comm::ConnectionPointer &pinServer, HttpRequest *request, CachePeer *aPeer, bool auth)
+{
+    unpinConnection(true); // closes pinned connection, if any, and resets fields
 
     pinning.serverConnection = pinServer;
 
     debugs(33, 3, HERE << pinning.serverConnection);
 
+    Must(pinning.serverConnection != NULL);
+
     // when pinning an SSL bumped connection, the request may be NULL
     const char *pinnedHost = "[unknown]";
     if (request) {
@@ -4755,16 +4794,17 @@ ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServer, HttpReque
         pinning.port = request->port;
         pinnedHost = pinning.host;
     } else {
-        pinning.port = pinServer->remote.GetPort();
+        pinning.port = pinServer->remote.port();
     }
     pinning.pinned = true;
     if (aPeer)
         pinning.peer = cbdataReference(aPeer);
     pinning.auth = auth;
     char stmp[MAX_IPSTRLEN];
+    char desc[FD_DESC_SZ];
     snprintf(desc, FD_DESC_SZ, "%s pinned connection for %s (%d)",
              (auth || !aPeer) ? pinnedHost : aPeer->name,
-             clientConnection->remote.ToURL(stmp,MAX_IPSTRLEN),
+             clientConnection->remote.toUrl(stmp,MAX_IPSTRLEN),
              clientConnection->fd);
     fd_note(pinning.serverConnection->fd, desc);
 
@@ -4778,6 +4818,56 @@ ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServer, HttpReque
     comm_add_close_handler(pinning.serverConnection->fd, pinning.closeHandler);
 }
 
+/// [re]start monitoring pinned connection for server closures so that we can
+/// propagate them to an _idle_ client pinned to the server
+void
+ConnStateData::startPinnedConnectionMonitoring()
+{
+    if (pinning.readHandler != NULL)
+        return; // already monitoring
+
+    typedef CommCbMemFunT<ConnStateData, CommIoCbParams> Dialer;
+    pinning.readHandler = JobCallback(33, 3,
+                                      Dialer, this, ConnStateData::clientPinnedConnectionRead);
+    static char unusedBuf[8];
+    comm_read(pinning.serverConnection, unusedBuf, sizeof(unusedBuf), pinning.readHandler);
+}
+
+void
+ConnStateData::stopPinnedConnectionMonitoring()
+{
+    if (pinning.readHandler != NULL) {
+        comm_read_cancel(pinning.serverConnection->fd, pinning.readHandler);
+        pinning.readHandler = NULL;
+    }
+}
+
+/// Our read handler called by Comm when the server either closes an idle pinned connection or
+/// perhaps unexpectedly sends something on that idle (from Squid p.o.v.) connection.
+void
+ConnStateData::clientPinnedConnectionRead(const CommIoCbParams &io)
+{
+    pinning.readHandler = NULL; // Comm unregisters handlers before calling
+
+    if (io.flag == COMM_ERR_CLOSING)
+        return; // close handler will clean up
+
+    // We could use getConcurrentRequestCount(), but this may be faster.
+    const bool clientIsIdle = !getCurrentContext();
+
+    debugs(33, 3, "idle pinned " << pinning.serverConnection << " read " <<
+           io.size << (clientIsIdle ? " with idle client" : ""));
+
+    assert(pinning.serverConnection == io.conn);
+    pinning.serverConnection->close();
+
+    // If we are still sending data to the client, do not close now. When we are done sending,
+    // ClientSocketContext::keepaliveNextRequest() checks pinning.serverConnection and will close.
+    // However, if we are idle, then we must close to inform the idle client and minimize races.
+    if (clientIsIdle && clientConnection != NULL)
+        clientConnection->close();
+}
+
 const Comm::ConnectionPointer
 ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *aPeer)
 {
@@ -4786,29 +4876,35 @@ ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *a
     bool valid = true;
     if (!Comm::IsConnOpen(pinning.serverConnection))
         valid = false;
-    if (pinning.auth && request && strcasecmp(pinning.host, request->GetHost()) != 0) {
+    else if (pinning.auth && pinning.host && request && strcasecmp(pinning.host, request->GetHost()) != 0)
         valid = false;
-    }
-    if (request && pinning.port != request->port) {
+    else if (request && pinning.port != request->port)
         valid = false;
-    }
-    if (pinning.peer && !cbdataReferenceValid(pinning.peer)) {
+    else if (pinning.peer && !cbdataReferenceValid(pinning.peer))
         valid = false;
-    }
-    if (aPeer != pinning.peer) {
+    else if (aPeer != pinning.peer)
         valid = false;
-    }
 
     if (!valid) {
         /* The pinning info is not safe, remove any pinning info */
-        unpinConnection();
+        unpinConnection(true);
     }
 
     return pinning.serverConnection;
 }
 
+Comm::ConnectionPointer
+ConnStateData::borrowPinnedConnection(HttpRequest *request, const CachePeer *aPeer)
+{
+    debugs(33, 7, pinning.serverConnection);
+    if (validatePinnedConnection(request, aPeer) != NULL)
+        stopPinnedConnectionMonitoring();
+
+    return pinning.serverConnection; // closed if validation failed
+}
+
 void
-ConnStateData::unpinConnection()
+ConnStateData::unpinConnection(const bool andClose)
 {
     debugs(33, 3, HERE << pinning.serverConnection);
 
@@ -4820,9 +4916,13 @@ ConnStateData::unpinConnection()
             comm_remove_close_handler(pinning.serverConnection->fd, pinning.closeHandler);
             pinning.closeHandler = NULL;
         }
-        /// also close the server side socket, we should not use it for any future requests...
-        // TODO: do not close if called from our close handler?
-        pinning.serverConnection->close();
+
+        stopPinnedConnectionMonitoring();
+
+        // close the server side socket if requested
+        if (andClose)
+            pinning.serverConnection->close();
+        pinning.serverConnection = NULL;
     }
 
     safe_free(pinning.host);
@@ -4833,7 +4933,36 @@ ConnStateData::unpinConnection()
      * connection has gone away */
 }
 
-
+const char *
+ConnStateData::ftpBuildUri(const char *file)
+{
+    ftp.uri = "ftp://";
+    ftp.uri.append(ftp.host);
+    if (port->ftp_track_dirs && ftp.workingDir.size()) {
+        if (ftp.workingDir[0] != '/')
+            ftp.uri.append("/");
+        ftp.uri.append(ftp.workingDir);
+    }
+
+    if (ftp.uri[ftp.uri.size() - 1] != '/')
+        ftp.uri.append("/");
+
+    if (port->ftp_track_dirs && file) {
+        //remove any '/' from the beginning of path
+        while (*file == '/')
+            ++file;
+        ftp.uri.append(file);
+    }
+
+    return ftp.uri.termedBuf();
+}
+
+void
+ConnStateData::ftpSetWorkingDir(const char *dir)
+{
+    ftp.workingDir = dir;
+}
+
 static void
 FtpAcceptDataConnection(const CommAcceptCbParams &params)
 {
@@ -4846,18 +4975,49 @@ FtpAcceptDataConnection(const CommAcceptCbParams &params)
         return;
     }
 
-    debugs(33, 4, HERE << params.conn << ": accepted");
-    fd_note(params.conn->fd, "client ftp data connect");
+    debugs(33, 4, "accepted " << params.conn);
+    fd_note(params.conn->fd, "passive client ftp data");
     ++incoming_sockets_accepted;
 
-    FtpCloseDataConnection(connState);
-    connState->ftp.dataConn = params.conn;
-    connState->ftp.uploadAvailSize = 0;
+    if (!connState->clientConnection) {
+        debugs(33, 5, "late data connection?");
+        FtpCloseDataConnection(connState); // in case we are still listening
+        params.conn->close();
+    } else
+    if (params.conn->remote != connState->clientConnection->remote) {
+        debugs(33, 2, "rogue data conn? ctrl: " << connState->clientConnection->remote);
+        params.conn->close();
+        // Some FTP servers close control connection here, but it may make
+        // things worse from DoS p.o.v. and no better from data stealing p.o.v.
+    } else {
+        FtpCloseDataConnection(connState);
+        connState->ftp.dataConn = params.conn;
+        connState->ftp.uploadAvailSize = 0;
+        debugs(33, 7, "ready for data");
+        if (connState->ftp.onDataAcceptCall != NULL) {
+            AsyncCall::Pointer call = connState->ftp.onDataAcceptCall;
+            connState->ftp.onDataAcceptCall = NULL;
+            // If we got an upload request, start reading data from the client.
+            if (connState->ftp.state == ConnStateData::FTP_HANDLE_UPLOAD_REQUEST)
+                connState->readSomeFtpData();
+            else
+                Must(connState->ftp.state == ConnStateData::FTP_HANDLE_DATA_REQUEST);
+            MemBuf mb;
+            mb.init();
+            mb.Printf("150 Data connection opened.\r\n");
+            Comm::Write(connState->clientConnection, &mb, call);
+        }
+    }
 }
 
 static void
 FtpCloseDataConnection(ConnStateData *conn)
 {
+    if (conn->ftp.listener != NULL) {
+        conn->ftp.listener->cancel("no longer needed");
+        conn->ftp.listener = NULL;
+    }
+
     if (Comm::IsConnOpen(conn->ftp.dataListenConn)) {
         debugs(33, 5, HERE << "FTP closing client data listen socket: " <<
                *conn->ftp.dataListenConn);
@@ -4865,28 +5025,22 @@ FtpCloseDataConnection(ConnStateData *conn)
     }
     conn->ftp.dataListenConn = NULL;
 
+    if (conn->ftp.reader != NULL) {
+        // comm_read_cancel can deal with negative FDs
+        comm_read_cancel(conn->ftp.dataConn->fd, conn->ftp.reader);
+        conn->ftp.reader = NULL;
+    }
+
     if (Comm::IsConnOpen(conn->ftp.dataConn)) {
         debugs(33, 5, HERE << "FTP closing client data connection: " <<
                *conn->ftp.dataConn);
         conn->ftp.dataConn->close();
     }
     conn->ftp.dataConn = NULL;
-    conn->ftp.reader = NULL;
-}
-
-static void
-FtpWriteGreeting(ConnStateData *conn)
-{
-    MemBuf mb;
-    const String msg = "220 Service ready\r\n";
-    mb.init(msg.size(), msg.size());
-    mb.append(msg.rawBuf(), msg.size());
-
-    AsyncCall::Pointer call = commCbCall(33, 5, "FtpWroteEarlyReply",
-        CommIoCbPtrFun(&FtpWroteEarlyReply, conn));
-    Comm::Write(conn->clientConnection, &mb, call);
 }
 
+/// Writes FTP [error] response before we fully parsed the FTP request and
+/// created the corresponding HTTP request wrapper for that FTP request.
 static void
 FtpWriteEarlyReply(ConnStateData *connState, const int code, const char *msg)
 {
@@ -4902,11 +5056,17 @@ FtpWriteEarlyReply(ConnStateData *connState, const int code, const char *msg)
     Comm::Write(connState->clientConnection, &mb, call);
 
     connState->flags.readMore = false;
+
+    // TODO: Create master transaction. Log it in FtpWroteEarlyReply.
 }
 
 static void
 FtpWriteReply(ClientSocketContext *context, MemBuf &mb)
 {
+    debugs(11, 2, "FTP Client " << context->clientConnection);
+    debugs(11, 2, "FTP Client REPLY:\n---------\n" << mb.buf <<
+           "\n----------");
+
     AsyncCall::Pointer call = commCbCall(33, 5, "FtpWroteReply",
         CommIoCbPtrFun(&FtpWroteReply, context));
     Comm::Write(context->clientConnection, &mb, call);
@@ -4934,6 +5094,21 @@ FtpWriteCustomReply(ClientSocketContext *context, const int code, const char *ms
     FtpWriteReply(context, mb);
 }
 
+static void 
+FtpChangeState(ConnStateData *connState, const ConnStateData::FtpState newState, const char *reason)
+{
+    assert(connState);
+    if (connState->ftp.state == newState) {
+        debugs(33, 3, "client state unchanged at " << connState->ftp.state <<
+               " because " << reason);
+        connState->ftp.state = newState;
+    } else {
+        debugs(33, 3, "client state was " << connState->ftp.state <<
+               ", now " << newState << " because " << reason);
+        connState->ftp.state = newState;
+    }
+}
+
 /** Parse an FTP request
  *
  *  \note Sets result->flags.parsed_ok to 0 if failed to parse the request,
@@ -4951,13 +5126,15 @@ FtpParseRequest(ConnStateData *connState, HttpRequestMethod *method_p, Http::Pro
 {
     *http_ver = Http::ProtocolVersion(1, 1);
 
+    // TODO: Use tokenizer for parsing instead of raw pointer manipulation.
+    const char *inBuf = connState->in.buf.rawContent();
+
     const char *const eor =
-        static_cast<const char *>(memchr(connState->in.buf, '\n',
-            min(connState->in.notYetUsed, Config.maxRequestHeaderSize)));
-    const size_t req_sz = eor + 1 - connState->in.buf;
+        static_cast<const char *>(memchr(inBuf, '\n',
+            min(static_cast<size_t>(connState->in.buf.length()), Config.maxRequestHeaderSize)));
 
-    if (eor == NULL && connState->in.notYetUsed >= Config.maxRequestHeaderSize) {
-        connState->ftp.state = ConnStateData::FTP_ERROR;
+    if (eor == NULL && connState->in.buf.length() >= Config.maxRequestHeaderSize) {
+        FtpChangeState(connState, ConnStateData::FTP_ERROR, "huge req");
         FtpWriteEarlyReply(connState, 421, "Too large request");
         return NULL;
     }
@@ -4967,27 +5144,28 @@ FtpParseRequest(ConnStateData *connState, HttpRequestMethod *method_p, Http::Pro
         return NULL;
     }
 
-    connNoteUseOfBuffer(connState, req_sz);
+    const size_t req_sz = eor + 1 - inBuf;
 
     // skip leading whitespaces
-    const char *boc = connState->in.buf;
+    const char *boc = inBuf; // beginning of command
     while (boc < eor && isspace(*boc)) ++boc;
     if (boc >= eor) {
         debugs(33, 5, HERE << "Empty request, ignoring");
+        connNoteUseOfBuffer(connState, req_sz);
         return NULL;
     }
 
-    const char *eoc = boc;
+    const char *eoc = boc; // end of command
     while (eoc < eor && !isspace(*eoc)) ++eoc;
-    connState->in.buf[eoc - connState->in.buf] = '\0';
+    connState->in.buf.setAt(eoc - inBuf, '\0');
 
-    const char *bop = eoc + 1;
+    const char *bop = eoc + 1; // beginning of parameter
     while (bop < eor && isspace(*bop)) ++bop;
     if (bop < eor) {
         const char *eop = eor - 1;
         while (isspace(*eop)) --eop;
         assert(eop >= bop);
-        connState->in.buf[eop + 1 - connState->in.buf] = '\0';
+        connState->in.buf.setAt(eop + 1 - inBuf, '\0');
     } else
         bop = NULL;
 
@@ -4995,32 +5173,37 @@ FtpParseRequest(ConnStateData *connState, HttpRequestMethod *method_p, Http::Pro
            (bop == NULL ? "no " : "") << "parameters" <<
            (bop != NULL ? ": " : "") << bop);
 
+    // TODO: Use SBuf instead of String
     const String cmd = boc;
     String params = bop;
 
-    *method_p = !cmd.caseCmp("APPE") || !cmd.caseCmp("STOR") ||
-        !cmd.caseCmp("STOU") ? Http::METHOD_PUT : Http::METHOD_GET;
+    connNoteUseOfBuffer(connState, req_sz);
 
-    if (connState->ftp.uri.size() == 0) {
+    if (!connState->ftp.readGreeting) {
         // the first command must be USER
-        if (cmd.caseCmp("USER") != 0) {
+        if (!connState->pinning.pinned && cmd.caseCmp("USER") != 0) {
             FtpWriteEarlyReply(connState, 530, "Must login first");
             return NULL;
         }
-
-        if (params.size() == 0) {
-            FtpWriteEarlyReply(connState, 501, "Missing username");
-            return NULL;
-        }
     }
 
-    // We need to process USER request now because it sets request URI.
+    // We need to process USER request now because it sets ftp server Hostname.
     if (cmd.caseCmp("USER") == 0 &&
         !FtpHandleUserRequest(connState, cmd, params))
         return NULL;
 
-    assert(connState->ftp.uri.size() > 0);
-    char *uri = xstrdup(connState->ftp.uri.termedBuf());
+    if (!FtpSupportedCommand(cmd)) {
+        FtpWriteEarlyReply(connState, 502, "Unknown or unsupported command");
+        return NULL;
+    }
+
+    *method_p = !cmd.caseCmp("APPE") || !cmd.caseCmp("STOR") ||
+        !cmd.caseCmp("STOU") ? Http::METHOD_PUT : Http::METHOD_GET;
+
+    char *uri;
+    const char *aPath = params.size() > 0 && Ftp::hasPathParameter(cmd)?
+        params.termedBuf() : NULL;
+    uri = xstrdup(connState->ftpBuildUri(aPath));
     HttpRequest *const request =
         HttpRequest::CreateFromUrlAndMethod(uri, *method_p);
     if (request == NULL) {
@@ -5032,11 +5215,18 @@ FtpParseRequest(ConnStateData *connState, HttpRequestMethod *method_p, Http::Pro
     }
 
     request->http_ver = *http_ver;
+
+    // Our fake Request-URIs are not distinctive enough for caching to work
+    request->flags.cachable = false; // XXX: reset later by maybeCacheable()
+    request->flags.noCache = true;
+
     request->header.putStr(HDR_FTP_COMMAND, cmd.termedBuf());
     request->header.putStr(HDR_FTP_ARGUMENTS, params.termedBuf() != NULL ?
                            params.termedBuf() : "");
-    if (*method_p == Http::METHOD_PUT)
+    if (*method_p == Http::METHOD_PUT) {
+        request->header.putStr(HDR_EXPECT, "100-continue");
         request->header.putStr(HDR_TRANSFER_ENCODING, "chunked");
+    }
 
     ClientHttpRequest *const http = new ClientHttpRequest(connState);
     http->request = request;
@@ -5045,7 +5235,7 @@ FtpParseRequest(ConnStateData *connState, HttpRequestMethod *method_p, Http::Pro
     http->uri = uri;
 
     ClientSocketContext *const result =
-        ClientSocketContextNew(connState->clientConnection, http);
+        new ClientSocketContext(connState->clientConnection, http);
 
     StoreIOBuffer tempBuffer;
     tempBuffer.data = result->reqbuf;
@@ -5066,20 +5256,25 @@ FtpParseRequest(ConnStateData *connState, HttpRequestMethod *method_p, Http::Pro
 static void
 FtpHandleReply(ClientSocketContext *context, HttpReply *reply, StoreIOBuffer data)
 {
-    if (reply != NULL) {
-        MemBuf *const mb = reply->pack();
-        debugs(11, 2, "FTP Client " << context->clientConnection);
-        debugs(11, 2, "FTP Client REPLY:\n---------\n" << mb->buf <<
-               "\n----------");
-        delete mb;
+    if (context->http && context->http->al != NULL &&
+        !context->http->al->reply && reply) {
+        context->http->al->reply = reply;
+        HTTPMSGLOCK(context->http->al->reply);
     }
 
     static FtpReplyHandler *handlers[] = {
         NULL, // FTP_BEGIN
         NULL, // FTP_CONNECTED
+        FtpHandleFeatReply, // FTP_HANDLE_FEAT
         FtpHandlePasvReply, // FTP_HANDLE_PASV
+        FtpHandlePortReply, // FTP_HANDLE_PORT
         FtpHandleDataReply, // FTP_HANDLE_DATA_REQUEST
-        FtpHandleUploadReply, // FTP_HANDLE_DATA_REQUEST
+        FtpHandleUploadReply, // FTP_HANDLE_UPLOAD_REQUEST
+        FtpHandleEprtReply,// FTP_HANDLE_EPRT
+        FtpHandleEpsvReply,// FTP_HANDLE_EPSV
+        NULL, // FTP_HANDLE_CWD
+        NULL, //FTP_HANDLE_PASS
+        NULL, // FTP_HANDLE_CDUP
         FtpHandleErrorReply // FTP_ERROR
     };
     const ConnStateData::FtpState state = context->getConn()->ftp.state;
@@ -5090,6 +5285,76 @@ FtpHandleReply(ClientSocketContext *context, HttpReply *reply, StoreIOBuffer dat
         FtpWriteForwardedReply(context, reply);
 }
 
+static void
+FtpHandleFeatReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    if (context->http->request->errType != ERR_NONE) {
+        FtpWriteCustomReply(context, 502, "Server does not support FEAT", reply);
+        return;
+    }
+
+    HttpReply *filteredReply = reply->clone();
+    HttpHeader &filteredHeader = filteredReply->header;
+
+    // Remove all unsupported commands from the response wrapper.
+    int deletedCount = 0;
+    HttpHeaderPos pos = HttpHeaderInitPos;
+    bool hasEPRT = false;
+    bool hasEPSV = false;
+    int prependSpaces = 1;
+    while (const HttpHeaderEntry *e = filteredHeader.getEntry(&pos)) {
+        if (e->id == HDR_FTP_PRE) {
+            // assume RFC 2389 FEAT response format, quoted by Squid:
+            // <"> SP NAME [SP PARAMS] <">
+            // but accommodate MS servers sending four SPs before NAME
+            if (e->value.size() < 4)
+                continue;
+            const char *raw = e->value.termedBuf();
+            if (raw[0] != '"' || raw[1] != ' ')
+                continue;
+            const char *beg = raw + 1 + strspn(raw + 1, " "); // after quote and spaces
+            // command name ends with (SP parameter) or quote
+            const char *end = beg + strcspn(beg, " \"");
+
+            if (end <= beg)
+                continue;
+
+            // compute the number of spaces before the command
+            prependSpaces = beg - raw - 1;
+
+            const String cmd = e->value.substr(beg-raw, end-raw);
+
+            if (!FtpSupportedCommand(cmd))
+                filteredHeader.delAt(pos, deletedCount);
+
+            if (cmd == "EPRT")
+                hasEPRT = true;
+            else if (cmd == "EPSV")
+                hasEPSV = true;
+        }
+    }
+
+    char buf[256];
+    int insertedCount = 0;
+    if (!hasEPRT) {
+        snprintf(buf, sizeof(buf), "\"%*s\"", prependSpaces + 4, "EPRT");
+        filteredHeader.putStr(HDR_FTP_PRE, buf);
+        ++insertedCount;
+    }
+    if (!hasEPSV) {
+        snprintf(buf, sizeof(buf), "\"%*s\"", prependSpaces + 4, "EPSV");
+        filteredHeader.putStr(HDR_FTP_PRE, buf);
+        ++insertedCount;
+    }
+
+    if (deletedCount || insertedCount) {
+        filteredHeader.refreshMask();
+        debugs(33, 5, "deleted " << deletedCount << " inserted " << insertedCount);
+    }
+
+    FtpWriteForwardedReply(context, filteredReply);
+}
+
 static void
 FtpHandlePasvReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
 {
@@ -5101,10 +5366,11 @@ FtpHandlePasvReply(ClientSocketContext *context, const HttpReply *reply, StoreIO
     FtpCloseDataConnection(context->getConn());
 
     Comm::ConnectionPointer conn = new Comm::Connection;
+    ConnStateData * const connState = context->getConn();
     conn->flags = COMM_NONBLOCKING;
-    conn->local = context->clientConnection->local;
-    conn->local.SetPort(0);
-    ConnStateData *const connState = context->getConn();
+    conn->local = connState->transparent() ?
+                  connState->port->s : context->clientConnection->local;
+    conn->local.port(0);
     const char *const note = connState->ftp.uri.termedBuf();
     comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn, note);
     if (!Comm::IsConnOpen(conn)) {
@@ -5118,63 +5384,94 @@ FtpHandlePasvReply(ClientSocketContext *context, const HttpReply *reply, StoreIO
     RefCount<AcceptCall> subCall = commCbCall(5, 5, "FtpAcceptDataConnection",
         CommAcceptCbPtrFun(FtpAcceptDataConnection, connState));
     Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
-    AsyncJob::Start(new Comm::TcpAcceptor(conn, note, sub));
-
+    connState->ftp.listener = subCall.getRaw();
     connState->ftp.dataListenConn = conn;
+    AsyncJob::Start(new Comm::TcpAcceptor(conn, note, sub));
 
     char addr[MAX_IPSTRLEN];
-    conn->local.NtoA(addr, MAX_IPSTRLEN, AF_INET);
+    // remote server in interception setups and local address otherwise
+    const Ip::Address &server = connState->transparent() ?
+                                context->clientConnection->local : conn->local;
+    server.toStr(addr, MAX_IPSTRLEN, AF_INET);
     addr[MAX_IPSTRLEN - 1] = '\0';
     for (char *c = addr; *c != '\0'; ++c) {
         if (*c == '.')
             *c = ',';
     }
 
+    // conn->fd is the client data connection (and its local port)
     const unsigned short port = comm_local_port(conn->fd);
-    conn->local.SetPort(port);
+    conn->local.port(port);
 
+    // In interception setups, we combine remote server address with a
+    // local port number and hope that traffic will be redirected to us.
     MemBuf mb;
     mb.init();
-    mb.Printf("227 =%s,%i,%i\r\n", addr, static_cast<int>(port >> 8),
+
+    // Do not use "227 =a,b,c,d,p1,p2" format or omit parens: some nf_ct_ftp
+    // versions block responses that use those alternative syntax rules!
+    mb.Printf("227 Entering Passive Mode (%s,%i,%i).\r\n",
+              addr,
+              static_cast<int>(port / 256),
               static_cast<int>(port % 256));
 
+    debugs(11, 3, Raw("writing", mb.buf, mb.size));
     FtpWriteReply(context, mb);
 }
 
+static void
+FtpHandlePortReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    if (context->http->request->errType != ERR_NONE) {
+        FtpWriteCustomReply(context, 502, "Server does not support PASV (converted from PORT)", reply);
+        return;
+    }
+
+    FtpWriteCustomReply(context, 200, "PORT successfully converted to PASV.");
+
+    // and wait for RETR
+}
+
 static void
 FtpHandleErrorReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
 {
-    int code;
     ConnStateData *const connState = context->getConn();
-    if (!connState->pinning.pinned) // we failed to connect to server
+    if (!connState->pinning.pinned) // we failed to connect to server
         connState->ftp.uri.clean();
-        code = 530;
-    } else
-        code = 421;
-    const char *const msg = err_type_str[context->http->request->errType];
-    FtpWriteCustomReply(context, code, msg, reply);
+    // 421: we will close due to FTP_ERROR
+    FtpWriteErrorReply(context, reply, 421);
 }
 
 static void
 FtpHandleDataReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
 {
+    ConnStateData *const conn = context->getConn();
+
     if (reply != NULL && reply->sline.status() != Http::scOkay) {
         FtpWriteForwardedReply(context, reply);
+        if (conn && Comm::IsConnOpen(conn->ftp.dataConn)) {
+            debugs(33, 3, "closing " << conn->ftp.dataConn << " on KO reply");
+            FtpCloseDataConnection(conn);
+        }
         return;
     }
 
-    debugs(33, 7, HERE << data.length);
-
-    ConnStateData *const conn = context->getConn();
+    if (!conn->ftp.dataConn) {
+        // We got STREAM_COMPLETE (or error) and closed the client data conn.
+        debugs(33, 3, "ignoring FTP srv data response after clt data closure");
+        return;
+    }
 
-    if (data.length <= 0) {
-        FtpWroteReplyData(conn->clientConnection, NULL, 0, COMM_OK, 0, context);
+    if (!FtpCheckDataConnPost(context)) {
+        FtpWriteCustomReply(context, 425, "Data connection is not established.");
+        FtpCloseDataConnection(conn);
         return;
     }
 
-    if (!Comm::IsConnOpen(conn->ftp.dataConn)) {
-        debugs(33, 3, HERE << "got FTP reply data when client data connection "
-               "is closed, ignoring");
+    debugs(33, 7, HERE << data.length);
+
+    if (data.length <= 0) {
+        FtpWroteReplyData(conn->clientConnection, NULL, 0, COMM_OK, 0, context);
         return;
     }
 
@@ -5206,8 +5503,12 @@ FtpWroteReplyData(const Comm::ConnectionPointer &conn, char *bufnotused, size_t
         return;
     }
 
+    assert(context->http);
+    context->http->out.size += size;
+
     switch (context->socketState()) {
     case STREAM_NONE:
+        debugs(33, 3, "Keep going");
         context->pullData();
         return;
     case STREAM_COMPLETE:
@@ -5233,6 +5534,7 @@ static void
 FtpHandleUploadReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
 {
     FtpWriteForwardedReply(context, reply);
+    // note that the client data connection may already be closed by now
 }
 
 static void
@@ -5243,6 +5545,121 @@ FtpWriteForwardedReply(ClientSocketContext *context, const HttpReply *reply)
     FtpWriteForwardedReply(context, reply, call);
 }
 
+static void
+FtpHandleEprtReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    if (context->http->request->errType != ERR_NONE) {
+        FtpWriteCustomReply(context, 502, "Server does not support PASV (converted from EPRT)", reply);
+        return;
+    }
+
+    FtpWriteCustomReply(context, 200, "EPRT successfully converted to PASV.");
+
+    // and wait for RETR
+}
+
+static void
+FtpHandleEpsvReply(ClientSocketContext *context, const HttpReply *reply, StoreIOBuffer data)
+{
+    if (context->http->request->errType != ERR_NONE) {
+        FtpWriteCustomReply(context, 502, "Cannot connect to server", reply);
+        return;
+    }
+
+    FtpCloseDataConnection(context->getConn());
+
+    Comm::ConnectionPointer conn = new Comm::Connection;
+    ConnStateData * const connState = context->getConn();
+    conn->flags = COMM_NONBLOCKING;
+    conn->local = connState->transparent() ?
+                  connState->port->s : context->clientConnection->local;
+    conn->local.port(0);
+    const char *const note = connState->ftp.uri.termedBuf();
+    comm_open_listener(SOCK_STREAM, IPPROTO_TCP, conn, note);
+    if (!Comm::IsConnOpen(conn)) {
+            debugs(5, DBG_CRITICAL, "comm_open_listener failed: " <<
+                   conn->local << " error: " << errno);
+            FtpWriteCustomReply(context, 451, "Internal error");
+            return;
+    }
+
+    typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+    RefCount<AcceptCall> subCall = commCbCall(5, 5, "FtpAcceptDataConnection",
+        CommAcceptCbPtrFun(FtpAcceptDataConnection, connState));
+    Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
+    connState->ftp.listener = subCall.getRaw();
+    connState->ftp.dataListenConn = conn;
+    AsyncJob::Start(new Comm::TcpAcceptor(conn, note, sub));
+
+    // conn->fd is the client data connection (and its local port)
+    const unsigned int port = comm_local_port(conn->fd);
+    conn->local.port(port);
+
+    // In interception setups, we combine remote server address with a
+    // local port number and hope that traffic will be redirected to us.
+    MemBuf mb;
+    mb.init();
+    mb.Printf("229 Entering Extended Passive Mode (|||%u|)\r\n", port);
+
+    debugs(11, 3, Raw("writing", mb.buf, mb.size));
+    FtpWriteReply(context, mb);
+}
+
+/// writes FTP error response with given status and reply-derived error details
+static void
+FtpWriteErrorReply(ClientSocketContext *context, const HttpReply *reply, const int status)
+{
+    MemBuf mb;
+    mb.init();
+
+    assert(context->http);
+    const HttpRequest *request = context->http->request;
+    assert(request);
+    if (request->errType != ERR_NONE)
+        mb.Printf("%i-%s\r\n", status, errorPageName(request->errType));
+
+    if (request->errDetail > 0) {
+        // XXX: > 0 may not always mean that this is an errno
+        mb.Printf("%i-Error: (%d) %s\r\n", status,
+                  request->errDetail,
+                  strerror(request->errDetail));
+    }
+
+    // XXX: Remove hard coded names. Use an error page template instead.
+    const Adaptation::History::Pointer ah = request->adaptHistory();
+    if (ah != NULL) { // XXX: add adapt::<all_h but use lastMeta here
+        const String info = ah->allMeta.getByName("X-Response-Info");
+        const String desc = ah->allMeta.getByName("X-Response-Desc");
+        if (info.size())
+            mb.Printf("%i-Information: %s\r\n", status, info.termedBuf());
+        if (desc.size())
+            mb.Printf("%i-Description: %s\r\n", status, desc.termedBuf());
+    }
+
+    assert(reply != NULL);
+    const char *reason = reply->header.has(HDR_FTP_REASON) ?
+                         reply->header.getStr(HDR_FTP_REASON):
+                         reply->sline.reason();
+
+    mb.Printf("%i %s\r\n", status, reason); // error terminating line
+
+    // TODO: errorpage.cc should detect FTP client and use
+    // configurable FTP-friendly error templates which we should
+    // write to the client "as is" instead of hiding most of the info
+
+    FtpWriteReply(context, mb);
+}
+
+/// writes FTP response based on HTTP reply that is not an FTP-response wrapper
+static void 
+FtpWriteForwardedForeign(ClientSocketContext *context, const HttpReply *reply)
+{
+    ConnStateData *const connState = context->getConn();
+    FtpChangeState(connState, ConnStateData::FTP_CONNECTED, "foreign reply");
+    // 451: We intend to keep the control connection open.
+    FtpWriteErrorReply(context, reply, 451);
+}
+
 static void
 FtpWriteForwardedReply(ClientSocketContext *context, const HttpReply *reply, AsyncCall::Pointer call)
 {
@@ -5250,10 +5667,9 @@ FtpWriteForwardedReply(ClientSocketContext *context, const HttpReply *reply, Asy
     const HttpHeader &header = reply->header;
     ConnStateData *const connState = context->getConn();
 
+    // adaptation and forwarding errors lack HDR_FTP_STATUS
     if (!header.has(HDR_FTP_STATUS)) {
-        // Reply without FTP-Status header may come from ICAP or ACL.
-        connState->ftp.state = ConnStateData::FTP_ERROR;
-        FtpWriteCustomReply(context, 421, reply->sline.reason());
+        FtpWriteForwardedForeign(context, reply);
         return;
     }
 
@@ -5262,14 +5678,43 @@ FtpWriteForwardedReply(ClientSocketContext *context, const HttpReply *reply, Asy
     const int status = header.getInt(HDR_FTP_STATUS);
     debugs(33, 7, HERE << "status: " << status);
 
-    if (status == 150 && connState->ftp.state ==
-        ConnStateData::FTP_HANDLE_UPLOAD_REQUEST)
-        connState->readSomeFtpData();
+    // Status 125 or 150 implies upload or data request, but we still check
+    // the state in case the server is buggy.
+    if ((status == 125 || status == 150) &&
+        (connState->ftp.state == ConnStateData::FTP_HANDLE_UPLOAD_REQUEST ||
+         connState->ftp.state == ConnStateData::FTP_HANDLE_DATA_REQUEST)) {
+        if (FtpCheckDataConnPost(context)) {
+            // If the data connection is ready, start reading data (here)
+            // and forward the response to client (further below).
+            debugs(33, 7, "data connection established, start data transfer");
+            if (connState->ftp.state == ConnStateData::FTP_HANDLE_UPLOAD_REQUEST)
+                connState->readSomeFtpData();
+        } else {
+            // If we are waiting to accept the data connection, keep waiting.
+            if (Comm::IsConnOpen(connState->ftp.dataListenConn)) {
+                debugs(33, 7, "wait for the client to establish a data connection");
+                connState->ftp.onDataAcceptCall = call;
+                // TODO: Add connect timeout for passive connections listener?
+                // TODO: Remember server response so that we can forward it?
+            } else {
+                // Either the connection was establised and closed after the
+                // data was transferred OR we failed to establish an active
+                // data connection and already sent the error to the client.
+                // In either case, there is nothing more to do.
+                debugs(33, 7, "done with data OR active connection failed");
+            }
+            return;
+        }
+    }
 
     MemBuf mb;
     mb.init();
     FtpPrintReply(mb, reply);
 
+    debugs(11, 2, "FTP Client " << context->clientConnection);
+    debugs(11, 2, "FTP Client REPLY:\n---------\n" << mb.buf <<
+           "\n----------");
+
     Comm::Write(context->clientConnection, &mb, call);
 }
 
@@ -5278,23 +5723,19 @@ FtpPrintReply(MemBuf &mb, const HttpReply *reply, const char *const prefix)
 {
     const HttpHeader &header = reply->header;
 
-    char status[4];
-    if (header.has(HDR_FTP_STATUS))
-        snprintf(status, sizeof(status), "%i", header.getInt(HDR_FTP_STATUS));
-    else
-        status[0] = '\0';
-
     HttpHeaderPos pos = HttpHeaderInitPos;
-    const HttpHeaderEntry *e = header.getEntry(&pos);
-    while (e) {
-        const HttpHeaderEntry *const next = header.getEntry(&pos);
-        if (e->id == HDR_FTP_REASON) {
-            const bool isLastLine = next == NULL || next->id != HDR_FTP_REASON;
-            const int separator = status[0] == '\0' || isLastLine ? ' ' : '-';
-            mb.Printf("%s%s%c%s\r\n", prefix, status, separator,
-                      e->value.termedBuf());
+    while (const HttpHeaderEntry *e = header.getEntry(&pos)) {
+        if (e->id == HDR_FTP_PRE) {
+            String raw;
+            if (httpHeaderParseQuotedString(e->value.rawBuf(), e->value.size(), &raw))
+                mb.Printf("%s\r\n", raw.termedBuf());
         }
-        e = next;
+    }
+
+    if (header.has(HDR_FTP_STATUS)) {
+        const char *reason = header.getStr(HDR_FTP_REASON);
+        mb.Printf("%i %s\r\n", header.getInt(HDR_FTP_STATUS),
+                  (reason ? reason : 0));
     }
 }
 
@@ -5311,6 +5752,12 @@ FtpWroteEarlyReply(const Comm::ConnectionPointer &conn, char *bufnotused, size_t
     }
 
     ConnStateData *const connState = static_cast<ConnStateData*>(data);
+    ClientSocketContext::Pointer context = connState->getCurrentContext();
+    if (context != NULL && context->http) {
+        context->http->out.size += size;
+        context->http->out.headers_sz += size;
+    }
+
     connState->flags.readMore = true;
     connState->readSomeData();
 }
@@ -5332,27 +5779,64 @@ FtpWroteReply(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size
         static_cast<ClientSocketContext*>(data);
     ConnStateData *const connState = context->getConn();
 
+    assert(context->http);
+    context->http->out.size += size;
+    context->http->out.headers_sz += size;
+
     if (connState->ftp.state == ConnStateData::FTP_ERROR) {
+        debugs(33, 5, "closing on FTP server error");
         conn->close();
         return;
     }
 
-    assert(context->socketState() == STREAM_COMPLETE);
-    connState->flags.readMore = true;
-    connState->ftp.state = ConnStateData::FTP_CONNECTED;
-    if (connState->in.bodyParser)
-        connState->finishDechunkingRequest(false);
-    context->keepaliveNextRequest();
+    const clientStream_status_t socketState = context->socketState();
+    debugs(33, 5, "FTP client stream state " << socketState);
+    switch (socketState) {
+    case STREAM_UNPLANNED_COMPLETE:
+    case STREAM_FAILED:
+         conn->close();
+         return;
+
+    case STREAM_NONE:
+    case STREAM_COMPLETE:
+        connState->flags.readMore = true;
+        FtpChangeState(connState, ConnStateData::FTP_CONNECTED, "FtpWroteReply");
+        if (connState->in.bodyParser)
+            connState->finishDechunkingRequest(false);
+        context->keepaliveNextRequest();
+        return;
+    }
 }
 
 bool
 FtpHandleRequest(ClientSocketContext *context, String &cmd, String &params) {
+    if (HttpRequest *request = context->http->request) {
+        MemBuf *mb = new MemBuf;
+        Packer p;
+        mb->init();
+        packerToMemInit(&p, mb);
+        request->pack(&p);
+        packerClean(&p);
+
+        debugs(11, 2, "FTP Client " << context->clientConnection);
+        debugs(11, 2, "FTP Client REQUEST:\n---------\n" << mb->buf <<
+               "\n----------");
+        delete mb;
+    }
+
     static std::pair<const char *, FtpRequestHandler *> handlers[] = {
         std::make_pair("LIST", FtpHandleDataRequest),
         std::make_pair("NLST", FtpHandleDataRequest),
+        std::make_pair("MLSD", FtpHandleDataRequest),
+        std::make_pair("FEAT", FtpHandleFeatRequest),
         std::make_pair("PASV", FtpHandlePasvRequest),
         std::make_pair("PORT", FtpHandlePortRequest),
-        std::make_pair("RETR", FtpHandleDataRequest)
+        std::make_pair("RETR", FtpHandleDataRequest),
+        std::make_pair("EPRT", FtpHandleEprtRequest),
+        std::make_pair("EPSV", FtpHandleEpsvRequest),
+        std::make_pair("CWD", FtpHandleCwdRequest),
+        std::make_pair("PASS", FtpHandlePassRequest),
+        std::make_pair("CDUP", FtpHandleCdupRequest),
     };
 
     FtpRequestHandler *handler = NULL;
@@ -5370,6 +5854,8 @@ FtpHandleRequest(ClientSocketContext *context, String &cmd, String &params) {
     return handler != NULL ? (*handler)(context, cmd, params) : true;
 }
 
+/// Called to parse USER command, which is required to create an HTTP request
+/// wrapper. Thus, errors are handled with FtpWriteEarlyReply() here.
 bool
 FtpHandleUserRequest(ConnStateData *connState, const String &cmd, String &params)
 {
@@ -5378,25 +5864,44 @@ FtpHandleUserRequest(ConnStateData *connState, const String &cmd, String &params
         return false;
     }
 
-    const String::size_type eou = params.find('@');
+    const String::size_type eou = params.rfind('@');
     if (eou == String::npos || eou + 1 >= params.size()) {
-        if (connState->ftp.uri.size() > 0)
-            return true;
         FtpWriteEarlyReply(connState, 501, "Missing host");
         return false;
     }
 
-    static const String scheme = "ftp://";
-    const String host = params.substr(eou + 1, params.size());
-    String uri = scheme;
-    uri.append(host);
-    uri.append("/");
+    const String login = params.substr(0, eou);
+    String host = params.substr(eou + 1, params.size());
+    // If we can parse it as raw IPv6 address, then surround with "[]".
+    // Otherwise (domain, IPv4, [bracketed] IPv6, garbage, etc), use as is.
+    if (host.pos(":")) {
+        char ipBuf[MAX_IPSTRLEN];
+        Ip::Address ipa;
+        ipa = host.termedBuf();
+        if (!ipa.isAnyAddr()) {
+            ipa.toHostStr(ipBuf, MAX_IPSTRLEN);
+            host = ipBuf;
+        }
+    }
+    connState->ftp.host = host;
 
-    if (connState->ftp.uri.size() == 0)
-        connState->ftp.uri = uri;
-    else if (uri.caseCmp(connState->ftp.uri) != 0) {
-        FtpWriteEarlyReply(connState, 501, "Cannot change host");
-        return false;
+    String oldUri;
+    if (connState->ftp.readGreeting)
+        oldUri = connState->ftp.uri;
+
+    connState->ftpSetWorkingDir(NULL);
+    connState->ftpBuildUri();
+
+    if (!connState->ftp.readGreeting) {
+        debugs(11, 3, "set URI to " << connState->ftp.uri);
+    } else if (oldUri.caseCmp(connState->ftp.uri) == 0) {
+        debugs(11, 5, "keep URI as " << oldUri);
+    } else {
+        debugs(11, 3, "reset URI from " << oldUri << " to " << connState->ftp.uri);
+        FtpCloseDataConnection(connState);
+        connState->ftp.readGreeting = false;
+        connState->unpinConnection(true); // close control connection to the server
+        FtpChangeState(connState, ConnStateData::FTP_BEGIN, "URI reset");
     }
 
     params.cut(eou);
@@ -5404,33 +5909,110 @@ FtpHandleUserRequest(ConnStateData *connState, const String &cmd, String &params
     return true;
 }
 
+bool
+FtpHandleFeatRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_FEAT, "FtpHandleFeatRequest");
+
+    return true;
+}
+
 bool
 FtpHandlePasvRequest(ClientSocketContext *context, String &cmd, String &params)
 {
+    ConnStateData *const connState = context->getConn();
+    assert(connState);
+    if (connState->ftp.gotEpsvAll) {
+        FtpSetReply(context, 500, "Bad PASV command");
+        return false;
+    }
+
     if (params.size() > 0) {
         FtpSetReply(context, 501, "Unexpected parameter");
         return false;
     }
 
-    context->getConn()->ftp.state = ConnStateData::FTP_HANDLE_PASV;
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_PASV, "FtpHandlePasvRequest");
+    // no need to fake PASV request via FtpSetDataCommand() in true PASV case
+    return true;
+}
+
+/// [Re]initializes dataConn for active data transfers. Does not connect.
+static
+bool FtpCreateDataConnection(ClientSocketContext *context, Ip::Address cltAddr)
+{
+    ConnStateData *const connState = context->getConn();
+    assert(connState);
+    assert(connState->clientConnection != NULL);
+    assert(!connState->clientConnection->remote.isAnyAddr());
+
+    if (cltAddr != connState->clientConnection->remote) {
+        debugs(33, 2, "rogue PORT " << cltAddr << " request? ctrl: " << connState->clientConnection->remote);
+        // Closing the control connection would not help with attacks because
+        // the client is evidently able to connect to us. Besides, closing
+        // makes retrials easier for the client and more damaging to us.
+        FtpSetReply(context, 501, "Prohibited parameter value");
+        return false;
+    }
+
+    FtpCloseDataConnection(context->getConn());
+
+    Comm::ConnectionPointer conn = new Comm::Connection();
+    conn->remote = cltAddr;
 
+    // Use local IP address of the control connection as the source address
+    // of the active data connection, or some clients will refuse to accept.
+    conn->flags |= COMM_DOBIND;
+    conn->local = connState->clientConnection->local;
+    // RFC 959 requires active FTP connections to originate from port 20
+    // but that would preclude us from supporting concurrent transfers! (XXX?)
+    conn->local.port(0);
+
+    debugs(11, 3, "will actively connect from " << conn->local << " to " <<
+           conn->remote);
+
+    context->getConn()->ftp.dataConn = conn;
+    context->getConn()->ftp.uploadAvailSize = 0;
     return true;
 }
 
 bool
 FtpHandlePortRequest(ClientSocketContext *context, String &cmd, String &params)
 {
-    FtpSetReply(context, 502, "Command not supported");
-    return false;
+    // TODO: Should PORT errors trigger FtpCloseDataConnection() cleanup?
+
+    const ConnStateData *connState = context->getConn();
+    if (connState->ftp.gotEpsvAll) {
+        FtpSetReply(context, 500, "Rejecting PORT after EPSV ALL");
+        return false;
+    }
+
+    if (!params.size()) {
+        FtpSetReply(context, 501, "Missing parameter");
+        return false;
+    }
+
+    Ip::Address cltAddr;
+    if (!Ftp::ParseIpPort(params.termedBuf(), NULL, cltAddr)) {
+        FtpSetReply(context, 501, "Invalid parameter");
+        return false;
+    }
+
+    if (!FtpCreateDataConnection(context, cltAddr))
+        return false;
+
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_PORT, "FtpHandlePortRequest");
+    FtpSetDataCommand(context);
+    return true; // forward our fake PASV request
 }
 
 bool
 FtpHandleDataRequest(ClientSocketContext *context, String &cmd, String &params)
 {
-    if (!FtpCheckDataConnection(context))
+    if (!FtpCheckDataConnPre(context))
         return false;
 
-    context->getConn()->ftp.state = ConnStateData::FTP_HANDLE_DATA_REQUEST;
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_DATA_REQUEST, "FtpHandleDataRequest");
 
     return true;
 }
@@ -5438,26 +6020,174 @@ FtpHandleDataRequest(ClientSocketContext *context, String &cmd, String &params)
 bool
 FtpHandleUploadRequest(ClientSocketContext *context, String &cmd, String &params)
 {
-    if (!FtpCheckDataConnection(context))
+    if (!FtpCheckDataConnPre(context))
         return false;
 
-    context->getConn()->ftp.state = ConnStateData::FTP_HANDLE_UPLOAD_REQUEST;
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_UPLOAD_REQUEST, "FtpHandleDataRequest");
 
     return true;
 }
 
 bool
-FtpCheckDataConnection(ClientSocketContext *context)
+FtpHandleEprtRequest(ClientSocketContext *context, String &cmd, String &params)
 {
-    const ConnStateData *const connState = context->getConn();
+    debugs(11, 3, "Process an EPRT " << params);
+
+    const ConnStateData *connState = context->getConn();
+    if (connState->ftp.gotEpsvAll) {
+        FtpSetReply(context, 500, "Rejecting EPRT after EPSV ALL");
+        return false;
+    }
+
+    if (!params.size()) {
+        FtpSetReply(context, 501, "Missing parameter");
+        return false;
+    }
+
+    Ip::Address cltAddr;
+    if (!Ftp::ParseProtoIpPort(params.termedBuf(), cltAddr)) {
+        FtpSetReply(context, 501, "Invalid parameter");
+        return false;
+    }
+
+    if (!FtpCreateDataConnection(context, cltAddr))
+        return false;
+
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_EPRT, "FtpHandleEprtRequest");
+    FtpSetDataCommand(context);
+    return true; // forward our fake PASV request
+}
+
+bool
+FtpHandleEpsvRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    debugs(11, 3, "Process an EPSV command with params: " << params);
+    if (params.size() <= 0) {
+        // treat parameterless EPSV as "use the protocol of the ctrl conn"
+    } else if (params.caseCmp("ALL") == 0) {
+        ConnStateData *connState = context->getConn();
+        FtpSetReply(context, 200, "EPSV ALL ok");
+        connState->ftp.gotEpsvAll = true;
+        return false;
+    } else if (params.cmp("2") == 0) {
+        if (!Ip::EnableIpv6) {
+            FtpSetReply(context, 522, "Network protocol not supported, use (1)");
+            return false;
+        }
+    } else if (params.cmp("1") != 0) {
+        FtpSetReply(context, 501, "Unsupported EPSV parameter");
+        return false;
+    }
+
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_EPSV, "FtpHandleEpsvRequest");
+    FtpSetDataCommand(context);
+    return true; // forward our fake PASV request
+}
+
+bool
+FtpHandleCwdRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_CWD, "FtpHandleCwdRequest");
+    return true;
+}
+
+bool
+FtpHandlePassRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_PASS, "FtpHandlePassRequest");
+    return true;
+}
+
+bool
+FtpHandleCdupRequest(ClientSocketContext *context, String &cmd, String &params)
+{
+    FtpChangeState(context->getConn(), ConnStateData::FTP_HANDLE_CDUP, "FtpHandleCdupRequest");
+    return true;
+}
+
+// Convert client PORT, EPRT, PASV, or EPSV data command to Squid PASV command.
+// Squid server-side decides what data command to use on that side.
+void
+FtpSetDataCommand(ClientSocketContext *context)
+{
+    ClientHttpRequest *const http = context->http;
+    assert(http != NULL);
+    HttpRequest *const request = http->request;
+    assert(request != NULL);
+    HttpHeader &header = request->header;
+    header.delById(HDR_FTP_COMMAND);
+    header.putStr(HDR_FTP_COMMAND, "PASV");
+    header.delById(HDR_FTP_ARGUMENTS);
+    header.putStr(HDR_FTP_ARGUMENTS, "");
+    debugs(11, 5, "client data command converted to fake PASV");
+}
+
+/// check that client data connection is ready for future I/O or at least
+/// has a chance of becoming ready soon.
+bool
+FtpCheckDataConnPre(ClientSocketContext *context)
+{
+    ConnStateData *const connState = context->getConn();
     if (Comm::IsConnOpen(connState->ftp.dataConn))
         return true;
 
-    if (!Comm::IsConnOpen(connState->ftp.dataListenConn))
-        FtpSetReply(context, 425, "Use PASV first");
-    else
-        FtpSetReply(context, 425, "Data connection is not established");
-    return false;
+    if (Comm::IsConnOpen(connState->ftp.dataListenConn)) {
+        // We are still waiting for a client to connect to us after PASV.
+        // Perhaps client's data conn handshake has not reached us yet.
+        // After we talk to the server, FtpCheckDataConnPost() will recheck.
+        debugs(33, 3, "expecting clt data conn " << connState->ftp.dataListenConn);
+        return true;
+    }
+
+    if (!connState->ftp.dataConn || connState->ftp.dataConn->remote.isAnyAddr()) {
+        debugs(33, 5, "missing " << connState->ftp.dataConn);
+        // TODO: use client address and default port instead.
+        FtpSetReply(context, 425, "Use PORT or PASV first");
+        return false;
+    }
+
+    // active transfer: open a connection from Squid to client
+    AsyncCall::Pointer connector = context->getConn()->ftp.connector =
+        commCbCall(17, 3, "FtpConnectDoneWrapper", 
+                   CommConnectCbPtrFun(FtpHandleConnectDone, context));
+
+    Comm::ConnOpener *cs = new Comm::ConnOpener(connState->ftp.dataConn,
+                                                connector,
+                                                Config.Timeout.connect);
+    AsyncJob::Start(cs);
+    return false; // ConnStateData::processFtpRequest waits FtpHandleConnectDone
+}
+
+/// Check that client data connection is ready for immediate I/O.
+static bool
+FtpCheckDataConnPost(ClientSocketContext *context)
+{
+    ConnStateData *connState = context->getConn();
+    assert(connState);
+    const Comm::ConnectionPointer &dataConn = connState->ftp.dataConn;
+    if (!Comm::IsConnOpen(dataConn)) {
+        debugs(33, 3, "missing client data conn: " << dataConn);
+        return false;
+    }
+    return true;
+}
+
+void
+FtpHandleConnectDone(const Comm::ConnectionPointer &conn, comm_err_t status, int xerrno, void *data)
+{
+    ClientSocketContext *context = static_cast<ClientSocketContext*>(data);
+    context->getConn()->ftp.connector = NULL;
+
+    if (status != COMM_OK) {
+        conn->close();
+        FtpSetReply(context, 425, "Cannot open data connection.");
+        assert(context->http && context->http->storeEntry() != NULL);
+    } else {
+        assert(context->getConn()->ftp.dataConn == conn);
+        assert(Comm::IsConnOpen(conn));
+        fd_note(conn->fd, "active client ftp data");
+    }
+    context->getConn()->resumeFtpRequest(context);
 }
 
 void
@@ -5481,12 +6211,33 @@ FtpSetReply(ClientSocketContext *context, const int code, const char *msg)
     header.putStr(HDR_FTP_REASON, msg);
     reply->hdrCacheInit();
 
-    setLogUri(http, http->uri,  true);
+    setLogUri(http, urlCanonicalClean(http->request));
 
     clientStreamNode *const node = context->getClientReplyContext();
     clientReplyContext *const repContext =
         dynamic_cast<clientReplyContext *>(node->data.getRaw());
     assert(repContext != NULL);
-    repContext->createStoreEntry(http->request->method, RequestFlags());
+
+    RequestFlags flags;
+    flags.cachable = false; // force releaseRequest() in storeCreateEntry()
+    flags.noCache = true;
+    repContext->createStoreEntry(http->request->method, flags);
     http->storeEntry()->replaceHttpReply(reply);
 }
+
+/// Whether Squid FTP gateway supports a given feature (e.g., a command).
+static bool
+FtpSupportedCommand(const String &name)
+{
+    static std::set<std::string> BlackList;
+    if (BlackList.empty()) {
+        /* Add FTP commands that Squid cannot gateway correctly */
+
+        // we probably do not support AUTH TLS.* and AUTH SSL,
+        // but let's disclaim all AUTH support to KISS, for now
+        BlackList.insert("AUTH");
+    }
+
+    // we claim support for all commands that we do not know about
+    return BlackList.find(name.termedBuf()) == BlackList.end();
+}