#pragma once
#include <iostream>
#include <sstream>
+#if !defined(RECURSOR)
#include <syslog.h>
+#else
+#include "logger.hh"
+#endif // RECURSOR
+
/* This file is intended not to be metronome specific, and is simple example of C++2011
variadic templates in action.
This will happily print a string to %d! Doesn't do further format processing.
*/
+#if !defined(RECURSOR)
inline void dolog(std::ostream& os, const char*s)
{
os<<s;
else {
os << value;
s += 2;
- dolog(os, s, args...);
+ dolog(os, s, args...);
return;
}
}
os << *s++;
- }
+ }
}
extern bool g_verbose;
genlog(LOG_ERR, s, args...);
}
+
+#else // RECURSOR
+
+#define g_verbose 0
+
+inline void dolog(Logger::Urgency u, const char* s)
+{
+ g_log << u << s << std::endl;
+}
+
+template<typename T, typename... Args>
+void dolog(Logger::Urgency u, const char* s, T value, Args... args)
+{
+ g_log << u;
+ while (*s) {
+ if (*s == '%') {
+ if (*(s + 1) == '%') {
+ ++s;
+ }
+ else {
+ g_log << value;
+ s += 2;
+ dolog(u, s, args...);
+ return;
+ }
+ }
+ g_log << *s++;
+ }
+}
+
+#define vinfolog if(g_verbose)infolog
+
+template<typename... Args>
+void infolog(const char* s, Args... args)
+{
+ dolog(Logger::Info, s, args...);
+}
+
+template<typename... Args>
+void warnlog(const char* s, Args... args)
+{
+ dolog(Logger::Warning, s, args...);
+}
+
+template<typename... Args>
+void errlog(const char* s, Args... args)
+{
+ dolog(Logger::Error, s, args...);
+}
+
+#endif
+
}
else {
try {
- const int timeout = g_networkTimeoutMsec / 1000; // XXX tcpiohandler's unit is seconds
+ const int timeout = (g_networkTimeoutMsec + 999) / 1000; // XXX tcpiohandler's unit is seconds
Socket s(ip.sin4.sin_family, SOCK_STREAM);
s.setNonBlocking();
s.bind(localip);
std::shared_ptr<TLSCtx> tlsCtx{nullptr};
+ if (ip.getPort() == 853) {
+ TLSContextParameters tlsParams;
+ tlsParams.d_provider = "openssl";
+ tlsParams.d_validateCertificates = false;
+ //tlsParams.d_caStore = caaStore;
+ tlsCtx = getTLSContext(tlsParams);
+ }
auto handler = std::make_shared<TCPIOHandler>("", s.releaseHandle(), timeout, tlsCtx, now->tv_sec);
/* auto state = */ handler->tryConnect(SyncRes::s_tcp_fast_open_connect, ip);
+ //cerr << "state after TryConnect() " << int(state) << endl;
uint16_t tlen=htons(vpacket.size());
char *lenP=(char*)&tlen;
const char *msgP=(const char*)&*vpacket.begin();
packet.insert(packet.end(), msgP, msgP+vpacket.size());
ret = asendtcp(packet, handler);
if (ret != LWResult::Result::Success) {
+ handler->close();
return ret;
}
}
#endif /* HAVE_FSTRM */
- packet.clear();
ret = arecvtcp(packet, 2, handler, false);
if (ret != LWResult::Result::Success) {
return ret;
memcpy(&tlen, packet.data(), sizeof(tlen));
len=ntohs(tlen); // switch to the 'len' shared with the rest of the function
+ // XXX receive into buf directly?
+ packet.resize(len);
ret = arecvtcp(packet, len, handler, false);
if (ret != LWResult::Result::Success) {
return ret;
buf.resize(len);
memcpy(buf.data(), packet.data(), len);
+ handler->close();
ret = LWResult::Result::Success;
}
catch (const NetworkError& ne) {
return s_threadInfos.at(t_id).isHandler;
}
+#if 0
+#define TCPLOG(x) cerr << x
+#else
+#define TCPLOG(x)
+#endif
+
static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var);
LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>& handler)
{
+ TCPLOG("asendtcp called " << data.size() << endl);
+
PacketID pident;
pident.tcphandler = handler;
pident.tcpsock = handler->getDescriptor();
PacketBuffer packet;
int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
- if (ret == 0) { //timeout
+ TCPLOG("asendtcp waitEvent returned " << ret << ' ' << packet.size() << '/' << data.size() << ' ');
+ if (ret == 0) {
t_fdm->removeWriteFD(handler->getDescriptor());
+ TCPLOG("timeout" << endl);
return LWResult::Result::Timeout;
}
else if (ret == -1) { // error
t_fdm->removeWriteFD(handler->getDescriptor());
+ TCPLOG("PermanentError" << endl);
return LWResult::Result::PermanentError;
}
else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
+ TCPLOG("PermanentError size mismatch" << endl);
return LWResult::Result::PermanentError;
}
+ TCPLOG("asendtcp success" << endl);
return LWResult::Result::Success;
}
LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, shared_ptr<TCPIOHandler>& handler, const bool incompleteOkay)
{
- data.clear();
+ TCPLOG("arecvtcp called " << len << ' ' << data.size() << endl);
+ size_t pos = 0;
+ data.resize(len);
+ TCPLOG("calling tryRead() " << data.size() << ' ' << len << endl);
+ /* IOState state = */ handler->tryRead(data, pos, len);
+ TCPLOG("arcvtcp tryRead() returned " << int(state) << ' ' << pos << '/' << len << endl);
+
+ if (pos == len || (incompleteOkay && pos > 0)) {
+ data.resize(pos);
+ TCPLOG("acecvtcp success A" << endl);
+ return LWResult::Result::Success;
+ }
+ data.clear();
PacketID pident;
pident.tcphandler = handler;
pident.tcpsock = handler->getDescriptor();
t_fdm->addReadFD(handler->getDescriptor(), TCPIOHandlerReadable, pident);
int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec);
+ TCPLOG("arecvtcp" << ret << ' ' << data.size() << ' ');
if (ret == 0) {
+ TCPLOG("timeout" << endl);
t_fdm->removeReadFD(handler->getDescriptor());
return LWResult::Result::Timeout;
}
else if (ret == -1) {
+ TCPLOG("PermanentError" << endl);
t_fdm->removeWriteFD(handler->getDescriptor());
return LWResult::Result::PermanentError;
}
else if (data.empty()) {// error, EOF or other
+ TCPLOG("EOF" << endl);
return LWResult::Result::PermanentError;
}
+ TCPLOG("arecvtcp success" << endl);
return LWResult::Result::Success;
}
static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var)
{
PacketID* pident=boost::any_cast<PacketID>(&var);
- assert(pident->tcphandler != nullptr);
+ assert(pident->tcphandler);
assert(fd == pident->tcphandler->getDescriptor());
- // XXX Likely it is possible to directly write into inMSG
- // To be able to do that, inMSG has to be resized before. Afer the read we may have to resize again
- // taking into account the actual bytes read. Wondering if this is worth the trouble...
- PacketBuffer buffer;
- buffer.resize(pident->inNeeded);
+ TCPLOG("TCPIOHandlerReadable" << endl);
try {
- size_t pos = 0;
- IOState state = pident->tcphandler->tryRead(buffer, pos, pident->inNeeded);
+ size_t pos = pident->inMSG.size();
+ pident->inMSG.resize(pos + pident->inNeeded); // make room for what we'll read
+ IOState state = pident->tcphandler->tryRead(pident->inMSG, pos, pident->inNeeded);
+
switch (state) {
case IOState::Done:
case IOState::NeedRead:
- pident->inMSG.insert(pident->inMSG.end(), buffer.data(), buffer.data() + pos);
+ TCPLOG("TCPIOHandlerReadable state Done or Read " << int(state) << ' ' << buffer.size() << " bytes " << pident->inNeeded << '/' << pos << endl);
+ pident->inMSG.resize(pos); // old content (if there) + new bytes read
pident->inNeeded -= pos;
+ TCPLOG("TCPIOHandlerReadable " << pident->inNeeded << ' ' << pident->inIncompleteOkay << endl);
if (pident->inNeeded == 0 || pident->inIncompleteOkay) {
// removeReadFD seems to clobber PacketID, so take a copy
PacketID pid = *pident;
t_fdm->removeReadFD(fd);
MT->sendEvent(pid, &pid.inMSG);
+ break;
}
+ TCPLOG("TCPIOHandlerReadable more? flip to write seems needed???..." << endl);
+ t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, *pident);
break;
case IOState::NeedWrite:
- t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, var);
+ TCPLOG("NeedWrite... flip FD" << endl);
+ t_fdm->alterFDToWrite(fd, TCPIOHandlerWritable, *pident);
break;
}
}
- catch (const std::runtime_error& e) {
+ catch (const std::exception& e) {
+ TCPLOG("TCPIOHandlerReadble exception..." << e.what() << endl);
PacketID tmp = *pident;
t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
PacketBuffer empty;
static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var)
{
PacketID* pid = boost::any_cast<PacketID>(&var);
- assert(pid->tcphandler != nullptr);
+ assert(pid->tcphandler);
assert(fd == pid->tcphandler->getDescriptor());
+ TCPLOG("TCPIOHandlerWritable " << pid->outPos << '/' << pid->outMSG.size() << endl);
try {
IOState state = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
switch (state) {
case IOState::Done: {
+ TCPLOG("TCPIOHandlerWritable Done" << endl);
// removeWriteFD seems to clobber PacketID, so take a copy
PacketID tmp = *pid;
t_fdm->removeWriteFD(fd);
break;
}
case IOState::NeedWrite:
+ TCPLOG("TCPIOHandlerWritable NeedWrite" << endl);
// We'll get back later
- break;
+ break;
case IOState::NeedRead:
- t_fdm->alterFDToRead(fd, TCPIOHandlerReadable, var);
+ TCPLOG("NeedRead: flip FD" << endl);
+ pid->inNeeded = 1;
+ t_fdm->alterFDToRead(fd, TCPIOHandlerReadable, *pid);
break;
}
}
- catch (const std::runtime_error& e) {
+ catch (const std::exception& e) {
+ TCPLOG("TCPIOHandlerWritable exception..." << e.what() << endl);
// removeWriteFD seems to clobber PacketID, so take a copy
PacketID tmp = *pid;
t_fdm->removeWriteFD(fd);
return ret;
}
-bool g_verbose; // XXX FIX ME XXX, see tcpiohandler.cc