}
#if 0
-#define TCPLOG(x) cerr << x
+#define TCPLOG(x) cerr << [](){ timeval t; gettimeofday(&t, nullptr); return t.tv_sec % 10 + t.tv_usec/1000000.0; }() << ' ' << x
#else
#define TCPLOG(x)
#endif
-static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var);
+static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var);
+static void TCPIOHandlerStateChange(IOState, IOState, std::shared_ptr<PacketID>&);
LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
{
TCPLOG("asendtcp called " << data.size() << endl);
- PacketID pident;
- pident.tcphandler = handler;
- pident.tcpsock = handler->getDescriptor();
- pident.outMSG = data;
+ auto pident = std::make_shared<PacketID>();
+ pident->tcphandler = handler;
+ pident->tcpsock = handler->getDescriptor();
+ pident->outMSG = data;
+ pident->highState = TCPAction::DoingWrite;
- t_fdm->addWriteFD(handler->getDescriptor(), TCPIOHandlerWritable, pident);
- PacketBuffer packet;
- int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
+ TCPLOG("Initial tryWrite: " << pident->outPos << '/' << pident->outMSG.size() << ' ' << " -> ");
+ IOState state = handler->tryWrite(pident->outMSG, pident->outPos, pident->outMSG.size());
+ TCPLOG(pident->outPos << '/' << pident->outMSG.size() << endl);
+
+ if (state == IOState::Done) {
+ TCPLOG("asendtcp success A" << endl);
+ return LWResult::Result::Success;
+ }
+
+ // Will set pident->lowState
+ TCPIOHandlerStateChange(IOState::Done, state, pident);
+
+ PacketBuffer packet;
+ int ret = MT->waitEvent(*pident, &packet, g_networkTimeoutMsec);
TCPLOG("asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
if (ret == 0) {
t_fdm->removeWriteFD(handler->getDescriptor());
return LWResult::Result::Success;
}
-static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var);
-
LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
{
TCPLOG("arecvtcp called " << len << ' ' << data.size() << endl);
- size_t pos = 0;
data.resize(len);
- TCPLOG("calling tryRead() " << data.size() << ' ' << len << endl);
- /* IOState state = */ handler->tryRead(data, pos, len);
- TCPLOG("arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
- if (pos == len || (incompleteOkay && pos > 0)) {
- data.resize(pos);
- TCPLOG("acecvtcp success A" << endl);
- return LWResult::Result::Success;
+ // We might have data already available from the TLS layer, try to get that into the buffer
+ size_t pos = 0;
+ IOState state;
+ try {
+ TCPLOG("calling tryRead() " << len << endl);
+ state = handler->tryRead(data, pos, len);
+ TCPLOG("arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
+ switch (state) {
+ case IOState::Done:
+ case IOState::NeedRead:
+ if (pos == len || (incompleteOkay && pos > 0)) {
+ data.resize(pos);
+ TCPLOG("acecvtcp success A" << endl);
+ return LWResult::Result::Success;
+ }
+ break;
+ case IOState::NeedWrite:
+ break;
+ }
+ }
+ catch (const std::exception& e) {
+ TCPLOG("tryRead() exception..." << e.what() << endl);
+ return LWResult::Result::PermanentError;
}
+ auto pident = std::make_shared<PacketID>();
+ pident->tcphandler = handler;
+ pident->tcpsock = handler->getDescriptor();
+ // We might have a partial result
+ data.resize(pos);
+ pident->inMSG = data;
+ pident->inNeeded = len;
+ pident->inIncompleteOkay = incompleteOkay;
+ pident->highState = TCPAction::DoingRead;
+
data.clear();
- PacketID pident;
- pident.tcphandler = handler;
- pident.tcpsock = handler->getDescriptor();
- pident.inNeeded = len;
- pident.inIncompleteOkay = incompleteOkay;
- t_fdm->addReadFD(handler->getDescriptor(), TCPIOHandlerReadable, pident);
-
- int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec);
- TCPLOG("arecvtcp" << ret << ' ' << data.size() << ' ');
+
+ // Will set pident->lowState
+ TCPIOHandlerStateChange(IOState::Done, state, pident);
+
+ int ret = MT->waitEvent(*pident, &data, g_networkTimeoutMsec);
+ TCPLOG("arecvtcp " << ret << ' ' << data.size() << ' ' );
if (ret == 0) {
TCPLOG("timeout" << endl);
t_fdm->removeReadFD(handler->getDescriptor());
// cerr<<"Had some kind of error: "<<ret<<", "<<stringerror()<<endl;
}
}
+
PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query)
{
Socket s(dest.sin4.sin_family, SOCK_DGRAM);
s.setNonBlocking();
ComboAddress local = pdns::getQueryLocalAddress(dest.sin4.sin_family, 0);
-
+
s.bind(local);
s.connect(dest);
s.send(query);
t_fdm->addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident);
PacketBuffer data;
-
int ret=MT->waitEvent(pident, &data, g_networkTimeoutMsec);
-
+
if(!ret || ret==-1) { // timeout
t_fdm->removeReadFD(s.getHandle());
}
}
}
-static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var)
+static void TCPIOHandlerStateChange(IOState oldstate, IOState newstate, std::shared_ptr<PacketID>& pid)
{
- PacketID* pident=boost::any_cast<PacketID>(&var);
- assert(pident->tcphandler);
- assert(fd == pident->tcphandler->getDescriptor());
+ TCPLOG("State transation " << int(oldstate) << "->" << int(newstate) << endl);
- TCPLOG("TCPIOHandlerReadable" << endl);
- try {
- size_t pos = pident->inMSG.size();
- pident->inMSG.resize(pos + pident->inNeeded); // make room for what we'll read
- IOState state = pident->tcphandler->tryRead(pident->inMSG, pos, pident->inNeeded);
+ pid->lowState = newstate;
- switch (state) {
+ // handle state transitions
+ switch (oldstate) {
+ case IOState::NeedRead:
+
+ switch (newstate) {
+ case IOState::NeedWrite:
+ TCPLOG("NeedRead -> NeedWrite: flip FD" << endl);
+ t_fdm->alterFDToWrite(pid->tcpsock, TCPIOHandlerIO, pid);
+ break;
+ case IOState::NeedRead:
+ break;
case IOState::Done:
+ TCPLOG("Done -> removeReadFD" << endl);
+ t_fdm->removeReadFD(pid->tcpsock);
+ break;
+ }
+ break;
+
+ case IOState::NeedWrite:
+
+ switch (newstate) {
case IOState::NeedRead:
- TCPLOG("TCPIOHandlerReadable state Done or Read " << int(state) << ' ' << buffer.size() << " bytes " << pident->inNeeded << '/' << pos << endl);
- pident->inMSG.resize(pos); // old content (if there) + new bytes read
- pident->inNeeded -= pos;
- TCPLOG("TCPIOHandlerReadable " << pident->inNeeded << ' ' << pident->inIncompleteOkay << endl);
- if (pident->inNeeded == 0 || pident->inIncompleteOkay) {
- // removeReadFD seems to clobber PacketID, so take a copy
- PacketID pid = *pident;
- t_fdm->removeReadFD(fd);
- MT->sendEvent(pid, &pid.inMSG);
- break;
- }
- TCPLOG("TCPIOHandlerReadable more? flip to write seems needed???..." << endl);
- t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, *pident);
+ TCPLOG("NeedWrite -> NeedRead: flip FD" << endl);
+ t_fdm->alterFDToRead(pid->tcpsock, TCPIOHandlerIO, pid);
break;
case IOState::NeedWrite:
- TCPLOG("NeedWrite... flip FD" << endl);
- t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, *pident);
+ break;
+ case IOState::Done:
+ TCPLOG("Done -> removeWriteFD" << endl);
+ t_fdm->removeWriteFD(pid->tcpsock);
break;
}
+ break;
+
+ case IOState::Done:
+ switch (newstate) {
+ case IOState::NeedRead:
+ TCPLOG("NeedRead: addReadFD" << endl);
+ t_fdm->addReadFD(pid->tcpsock, TCPIOHandlerIO, pid);
+ break;
+ case IOState::NeedWrite:
+ TCPLOG("NeedWrite: addWriteFD" << endl);
+ t_fdm->addWriteFD(pid->tcpsock, TCPIOHandlerIO, pid);
+ break;
+ case IOState::Done:
+ break;
+ }
+ break;
}
- catch (const std::exception& e) {
- TCPLOG("TCPIOHandlerReadble exception..." << e.what() << endl);
- PacketID tmp = *pident;
- t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
- PacketBuffer empty;
- MT->sendEvent(tmp, &empty); // this conveys error status
- }
+
}
-static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var)
+static void TCPIOHandlerIO(int fd, FDMultiplexer::funcparam_t& var)
{
- PacketID* pid = boost::any_cast<PacketID>(&var);
+ auto pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
assert(pid->tcphandler);
assert(fd == pid->tcphandler->getDescriptor());
+ IOState newstate = IOState::Done;
- TCPLOG("TCPIOHandlerWritable " << pid->outPos << '/' << pid->outMSG.size() << endl);
- try {
- IOState state = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
- switch (state) {
- case IOState::Done: {
- TCPLOG("TCPIOHandlerWritable Done" << endl);
- // removeWriteFD seems to clobber PacketID, so take a copy
- PacketID tmp = *pid;
- t_fdm->removeWriteFD(fd);
- MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
- break;
+ TCPLOG("TCPIOHandlerIO: lowState " << int(pid->lowState) << endl);
+
+ // In the code below, we want to update the state of the fd before calling sendEvent
+ // a sendEvent might close the fd, and some poll multiplexers do not like to manipulate a closed fd
+
+ switch (pid->highState) {
+ case TCPAction::DoingRead:
+ TCPLOG("highState: Reading" << endl);
+ // try reading
+ try {
+ size_t pos = pid->inMSG.size();
+ pid->inMSG.resize(pos + pid->inNeeded); // make room for what we'll read
+ newstate = pid->tcphandler->tryRead(pid->inMSG, pos, pid->inNeeded);
+ switch (newstate) {
+ case IOState::Done:
+ case IOState::NeedRead:
+ TCPLOG("tryRead: Done or NeedRead " << int(newstate) << ' ' << pos << '/' << pid->inNeeded << endl);
+ pid->inMSG.resize(pos); // old content (if there) + new bytes read
+ pid->inNeeded -= pos;
+ TCPLOG("TCPIOHandlerIO " << pid->inNeeded << ' ' << pid->inIncompleteOkay << endl);
+ if (pid->inNeeded == 0 || pid->inIncompleteOkay) {
+ newstate = IOState::Done;
+ TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+ MT->sendEvent(*pid, &pid->inMSG);
+ return;
+ }
+ break;
+ case IOState::NeedWrite:
+ break;
+ }
}
- case IOState::NeedWrite:
- TCPLOG("TCPIOHandlerWritable NeedWrite" << endl);
- // We'll get back later
- break;
- case IOState::NeedRead:
- TCPLOG("NeedRead: flip FD" << endl);
- pid->inNeeded = 1;
- t_fdm->alterFDToRead(fd, TCPIOHandlerReadable, *pid);
- break;
+ catch (const std::exception& e) {
+ newstate = IOState::Done;
+ TCPLOG("read exception..." << e.what() << endl);
+ PacketBuffer empty;
+ TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+ MT->sendEvent(*pid, &empty); // this conveys error status
+ return;
}
+ break;
+
+ case TCPAction::DoingWrite:
+ TCPLOG("highState: Writing" << endl);
+ try {
+ TCPLOG("tryWrite: " << pid->outPos << '/' << pid->outMSG.size() << ' ' << pid << " -> ");
+ newstate = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
+ TCPLOG(pid->outPos << '/' << pid->outMSG.size() << endl);
+ switch (newstate) {
+ case IOState::Done: {
+ TCPLOG("tryWrite: Done" << endl);
+ TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+ MT->sendEvent(*pid, &pid->outMSG); // send back what we sent to convey everything is ok
+ return;
+ }
+ case IOState::NeedRead:
+ TCPLOG("tryWrite: NeedRead" << endl);
+ break;
+ case IOState::NeedWrite:
+ TCPLOG("tryWrite: NeedWrite" << endl);
+ break;
+ }
+ }
+ catch (const std::exception& e) {
+ newstate = IOState::Done;
+ TCPLOG("write exception..." << e.what() << endl);
+ PacketBuffer sent;
+ TCPIOHandlerStateChange(pid->lowState, newstate, pid);
+ MT->sendEvent(*pid, &sent); // we convey error status by sending empty string
+ return;
+ }
+ break;
}
- catch (const std::exception& e) {
- TCPLOG("TCPIOHandlerWritable exception..." << e.what() << endl);
- // removeWriteFD seems to clobber PacketID, so take a copy
- PacketID tmp = *pid;
- t_fdm->removeWriteFD(fd);
- PacketBuffer sent;
- MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
- }
+
+ // Cases that did not end up doing a sendEvent
+ TCPIOHandlerStateChange(pid->lowState, newstate, pid);
}
// resend event to everybody chained onto it