{
public:
- Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(nullptr), delayedLoops(0),
- dirty(false),
- readPending(nullptr), readPendingFunc(nullptr) {}
-
+ explicit Connection(const char *aSide);
~Connection();
/// initiates Comm::Connection ownership, including closure monitoring
/// reacts to the external closure of our connection
void noteClosure();
+ /// reacts to a successful zero-size read(2)
+ void noteEof();
+
int bytesWanted(int lower=0, int upper = INT_MAX) const;
void bytesIn(int const &);
#if USE_DELAY_POOLS
/// writes 'b' buffer, setting the 'writer' member to 'callback'.
void write(const char *b, int size, AsyncCall::Pointer &callback, FREE * free_func);
int len;
+
+ /// The role of the agent we are communicating with.
+ /// This string literal is only used for debugging.
+ const char * const side;
+
char *buf;
AsyncCall::Pointer writer; ///< pending Comm::Write callback
uint64_t *size_ptr; /* pointer to size in an ConnStateData for logging */
bool dirty; ///< whether write() has been called (at least once)
+ bool receivedEof = false; ///< whether read() has returned zero bytes
+
// XXX: make these an AsyncCall when event API can handle them
TunnelStateData *readPending;
EVH *readPendingFunc;
/// Measures time spent on selecting and communicating with peers.
PeeringActivityTimer peeringTimer;
- void copyRead(Connection &from, IOCB *completion);
+ void copyRead(Connection &from, Connection &to, IOCB *completion);
/// continue to set up connection to a peer, going async for SSL peers
void connectToPeer(const Comm::ConnectionPointer &);
static EVH tunnelDelayedClientRead;
static EVH tunnelDelayedServerRead;
+static std::ostream &
+operator <<(std::ostream &os, const TunnelStateData::Connection &c)
+{
+ os << '{';
+ os << c.side;
+
+ if (c.conn)
+ os << ' ' << c.conn->id;
+
+ if (c.len)
+ os << " buf=" << c.len;
+
+ if (c.writer)
+ os << " writing";
+ else if (!c.dirty)
+ os << " clean";
+
+ if (c.delayedLoops)
+ os << " delayedLoops=" << c.delayedLoops;
+ if (c.readPending)
+ os << " delaying";
+
+ if (c.receivedEof)
+ os << " rEOF";
+
+ os << '}';
+ return os;
+}
+
/// TunnelStateData::serverClosed() wrapper
static void
tunnelServerClosed(const CommCloseCbParams ¶ms)
// move will unnecessary delay deleteThis().
if (remainingConnection.writer) {
- debugs(26, 5, "waiting to finish writing to " << remainingConnection.conn);
+ debugs(26, 5, "waiting to finish writing to " << remainingConnection);
// the write completion callback must close its remainingConnection
// after noticing that the other connection is gone
return;
}
TunnelStateData::TunnelStateData(ClientHttpRequest *clientRequest) :
+ client("client"),
+ server("server"),
startTime(squid_curtime),
destinations(new ResolvedPeers()),
destinationsFound(false),
delete savedError;
}
+TunnelStateData::Connection::Connection(const char * const aSide):
+ len(0),
+ side(aSide),
+ buf(static_cast<char *>(xmalloc(SQUID_TCP_SO_RCVBUF))),
+ size_ptr(nullptr),
+ delayedLoops(0),
+ dirty(false),
+ readPending(nullptr),
+ readPendingFunc(nullptr)
+{
+}
+
TunnelStateData::Connection::~Connection()
{
if (readPending)
void
TunnelStateData::readServer(char *, size_t len, Comm::Flag errcode, int xerrno)
{
- debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << errcode);
+ debugs(26, 3, server << ", read " << len << " bytes, err=" << errcode);
server.delayedLoops=0;
/*
void
TunnelStateData::Connection::error(int const xerrno)
{
- debugs(50, debugLevelForError(xerrno), conn << ": read/write failure: " << xstrerr(xerrno));
+ debugs(50, debugLevelForError(xerrno), *this << ": read/write failure: " << xstrerr(xerrno));
if (!ignoreErrno(xerrno))
conn->close();
void
TunnelStateData::readClient(char *, size_t len, Comm::Flag errcode, int xerrno)
{
- debugs(26, 3, client.conn << ", read " << len << " bytes, err=" << errcode);
+ debugs(26, 3, client << ", read " << len << " bytes, err=" << errcode);
client.delayedLoops=0;
/*
bool
TunnelStateData::keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to)
{
- debugs(26, 3, "from={" << from.conn << "}, to={" << to.conn << "}");
+ debugs(26, 3, "from=" << from << "; writing to=" << to);
/* I think this is to prevent free-while-in-a-callback behaviour
* - RBC 20030229
commSetConnTimeout(to.conn, Config.Timeout.read, timeoutCall);
}
- if (errcode)
+ if (errcode) {
from.error (xerrno);
- else if (len == 0 || !Comm::IsConnOpen(to.conn)) {
- debugs(26, 3, "Nothing to write or client gone. Terminate the tunnel.");
+ return false;
+ }
+
+ if (len == 0) {
+ debugs(26, 3, "closing " << from << " after a zero-byte read");
from.conn->close();
/* Only close the remote end if we've finished queueing data to it */
if (from.len == 0 && Comm::IsConnOpen(to.conn) ) {
to.conn->close();
}
- } else if (cbdataReferenceValid(this)) {
- return true;
+ return false;
+ }
+
+ // Stop reading from source if the destination is gone. This both increases
+ // `from` chances to realize what happened at the `to` end and terminates an
+ // otherwise potentially infinite stream of incoming `from` bytes.
+ if (!Comm::IsConnOpen(to.conn)) {
+ debugs(26, 3, "closing " << from << " because " << to << " is gone");
+ from.conn->close();
+ return false;
}
- return false;
+ if (!cbdataReferenceValid(this))
+ return false;
+
+ Assure(len > 0);
+ return true;
}
void
void
TunnelStateData::writeServerDone(char *, size_t len, Comm::Flag flag, int xerrno)
{
- debugs(26, 3, server.conn << ", " << len << " bytes written, flag=" << flag);
+ debugs(26, 3, server << ", " << len << " bytes written, flag=" << flag);
if (flag == Comm::ERR_CLOSING)
return;
void
TunnelStateData::Connection::initConnection(const Comm::ConnectionPointer &aConn, Method method, const char *name, TunnelStateData *tunnelState)
{
+ debugs(26, 3, *this << " uses " << aConn);
Must(!Comm::IsConnOpen(conn));
Must(!closer);
Must(Comm::IsConnOpen(aConn));
void
TunnelStateData::Connection::noteClosure()
{
- debugs(26, 3, conn);
+ debugs(26, 3, *this);
conn = nullptr;
closer = nullptr;
writer = nullptr; // may already be nil
}
+void
+TunnelStateData::Connection::noteEof()
+{
+ debugs(26, 3, "from " << *this);
+ receivedEof = true;
+}
+
void
TunnelStateData::writeClientDone(char *, size_t len, Comm::Flag flag, int xerrno)
{
- debugs(26, 3, client.conn << ", " << len << " bytes written, flag=" << flag);
+ debugs(26, 3, client << ", " << len << " bytes written, flag=" << flag);
if (flag == Comm::ERR_CLOSING)
return;
/* Error? */
if (flag != Comm::OK) {
- debugs(26, 4, "from-client read failed: " << xerrno);
+ debugs(26, 4, "to-client write failed: " << xerrno);
client.error(xerrno); // may call comm_close
return;
}
tunnel->client.readPending = nullptr;
static uint64_t counter=0;
debugs(26, 7, "Client read(2) delayed " << ++counter << " times");
- tunnel->copyRead(tunnel->client, TunnelStateData::ReadClient);
+ tunnel->copyRead(tunnel->client, tunnel->server, TunnelStateData::ReadClient);
CodeContext::Reset(savedContext);
}
tunnel->server.readPending = nullptr;
static uint64_t counter=0;
debugs(26, 7, "Server read(2) delayed " << ++counter << " times");
- tunnel->copyRead(tunnel->server, TunnelStateData::ReadServer);
+ tunnel->copyRead(tunnel->server, tunnel->client, TunnelStateData::ReadServer);
CodeContext::Reset(savedContext);
}
void
-TunnelStateData::copyRead(Connection &from, IOCB *completion)
+TunnelStateData::copyRead(Connection &from, Connection &to, IOCB * const completion)
{
+ debugs(26, 5, "from=" << from << "; writing to=" << to);
+
assert(from.len == 0);
// If only the minimum permitted read size is going to be attempted
// then we schedule an event to try again in a few I/O cycles.
TunnelStateData::copyClientBytes()
{
if (preReadClientData.length()) {
+ debugs(26, 7, "pre-read bytes: " << preReadClientData.length());
size_t copyBytes = preReadClientData.length() > SQUID_TCP_SO_RCVBUF ? SQUID_TCP_SO_RCVBUF : preReadClientData.length();
memcpy(client.buf, preReadClientData.rawContent(), copyBytes);
preReadClientData.consume(copyBytes);
if (keepGoingAfterRead(copyBytes, Comm::OK, 0, client, server))
copy(copyBytes, client, server, TunnelStateData::WriteServerDone);
} else
- copyRead(client, ReadClient);
+ copyRead(client, server, ReadClient);
}
void
TunnelStateData::copyServerBytes()
{
if (preReadServerData.length()) {
+ debugs(26, 7, "pre-read bytes: " << preReadServerData.length());
size_t copyBytes = preReadServerData.length() > SQUID_TCP_SO_RCVBUF ? SQUID_TCP_SO_RCVBUF : preReadServerData.length();
memcpy(server.buf, preReadServerData.rawContent(), copyBytes);
preReadServerData.consume(copyBytes);
if (keepGoingAfterRead(copyBytes, Comm::OK, 0, server, client))
copy(copyBytes, server, client, TunnelStateData::WriteClientDone);
} else
- copyRead(server, ReadServer);
+ copyRead(server, client, ReadServer);
}
/**
tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
- debugs(26, 3, conn << ", flag=" << flag);
+ debugs(26, 3, tunnelState->client << ", flag=" << flag);
tunnelState->client.writer = nullptr;
if (flag != Comm::OK) {