ret=getFakePTRRecords(dq.followupName, dq.records);
}
else if(dq.followupFunction=="udpQueryResponse") {
- dq.udpAnswer = GenUDPQueryResponse(dq.udpQueryDest, dq.udpQuery);
+ PacketBuffer p = GenUDPQueryResponse(dq.udpQueryDest, dq.udpQuery);
+ dq.udpAnswer = std::string(reinterpret_cast<const char*>(p.data()), p.size());
auto cbFunc = d_lw->readVariable<boost::optional<luacall_t>>(dq.udpCallback).get_value_or(0);
if(!cbFunc) {
g_log<<Logger::Error<<"Attempted callback for Lua UDP Query/Response which could not be found"<<endl;
#include "validate.hh"
#include "lua-base4.hh"
#include "proxy-protocol.hh"
+#include "noinitvector.hh"
#include <unordered_map>
#include "lua-recursor4-ffi.hh"
-string GenUDPQueryResponse(const ComboAddress& dest, const string& query);
+PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query);
unsigned int getRecursorThreadId();
// pdns_ffi_param_t is a lightuserdata
#include "validate-recursor.hh"
#include "ednssubnet.hh"
#include "query-local-address.hh"
+#include "tcpiohandler.hh"
#include "rec-protozero.hh"
#include "uuid-utils.hh"
return false;
}
-static void logFstreamResponse(const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstreamLoggers, const ComboAddress&localip, const ComboAddress& ip, bool doTCP, boost::optional<const DNSName&> auth, const std::string& packet, const struct timeval& queryTime, const struct timeval& replyTime)
+static void logFstreamResponse(const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstreamLoggers, const ComboAddress&localip, const ComboAddress& ip, bool doTCP, boost::optional<const DNSName&> auth, const PacketBuffer& packet, const struct timeval& queryTime, const struct timeval& replyTime)
{
if (fstreamLoggers == nullptr)
return;
TIMEVAL_TO_TIMESPEC(&queryTime, &ts1);
TIMEVAL_TO_TIMESPEC(&replyTime, &ts2);
std::string str;
- DnstapMessage message(str, DnstapMessage::MessageType::resolver_response, SyncRes::s_serverID, &localip, &ip, doTCP, static_cast<const char*>(&*packet.begin()), packet.size(), &ts1, &ts2, auth);
+ DnstapMessage message(str, DnstapMessage::MessageType::resolver_response, SyncRes::s_serverID, &localip, &ip, doTCP, reinterpret_cast<const char*>(packet.data()), packet.size(), &ts1, &ts2, auth);
for (auto& logger : *fstreamLoggers) {
logger->queueData(str);
{
size_t len;
size_t bufsize=g_outgoingEDNSBufsize;
- std::string buf;
+ PacketBuffer buf;
buf.resize(bufsize);
vector<uint8_t> vpacket;
// string mapped0x20=dns0x20(domain);
}
else {
try {
- Socket s(ip.sin4.sin_family, SOCK_STREAM);
+ const int timeout = g_networkTimeoutMsec / 1000; // XXX tcpiohandler's unit is seconds
+ Socket s(ip.sin4.sin_family, SOCK_STREAM);
s.setNonBlocking();
- if (SyncRes::s_tcp_fast_open_connect) {
- try {
- s.setFastOpenConnect();
- }
- catch (const NetworkError& e) {
- // Ignore error, we did a pre-check in pdns_recursor.cc:checkTFOconnect()
- }
- }
-
localip = pdns::getQueryLocalAddress(ip.sin4.sin_family, 0);
s.bind(localip);
- s.connect(ip);
+ std::shared_ptr<TLSCtx> tlsCtx{nullptr};
+ TCPIOHandler handler("", s.releaseHandle(), timeout, tlsCtx, now->tv_sec);
+ IOState state = handler.tryConnect(SyncRes::s_tcp_fast_open_connect, ip);
uint16_t tlen=htons(vpacket.size());
char *lenP=(char*)&tlen;
const char *msgP=(const char*)&*vpacket.begin();
- string packet=string(lenP, lenP+2)+string(msgP, msgP+vpacket.size());
- ret = asendtcp(packet, &s);
+ PacketBuffer packet;
+ packet.reserve(2 + vpacket.size());
+ packet.insert(packet.end(), lenP, lenP+2);
+ packet.insert(packet.end(), msgP, msgP+vpacket.size());
+ ret = asendtcp(packet, handler);
if (ret != LWResult::Result::Success) {
return ret;
}
#endif /* HAVE_FSTRM */
packet.clear();
- ret = arecvtcp(packet, 2, &s, false);
+ ret = arecvtcp(packet, 2, handler, false);
if (ret != LWResult::Result::Success) {
return ret;
}
- memcpy(&tlen, packet.c_str(), sizeof(tlen));
+ memcpy(&tlen, packet.data(), sizeof(tlen));
len=ntohs(tlen); // switch to the 'len' shared with the rest of the function
- ret = arecvtcp(packet, len, &s, false);
+ ret = arecvtcp(packet, len, handler, false);
if (ret != LWResult::Result::Success) {
return ret;
}
buf.resize(len);
- memcpy(const_cast<char*>(buf.data()), packet.c_str(), len);
+ memcpy(buf.data(), packet.data(), len);
ret = LWResult::Result::Success;
}
lwr->d_records.clear();
try {
lwr->d_tcbit=0;
- MOADNSParser mdp(false, buf);
+ MOADNSParser mdp(false, reinterpret_cast<const char*>(buf.data()), buf.size());
lwr->d_aabit=mdp.d_header.aa;
lwr->d_tcbit=mdp.d_header.tc;
lwr->d_rcode=mdp.d_header.rcode;
#include "remote_logger.hh"
#include "fstrm_logger.hh"
#include "resolve-context.hh"
+#include "noinitvector.hh"
class LWResException : public PDNSException
{
LWResult::Result asendto(const char *data, size_t len, int flags, const ComboAddress& ip, uint16_t id,
const DNSName& domain, uint16_t qtype, int* fd);
-LWResult::Result arecvfrom(std::string& packet, int flags, const ComboAddress& ip, size_t *d_len, uint16_t id,
+LWResult::Result arecvfrom(PacketBuffer& packet, int flags, const ComboAddress& ip, size_t *d_len, uint16_t id,
const DNSName& domain, uint16_t qtype, int fd, const struct timeval* now);
LWResult::Result asyncresolve(const ComboAddress& ip, const DNSName& domain, int type, bool doTCP, bool sendRDQuery, int EDNS0Level, struct timeval* now, boost::optional<Netmask>& srcmask, boost::optional<const ResolveContext&> context, const std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>& outgoingLoggers, const std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>>& fstrmLoggers, const std::set<uint16_t>& exportTypes, LWResult* res, bool* chained);
static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var);
-LWResult::Result asendtcp(const string& data, Socket* sock)
+LWResult::Result asendtcp(const PacketBuffer& data, Socket* sock)
{
PacketID pident;
pident.tcpsock=sock->getHandle();
- pident.outMSG=data;
+ pident.outMSG = data;
t_fdm->addWriteFD(sock->getHandle(), handleTCPClientWritable, pident);
- string packet;
+ PacketBuffer packet;
int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
if (ret == 0) { //timeout
return LWResult::Result::Success;
}
+static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var);
+
+LWResult::Result asendtcp(const PacketBuffer& data, TCPIOHandler& handler)
+{
+ PacketID pident;
+ pident.tcphandler = &handler;
+ pident.tcpsock = handler.getDescriptor();
+ pident.outMSG = data;
+
+ t_fdm->addWriteFD(handler.getDescriptor(), TCPIOHandlerWritable, pident);
+ PacketBuffer packet;
+
+ int ret = MT->waitEvent(pident, &packet, g_networkTimeoutMsec);
+ if (ret == 0) { //timeout
+ t_fdm->removeWriteFD(handler.getDescriptor());
+ return LWResult::Result::Timeout;
+ }
+ else if (ret == -1) { // error
+ t_fdm->removeWriteFD(handler.getDescriptor());
+ return LWResult::Result::PermanentError;
+ }
+ else if (packet.size() != data.size()) { // main loop tells us what it sent out, or empty in case of an error
+ return LWResult::Result::PermanentError;
+ }
+
+ return LWResult::Result::Success;
+}
+
static void handleTCPClientReadable(int fd, FDMultiplexer::funcparam_t& var);
-LWResult::Result arecvtcp(string& data, const size_t len, Socket* sock, const bool incompleteOkay)
+LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, Socket* sock, const bool incompleteOkay)
{
data.clear();
PacketID pident;
pident.inIncompleteOkay=incompleteOkay;
t_fdm->addReadFD(sock->getHandle(), handleTCPClientReadable, pident);
- int ret = MT->waitEvent(pident,&data, g_networkTimeoutMsec);
+ int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec);
if (ret == 0) {
t_fdm->removeReadFD(sock->getHandle());
return LWResult::Result::Timeout;
return LWResult::Result::Success;
}
+static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var);
+
+LWResult::Result arecvtcp(PacketBuffer& data, const size_t len, TCPIOHandler& handler, const bool incompleteOkay)
+{
+ data.clear();
+
+ PacketID pident;
+ pident.tcphandler = &handler;
+ pident.tcpsock = handler.getDescriptor();
+ pident.inNeeded = len;
+ pident.inIncompleteOkay = incompleteOkay;
+ t_fdm->addReadFD(handler.getDescriptor(), TCPIOHandlerReadable, pident);
+
+ int ret = MT->waitEvent(pident, &data, g_networkTimeoutMsec);
+ if (ret == 0) {
+ t_fdm->removeReadFD(handler.getDescriptor());
+ return LWResult::Result::Timeout;
+ }
+ else if (ret == -1) {
+ t_fdm->removeWriteFD(handler.getDescriptor());
+ return LWResult::Result::PermanentError;
+ }
+ else if (data.empty()) {// error, EOF or other
+ return LWResult::Result::PermanentError;
+ }
+
+ return LWResult::Result::Success;
+}
+
static void handleGenUDPQueryResponse(int fd, FDMultiplexer::funcparam_t& var)
{
- PacketID pident=*boost::any_cast<PacketID>(&var);
- char resp[512];
+ PacketID pident = *boost::any_cast<PacketID>(&var);
+ PacketBuffer resp;
+ resp.resize(512);
ComboAddress fromaddr;
- socklen_t addrlen=sizeof(fromaddr);
+ socklen_t addrlen = sizeof(fromaddr);
- ssize_t ret=recvfrom(fd, resp, sizeof(resp), 0, (sockaddr *)&fromaddr, &addrlen);
+ ssize_t ret = recvfrom(fd, resp.data(), resp.size(), 0, (sockaddr *)&fromaddr, &addrlen);
if (fromaddr != pident.remote) {
g_log<<Logger::Notice<<"Response received from the wrong remote host ("<<fromaddr.toStringWithPort()<<" instead of "<<pident.remote.toStringWithPort()<<"), discarding"<<endl;
t_fdm->removeReadFD(fd);
if(ret >= 0) {
- string data(resp, (size_t) ret);
- MT->sendEvent(pident, &data);
+ MT->sendEvent(pident, &resp);
}
else {
- string empty;
+ PacketBuffer empty;
MT->sendEvent(pident, &empty);
// cerr<<"Had some kind of error: "<<ret<<", "<<stringerror()<<endl;
}
}
-string GenUDPQueryResponse(const ComboAddress& dest, const string& query)
+PacketBuffer GenUDPQueryResponse(const ComboAddress& dest, const string& query)
{
Socket s(dest.sin4.sin_family, SOCK_DGRAM);
s.setNonBlocking();
pident.type=0;
t_fdm->addReadFD(s.getHandle(), handleGenUDPQueryResponse, pident);
- string data;
+ PacketBuffer data;
- int ret=MT->waitEvent(pident,&data, g_networkTimeoutMsec);
+ int ret=MT->waitEvent(pident, &data, g_networkTimeoutMsec);
if(!ret || ret==-1) { // timeout
t_fdm->removeReadFD(s.getHandle());
return LWResult::Result::Success;
}
-LWResult::Result arecvfrom(std::string& packet, int flags, const ComboAddress& fromaddr, size_t *d_len,
+LWResult::Result arecvfrom(PacketBuffer& packet, int flags, const ComboAddress& fromaddr, size_t *d_len,
uint16_t id, const DNSName& domain, uint16_t qtype, int fd, const struct timeval* now)
{
static const unsigned int nearMissLimit = ::arg().asNum("spoof-nearmiss-max");
ssize_t ret=recv(fd, buffer.get(), pident->inNeeded,0);
if(ret > 0) {
- pident->inMSG.append(&buffer[0], &buffer[ret]);
- pident->inNeeded-=(size_t)ret;
+ pident->inMSG.insert(pident->inMSG.end(), &buffer[0], &buffer[ret]);
+ pident->inNeeded -= (size_t)ret;
if(!pident->inNeeded || pident->inIncompleteOkay) {
// cerr<<"Got entire load of "<<pident->inMSG.size()<<" bytes"<<endl;
PacketID pid=*pident;
- string msg=pident->inMSG;
+ PacketBuffer msg = pident->inMSG;
t_fdm->removeReadFD(fd);
MT->sendEvent(pid, &msg);
else {
PacketID tmp=*pident;
t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
- string empty;
+ PacketBuffer empty;
+ MT->sendEvent(tmp, &empty); // this conveys error status
+ }
+}
+
+static void TCPIOHandlerReadable(int fd, FDMultiplexer::funcparam_t& var)
+{
+ PacketID* pident=boost::any_cast<PacketID>(&var);
+ assert(pident->tcphandler != nullptr);
+ assert(fd == pident->tcphandler->getDescriptor());
+ // XXX Likely it is possible to directly write into inMSG
+ // To be able to do that, inMSG has to be resized before. Afer the read we may have to resize again
+ // taking into account the actual bytes read. Wondering if this is worth the trouble...
+ PacketBuffer buffer;
+ buffer.resize(pident->inNeeded);
+
+ try {
+ size_t pos = 0;
+ IOState state = pident->tcphandler->tryRead(buffer, pos, pident->inNeeded);
+ switch (state) {
+ case IOState::Done:
+ case IOState::NeedRead:
+ pident->inMSG.insert(pident->inMSG.end(), buffer.data(), buffer.data() + pos);
+ pident->inNeeded -= pos;
+ if (pident->inNeeded == 0 || pident->inIncompleteOkay) {
+ PacketID pid = *pident;
+ PacketBuffer msg = pident->inMSG;
+ t_fdm->removeReadFD(fd);
+ MT->sendEvent(pid, &msg);
+ }
+ break;
+ case IOState::NeedWrite:
+ // What to do?
+ break;
+ }
+ }
+ catch (const std::runtime_error& e) {
+ PacketID tmp = *pident;
+ t_fdm->removeReadFD(fd); // pident might now be invalid (it isn't, but still)
+ PacketBuffer empty;
MT->sendEvent(tmp, &empty); // this conveys error status
}
}
static void handleTCPClientWritable(int fd, FDMultiplexer::funcparam_t& var)
{
PacketID* pid = boost::any_cast<PacketID>(&var);
- ssize_t ret = send(fd, pid->outMSG.c_str() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
+ ssize_t ret = send(fd, pid->outMSG.data() + pid->outPos, pid->outMSG.size() - pid->outPos,0);
if (ret > 0) {
pid->outPos += (ssize_t)ret;
if (pid->outPos == pid->outMSG.size()) {
else { // error or EOF
PacketID tmp(*pid);
t_fdm->removeWriteFD(fd);
- string sent;
+ PacketBuffer sent;
+ MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
+ }
+}
+
+static void TCPIOHandlerWritable(int fd, FDMultiplexer::funcparam_t& var)
+{
+ PacketID* pid = boost::any_cast<PacketID>(&var);
+ assert(pid->tcphandler != nullptr);
+ assert(fd == pid->tcphandler->getDescriptor());
+
+ try {
+ IOState state = pid->tcphandler->tryWrite(pid->outMSG, pid->outPos, pid->outMSG.size());
+ switch (state) {
+ case IOState::Done: {
+ PacketID tmp = *pid;
+ t_fdm->removeWriteFD(fd);
+ MT->sendEvent(tmp, &tmp.outMSG); // send back what we sent to convey everything is ok
+ break;
+ }
+ case IOState::NeedWrite:
+ // We'll get back later
+ break;
+ case IOState::NeedRead:
+ // What to do?
+ break;
+ }
+ }
+ catch (const std::runtime_error& e) {
+ PacketID tmp = *pid;
+ t_fdm->removeWriteFD(fd);
+ PacketBuffer sent;
MT->sendEvent(tmp, &sent); // we convey error status by sending empty string
}
}
// resend event to everybody chained onto it
-static void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const string& content)
+static void doResends(MT_t::waiters_t::iterator& iter, PacketID resend, const PacketBuffer& content)
{
// We close the chain for new entries, since they won't be processed anyway
iter->key.closed = true;
{
PacketID pid=boost::any_cast<PacketID>(var);
ssize_t len;
- std::string packet;
+ PacketBuffer packet;
packet.resize(g_outgoingEDNSBufsize);
ComboAddress fromaddr;
socklen_t addrlen=sizeof(fromaddr);
}
t_udpclientsocks->returnSocket(fd);
- string empty;
+ PacketBuffer empty;
MT_t::waiters_t::iterator iter=MT->d_waiters.find(pid);
if(iter != MT->d_waiters.end())
else {
try {
if(len > 12)
- pident.domain=DNSName(&packet.at(0), len, 12, false, &pident.type); // don't copy this from above - we need to do the actual read
+ pident.domain=DNSName(reinterpret_cast<const char *>(packet.data()), len, 12, false, &pident.type); // don't copy this from above - we need to do the actual read
}
catch(std::exception& e) {
g_stats.serverParseError++; // won't be fed to lwres.cc, so we have to increment
t_bogusqueryring = std::unique_ptr<boost::circular_buffer<pair<DNSName, uint16_t> > >(new boost::circular_buffer<pair<DNSName, uint16_t> >());
t_bogusqueryring->set_capacity(ringsize);
}
- MT=std::unique_ptr<MTasker<PacketID,string> >(new MTasker<PacketID,string>(::arg().asNum("stack-size")));
+ MT=std::unique_ptr<MTasker<PacketID,PacketBuffer> >(new MTasker<PacketID,PacketBuffer>(::arg().asNum("stack-size")));
threadInfo.mt = MT.get();
/* start protobuf export threads if needed */
}
registerAllStats();
- string msg;
+ PacketBuffer msg;
for(const auto& carbonServer: carbonServers) {
ComboAddress remote(carbonServer, 2003);
Socket s(remote.sin4.sin_family, SOCK_STREAM);
for(const auto& val : all) {
str<<namespace_name<<'.'<<hostname<<'.'<<instance_name<<'.'<<val.first<<' '<<val.second.d_value<<' '<<now<<"\r\n";
}
- msg = str.str();
+ const string& x = str.str();
+ msg.insert(msg.end(), x.cbegin(), x.cend());
}
auto ret = asendtcp(msg, &s); // this will actually do the right thing waiting on the connect
ixfr.cc ixfr.hh \
json.cc json.hh \
lazy_allocator.hh \
+ libssl.cc libssl.hh \
lock.hh \
logger.hh logger.cc \
lua-base4.cc lua-base4.hh \
svc-records.cc svc-records.hh \
syncres.cc syncres.hh \
taskqueue.cc taskqueue.hh \
+ tcpiohandler.hh \
threadname.hh threadname.cc \
tsigverifier.cc tsigverifier.hh \
ueberbackend.hh \
--- /dev/null
+../libssl.cc
\ No newline at end of file
--- /dev/null
+../libssl.hh
\ No newline at end of file
--- /dev/null
+../tcpiohandler.hh
\ No newline at end of file
bool truncated = false;
bool spoofed = false;
- bool gotAnswer = doResolveAtThisIP(prefix, qname, qtype, lwr, ednsmask, auth, sendRDQuery, wasForwarded,
- tns->first, *remoteIP, false, truncated, spoofed);
- if (spoofed || (gotAnswer && truncated) ) {
+ bool gotAnswer = false;
+
+// Option below is for debugging purposes ony
+#define USE_TCP_ONLY 0
+
+#if !USE_TCP_ONLY
+ gotAnswer = doResolveAtThisIP(prefix, qname, qtype, lwr, ednsmask, auth, sendRDQuery, wasForwarded,
+ tns->first, *remoteIP, false, truncated, spoofed);
+ if (spoofed || (gotAnswer && truncated)) {
+#else
+ {
+#endif
/* retry, over TCP this time */
gotAnswer = doResolveAtThisIP(prefix, qname, qtype, lwr, ednsmask, auth, sendRDQuery, wasForwarded,
tns->first, *remoteIP, true, truncated, spoofed);
#include "proxy-protocol.hh"
#include "sholder.hh"
#include "histogram.hh"
+#include "tcpiohandler.hh"
#ifdef HAVE_CONFIG_H
#include "config.h"
class Socket;
/* external functions, opaque to us */
-LWResult::Result asendtcp(const string& data, Socket* sock);
-LWResult::Result arecvtcp(string& data, size_t len, Socket* sock, bool incompleteOkay);
+LWResult::Result asendtcp(const PacketBuffer& data, Socket* sock);
+LWResult::Result arecvtcp(PacketBuffer& data, size_t len, Socket* sock, bool incompleteOkay);
+LWResult::Result asendtcp(const PacketBuffer& data, TCPIOHandler&);
+LWResult::Result arecvtcp(PacketBuffer& data, size_t len, TCPIOHandler&, bool incompleteOkay);
struct PacketID
{
ComboAddress remote; // this is the remote
DNSName domain; // this is the question
- string inMSG; // they'll go here
- string outMSG; // the outgoing message that needs to be sent
+ PacketBuffer inMSG; // they'll go here
+ PacketBuffer outMSG; // the outgoing message that needs to be sent
typedef set<uint16_t > chain_t;
mutable chain_t chain;
+ TCPIOHandler *tcphandler{nullptr};
size_t inNeeded{0}; // if this is set, we'll read until inNeeded bytes are read
string::size_type outPos{0}; // how far we are along in the outMSG
mutable uint32_t nearMisses{0}; // number of near misses - host correct, id wrong
};
extern std::unique_ptr<MemRecursorCache> g_recCache;
extern thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
-typedef MTasker<PacketID,string> MT_t;
+typedef MTasker<PacketID,PacketBuffer> MT_t;
MT_t* getMT();
struct RecursorStats
HttpRequest req(logprefix);
HttpResponse resp;
ComboAddress remote;
- string reply;
+ PacketBuffer reply;
try {
YaHTTP::AsyncRequestLoader yarl;
yarl.initialize(&req);
client->setNonBlocking();
- string data;
+ PacketBuffer data;
try {
while(!req.complete) {
auto ret = arecvtcp(data, 16384, client.get(), true);
if (ret == LWResult::Result::Success) {
- req.complete = yarl.feed(data);
+ string str(reinterpret_cast<const char*>(data.data()), data.size());
+ req.complete = yarl.feed(str);
} else {
// read error OR EOF
break;
WebServer::handleRequest(req, resp);
ostringstream ss;
resp.write(ss);
- reply = ss.str();
+ const string &s = ss.str();
+ reply.insert(reply.end(), s.cbegin(), s.cend());
logResponse(resp, remote, logprefix);