return s_threadInfos.at(t_id).isHandler;
}
-static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
-
-LWResult::Result asendtcp(const PacketBuffer& data, Socket* sock)
-{
- PacketID pident;
- pident.tcpsock=sock->getHandle();
- pident.outMSG = data;
-
- t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
- PacketBuffer packet;
-
- int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
- if (ret == 0) { //timeout
- t_fdm->removeWriteFD(sock->getHandle());
- return LWResult::Result::Timeout;
- }
- else if (ret == -1) { // error
- t_fdm->removeWriteFD(sock->getHandle());
- return LWResult::Result::PermanentError;
- }
- else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
- return LWResult::Result::PermanentError;
- }
-
- return LWResult::Result::Success;
-}
-
static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var);
-LWResult::Result asendtcp(const PacketBuffer& data, TCPIOHandler& handler)
+LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
{
PacketID pident;
- pident.tcphandler = &handler;
- pident.tcpsock = handler.getDescriptor();
+ pident.tcphandler = handler;
+ pident.tcpsock = handler->getDescriptor();
pident.outMSG = data;
- t_fdm->addWriteFD(handler.getDescriptor(), TCPIOHandlerWritable, pident);
+ t_fdm->addWriteFD(handler->getDescriptor(), TCPIOHandlerWritable, pident);
PacketBuffer packet;
int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
if (ret == 0) { //timeout
- t_fdm->removeWriteFD(handler.getDescriptor());
+ t_fdm->removeWriteFD(handler->getDescriptor());
return LWResult::Result::Timeout;
}
else if (ret == -1) { // error
- t_fdm->removeWriteFD(handler.getDescriptor());
+ t_fdm->removeWriteFD(handler->getDescriptor());
return LWResult::Result::PermanentError;
}
else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
return LWResult::Result::Success;
}
-static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
-
-LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, Socket* sock, const bool incompleteOkay)
-{
- data.clear();
- PacketID pident;
- pident.tcpsock=sock->getHandle();
- pident.inNeeded=len;
- pident.inIncompleteOkay=incompleteOkay;
- t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
-
- int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec);
- if (ret == 0) {
- t_fdm->removeReadFD(sock->getHandle());
- return LWResult::Result::Timeout;
- }
- else if (ret == -1) {
- t_fdm->removeWriteFD(sock->getHandle());
- return LWResult::Result::PermanentError;
- }
- else if (data.empty()) {// error, EOF or other
- return LWResult::Result::PermanentError;
- }
-
- return LWResult::Result::Success;
-}
-
static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var);
-LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, TCPIOHandler& handler, const bool incompleteOkay)
+LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
{
data.clear();
PacketID pident;
- pident.tcphandler = &handler;
- pident.tcpsock = handler.getDescriptor();
+ pident.tcphandler = handler;
+ pident.tcpsock = handler->getDescriptor();
pident.inNeeded = len;
pident.inIncompleteOkay = incompleteOkay;
- t_fdm->addReadFD(handler.getDescriptor(), TCPIOHandlerReadable, pident);
+ t_fdm->addReadFD(handler->getDescriptor(), TCPIOHandlerReadable, pident);
int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec);
if (ret == 0) {
- t_fdm->removeReadFD(handler.getDescriptor());
+ t_fdm->removeReadFD(handler->getDescriptor());
return LWResult::Result::Timeout;
}
else if (ret == -1) {
- t_fdm->removeWriteFD(handler.getDescriptor());
+ t_fdm->removeWriteFD(handler->getDescriptor());
return LWResult::Result::PermanentError;
}
else if (data.empty()) {// error, EOF or other
}
}
-static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var)
-{
- PacketID* pident=boost::any_cast<PacketID>(&var);
- // cerr<<"handleTCPClientReadable called for fd "<<fd<<", pident->inNeeded: "<<pident->inNeeded<<", "<<pident->sock->getHandle()<<endl;
-
- boost::shared_array<char> buffer(new char[pident->inNeeded]);
-
- ssize_t ret=recv(fd, buffer.get(), pident->inNeeded,0);
- if(ret > 0) {
- pident->inMSG.insert(pident->inMSG.end(), &buffer[0], &buffer[ret]);
- pident->inNeeded -= (size_t)ret;
- if(!pident->inNeeded || pident->inIncompleteOkay) {
- // cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
- PacketID pid=*pident;
- PacketBuffer msg = pident->inMSG;
-
- t_fdm->removeReadFD(fd);
- MT->sendEvent(pid, &msg);
- }
- else {
- // cerr<<"Still have "<<pident->inNeeded<<" left to go"<<endl;
- }
- }
- else {
- PacketID tmp=*pident;
- t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
- PacketBuffer empty;
- MT->sendEvent(tmp, &empty); // this conveys error status
- }
-}
-
static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var)
{
PacketID* pident=boost::any_cast<PacketID>(&var);
pident->inMSG.insert(pident->inMSG.end(), buffer.data(), buffer.data() + pos);
pident->inNeeded -= pos;
if (pident->inNeeded == 0 || pident->inIncompleteOkay) {
+ // removeReadFD seems to clobber PacketID, so take a copy
PacketID pid = *pident;
- PacketBuffer msg = pident->inMSG;
t_fdm->removeReadFD(fd);
- MT->sendEvent(pid, &msg);
+ MT->sendEvent(pid, &pid.inMSG);
}
break;
case IOState::NeedWrite:
- // What to do?
+ t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, var);
break;
}
}
}
}
-static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
-{
- PacketID* pid = boost::any_cast<PacketID>(&var);
- ssize_t ret = send(fd, pid->outMSG.data() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
- if (ret > 0) {
- pid->outPos += (ssize_t)ret;
- if (pid->outPos == pid->outMSG.size()) {
- PacketID tmp=*pid;
- t_fdm->removeWriteFD(fd);
- MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
- }
- }
- else { // error or EOF
- PacketID tmp(*pid);
- t_fdm->removeWriteFD(fd);
- PacketBuffer sent;
- MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
- }
-}
-
static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var)
{
PacketID* pid = boost::any_cast<PacketID>(&var);
IOState state = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
switch (state) {
case IOState::Done: {
+ // removeWriteFD seems to clobber PacketID, so take a copy
PacketID tmp = *pid;
t_fdm->removeWriteFD(fd);
MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
// We'll get back later
break;
case IOState::NeedRead:
- // What to do?
+ t_fdm->alterFDToRead(fd, TCPIOHandlerReadable, var);
break;
}
}
catch (const std::runtime_error& e) {
+ // removeWriteFD seems to clobber PacketID, so take a copy
PacketID tmp = *pid;
t_fdm->removeWriteFD(fd);
PacketBuffer sent;
return ret;
}
+
+bool g_verbose; // XXX FIX ME XXX, see tcpiohandler.cc