return s_threadInfos.at(t_id).isHandler;
}
-#if 0
-#define TCPLOG(x) cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10 + t.tv_usec/1000000.0; }() << ' ' << x
+#if 1
+#define TCPLOG(tcpsock, x) do { cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10 + t.tv_usec/1000000.0; }() << " FD " << (tcpsock) << ' ' << x; } while (0)
#else
-#define TCPLOG(x)
+#define TCPLOG(pid, x)
#endif
static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var);
LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
{
- TCPLOG("asendtcp called " << data.size() << endl);
+ TCPLOG(handler->getDescriptor(), "asendtcp called " << data.size() << endl);
auto pident = std::make_shared<PacketID>();
pident->tcphandler = handler;
pident->outMSG = data;
pident->highState = TCPAction::DoingWrite;
-
IOState state;
try {
- TCPLOG("Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
+ TCPLOG(pident->tcpsock, "Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
- TCPLOG(pident->outPos << '/' << pident->outMSG.size() << endl);
+ TCPLOG(pident->tcpsock, pident->outPos << '/' << pident->outMSG.size() << endl);
if (state == IOState::Done) {
- TCPLOG("asendtcp success A" << endl);
+ TCPLOG(pident->tcpsock, "asendtcp success A" << endl);
return LWResult::Result::Success;
}
}
catch (const std::exception& e) {
- TCPLOG("tryWrite() exception..." << e.what() << endl);
+ TCPLOG(pident->tcpsock, "tryWrite() exception..." << e.what() << endl);
return LWResult::Result::PermanentError;
}
PacketBuffer packet;
int ret = MT->waitEvent(*pident, &packet, g_networkTimeoutMsec);
- TCPLOG("asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
+ TCPLOG(pident->tcpsock, "asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
if (ret == 0) {
- TCPLOG("timeout" << endl);
+ TCPLOG(pident->tcpsock, "timeout" << endl);
TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
return LWResult::Result::Timeout;
}
else if (ret == -1) { // error
- TCPLOG("PermanentError" << endl);
+ TCPLOG(pident->tcpsock, "PermanentError" << endl);
TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
return LWResult::Result::PermanentError;
}
else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
// fd housekeeping done by TCPIOHandlerIO
- TCPLOG("PermanentError size mismatch" << endl);
+ TCPLOG(pident->tcpsock, "PermanentError size mismatch" << endl);
return LWResult::Result::PermanentError;
}
- TCPLOG("asendtcp success" << endl);
+ TCPLOG(pident->tcpsock, "asendtcp success" << endl);
return LWResult::Result::Success;
}
LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
{
- TCPLOG("arecvtcp called " << len << ' ' << data.size() << endl);
+ TCPLOG(handler->getDescriptor(), "arecvtcp called " << len << ' ' << data.size() << endl);
data.resize(len);
// We might have data already available from the TLS layer, try to get that into the buffer
size_t pos = 0;
IOState state;
try {
- TCPLOG("calling tryRead() " << len << endl);
+ TCPLOG(handler->getDescriptor(), "calling tryRead() " << len << endl);
state = handler->tryRead(data, pos, len);
- TCPLOG("arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
+ TCPLOG(handler->getDescriptor(), "arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
switch (state) {
case IOState::Done:
case IOState::NeedRead:
if (pos == len || (incompleteOkay && pos > 0)) {
data.resize(pos);
- TCPLOG("acecvtcp success A" << endl);
+ TCPLOG(handler->getDescriptor(), "acecvtcp success A" << endl);
return LWResult::Result::Success;
}
break;
}
}
catch (const std::exception& e) {
- TCPLOG("tryRead() exception..." << e.what() << endl);
+ TCPLOG(handler->getDescriptor(), "tryRead() exception..." << e.what() << endl);
return LWResult::Result::PermanentError;
}
TCPIOHandlerStateChange(IOState::Done, state, pident);
int ret = MT->waitEvent(*pident, &data, g_networkTimeoutMsec);
- TCPLOG("arecvtcp " << ret << ' ' << data.size() << ' ' );
+ TCPLOG(pident->tcpsock, "arecvtcp " << ret << ' ' << data.size() << ' ' );
if (ret == 0) {
- TCPLOG("timeout" << endl);
+ TCPLOG(pident->tcpsock, "timeout" << endl);
TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
return LWResult::Result::Timeout;
}
else if (ret == -1) {
- TCPLOG("PermanentError" << endl);
+ TCPLOG(pident->tcpsock, "PermanentError" << endl);
TCPIOHandlerStateChange(pident->lowState, IOState::Done, pident);
return LWResult::Result::PermanentError;
}
else if (data.empty()) {// error, EOF or other
// fd housekeeping done by TCPIOHandlerIO
- TCPLOG("EOF" << endl);
+ TCPLOG(pident->tcpsock, "EOF" << endl);
return LWResult::Result::PermanentError;
}
- TCPLOG("arecvtcp success" << endl);
+ TCPLOG(pident->tcpsock, "arecvtcp success" << endl);
return LWResult::Result::Success;
}
static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
{
- TCPLOG("State transation " << int(oldstate) << "->" << int(newstate) << endl);
+ TCPLOG(pid->tcpsock, "State transation " << int(oldstate) << "->" << int(newstate) << endl);
pid->lowState = newstate;
switch (newstate) {
case IOState::NeedWrite:
- TCPLOG("NeedRead -> NeedWrite: flip FD" << endl);
+ TCPLOG(pid->tcpsock, "NeedRead -> NeedWrite: flip FD" << endl);
t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
break;
case IOState::NeedRead:
break;
case IOState::Done:
- TCPLOG("Done -> removeReadFD" << endl);
+ TCPLOG(pid->tcpsock, "Done -> removeReadFD" << endl);
t_fdm->removeReadFD(pid->tcpsock);
break;
}
switch (newstate) {
case IOState::NeedRead:
- TCPLOG("NeedWrite -> NeedRead: flip FD" << endl);
+ TCPLOG(pid->tcpsock, "NeedWrite -> NeedRead: flip FD" << endl);
t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
break;
case IOState::NeedWrite:
break;
case IOState::Done:
- TCPLOG("Done -> removeWriteFD" << endl);
+ TCPLOG(pid->tcpsock, "Done -> removeWriteFD" << endl);
t_fdm->removeWriteFD(pid->tcpsock);
break;
}
case IOState::Done:
switch (newstate) {
case IOState::NeedRead:
- TCPLOG("NeedRead: addReadFD" << endl);
+ TCPLOG(pid->tcpsock, "NeedRead: addReadFD" << endl);
t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
break;
case IOState::NeedWrite:
- TCPLOG("NeedWrite: addWriteFD" << endl);
+ TCPLOG(pid->tcpsock, "NeedWrite: addWriteFD" << endl);
t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
break;
case IOState::Done:
assert(fd == pid->tcphandler->getDescriptor());
IOState newstate = IOState::Done;
- TCPLOG("TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
+ TCPLOG(pid->tcpsock, "TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
// In the code below, we want to update the state of the fd before calling sendEvent
// a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
switch (pid->highState) {
case TCPAction::DoingRead:
- TCPLOG("highState: Reading" << endl);
+ TCPLOG(pid->tcpsock, "highState: Reading" << endl);
// In arecvtcp, the buffer was resized already so inWanted bytes will fit
// try reading
try {
switch (newstate) {
case IOState::Done:
case IOState::NeedRead:
- TCPLOG("tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
- TCPLOG("TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
+ TCPLOG(pid->tcpsock, "tryRead: Done or NeedRead " << int(newstate) << ' ' << pid->inPos << '/' << pid->inWanted << endl);
+ TCPLOG(pid->tcpsock, "TCPIOHandlerIO " << pid->inWanted << ' ' << pid->inIncompleteOkay << endl);
if (pid->inPos == pid->inWanted || (pid->inIncompleteOkay && pid->inPos > 0)) {
pid->inMSG.resize(pid->inPos); // old content (if there) + new bytes read, only relevant for the inIncompleteOkay case
newstate = IOState::Done;
}
catch (const std::exception& e) {
newstate = IOState::Done;
- TCPLOG("read exception..." << e.what() << endl);
+ TCPLOG(pid->tcpsock, "read exception..." << e.what() << endl);
PacketBuffer empty;
TCPIOHandlerStateChange(pid->lowState, newstate, pid);
MT->sendEvent(*pid, &empty); // this conveys error status
break;
case TCPAction::DoingWrite:
- TCPLOG("highState: Writing" << endl);
+ TCPLOG(pid->tcpsock, "highState: Writing" << endl);
try {
- TCPLOG("tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << pid << " -> ");
+ TCPLOG(pid->tcpsock, "tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << pid << " -> ");
newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
- TCPLOG(pid->outPos << '/' << pid->outMSG.size() << endl);
+ TCPLOG(pid->tcpsock, pid->outPos << '/' << pid->outMSG.size() << endl);
switch (newstate) {
case IOState::Done: {
- TCPLOG("tryWrite: Done" << endl);
+ TCPLOG(pid->tcpsock, "tryWrite: Done" << endl);
TCPIOHandlerStateChange(pid->lowState, newstate, pid);
MT->sendEvent(*pid, &pid->outMSG); // send back what we sent to convey everything is ok
return;
}
case IOState::NeedRead:
- TCPLOG("tryWrite: NeedRead" << endl);
+ TCPLOG(pid->tcpsock, "tryWrite: NeedRead" << endl);
break;
case IOState::NeedWrite:
- TCPLOG("tryWrite: NeedWrite" << endl);
+ TCPLOG(pid->tcpsock, "tryWrite: NeedWrite" << endl);
break;
}
}
catch (const std::exception& e) {
newstate = IOState::Done;
- TCPLOG("write exception..." << e.what() << endl);
+ TCPLOG(pid->tcpsock, "write exception..." << e.what() << endl);
PacketBuffer sent;
TCPIOHandlerStateChange(pid->lowState, newstate, pid);
MT->sendEvent(*pid, &sent); // we convey error status by sending empty string