{
requests.push_back(c);
++nrequests;
- ++nactive;
debugs(33, 3, "Pipeline " << (void*)this << " add request " << nrequests << ' ' << c);
}
}
void
-Pipeline::popById(uint32_t which)
+Pipeline::popMe(const Http::StreamContextPointer &which)
{
if (requests.empty())
return;
- debugs(33, 3, "Pipeline " << (void*)this << " drop id=" << which);
-
- // find the context and clear its Pointer
- for (auto &&i : requests) {
- if (i->id == which) {
- i = nullptr;
- --nactive;
- break;
- }
- }
-
- // trim closed contexts from the list head (if any)
- while (!requests.empty() && !requests.front())
- requests.pop_front();
+ debugs(33, 3, "Pipeline " << (void*)this << " drop " << requests.front());
+ // in reality there may be multiple contexts doing processing in parallel.
+ // XXX: pipeline still assumes HTTP/1 FIFO semantics are obeyed.
+ assert(which == requests.front());
+ requests.pop_front();
}
Pipeline & operator =(const Pipeline &) = delete;
public:
- Pipeline() : nrequests(0), nactive(0) {}
+ Pipeline() : nrequests(0) {}
~Pipeline() = default;
/// register a new request context to the pipeline
Http::StreamContextPointer front() const;
/// how many requests are currently pipelined
- size_t count() const {return nactive;}
+ size_t count() const {return requests.size();}
/// whether there are none or any requests currently pipelined
bool empty() const {return requests.empty();}
/// tell everybody about the err, and abort all waiting requests
void terminateAll(const int xerrno);
- /// deregister a request from the pipeline
- void popById(uint32_t);
+ /// deregister the front request from the pipeline
+ void popMe(const Http::StreamContextPointer &);
/// Number of requests seen in this pipeline (so far).
/// Includes incomplete transactions.
private:
/// requests parsed from the connection but not yet completed.
std::list<Http::StreamContextPointer> requests;
-
- /// Number of still-active streams in this pipeline (so far).
- /// Includes incomplete transactions.
- uint32_t nactive;
};
#endif /* SQUID_SRC_PIPELINE_H */
http->req_sz = inBuf.length();
http->uri = xstrdup(uri);
setLogUri (http, uri);
- auto *context = new Http::StreamContext(nextStreamId(), clientConnection, http);
+ auto *context = new Http::StreamContext(clientConnection, http);
StoreIOBuffer tempBuffer;
tempBuffer.data = context->reqbuf;
tempBuffer.length = HTTP_REQBUF_SZ;
ClientHttpRequest *http = new ClientHttpRequest(csd);
http->req_sz = hp->messageHeaderSize();
- Http::StreamContext *result = new Http::StreamContext(csd->nextStreamId(), csd->clientConnection, http);
+ Http::StreamContext *result = new Http::StreamContext(csd->clientConnection, http);
StoreIOBuffer tempBuffer;
tempBuffer.data = result->reqbuf;
// XXX: Either the context is finished() or it should stay queued.
// The below may leak client streams BodyPipe objects. BUT, we need
// to check if client-streams detatch is safe to do here (finished() will detatch).
- conn->pipeline.popById(context->id);
+ assert(conn->pipeline.front() == context); // XXX: still assumes HTTP/1 semantics
+ conn->pipeline.popMe(Http::StreamContextPointer(context));
}
Comm::SetSelect(conn->clientConnection->fd, COMM_SELECT_READ, NULL, NULL, 0);
conn->fakeAConnectRequest("unknown-protocol", conn->preservedClientData);
virtual ~ConnStateData();
/* ::Server API */
- virtual uint32_t nextStreamId() {return ++nextStreamId_;}
virtual void receivedFirstByte();
virtual bool handleReadData();
virtual void afterClientRead();
#include "Store.h"
#include "TimeOrTag.h"
-Http::StreamContext::StreamContext(uint32_t anId, const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
- id(anId),
+Http::StreamContext::StreamContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
clientConnection(aConn),
http(aReq),
reply(nullptr),
assert(connRegistered_);
connRegistered_ = false;
- conn->pipeline.popById(id);
+ conn->pipeline.popMe(Http::StreamContextPointer(this));
}
/// called when we encounter a response-related error
public:
/// construct with HTTP/1.x details
- StreamContext(uint32_t id, const Comm::ConnectionPointer &, ClientHttpRequest *);
+ StreamContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq);
~StreamContext();
/// register this stream with the Server
void deferRecipientForLater(clientStreamNode *, HttpReply *, StoreIOBuffer receivedData);
-public:
- // NP: stream ID is relative to the connection, not global.
- uint32_t id; ///< stream ID within the client connection.
-
public: // HTTP/1.x state data
Comm::ConnectionPointer clientConnection; ///< details about the client connection socket
http->uri = newUri;
Http::StreamContext *const result =
- new Http::StreamContext(nextStreamId(), clientConnection, http);
+ new Http::StreamContext(clientConnection, http);
StoreIOBuffer tempBuffer;
tempBuffer.data = result->reqbuf;
clientConnection(xact->tcpClient),
transferProtocol(xact->squidPort->transport),
port(xact->squidPort),
- receivedFirstByte_(false),
- nextStreamId_(0)
+ receivedFirstByte_(false)
{}
bool
virtual bool doneAll() const;
virtual void swanSong();
- /// fetch the next available stream ID
- virtual uint32_t nextStreamId() = 0;
-
/// ??
virtual bool connFinishedWithConn(int size) = 0;
void doClientRead(const CommIoCbParams &io);
void clientWriteDone(const CommIoCbParams &io);
- uint32_t nextStreamId_; ///< incremented as streams are initiated
AsyncCall::Pointer reader; ///< set when we are reading
AsyncCall::Pointer writer; ///< set when we are writing
};