#ifdef HAVE_PROTOBUF
#include "uuid-utils.hh"
-#endif
+#endif /* HAVE_PROTOBUF */
#include "xpf.hh"
static thread_local uint64_t t_outgoingProtobufServersGeneration;
#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+static thread_local std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> t_frameStreamServers{nullptr};
+static thread_local uint64_t t_frameStreamServersGeneration;
+#endif /* HAVE_FSTRM */
+
thread_local std::unique_ptr<MT_t> MT; // the big MTasker
thread_local std::unique_ptr<MemRecursorCache> t_RC;
thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
{
}
- typedef set<int> socks_t;
- socks_t d_socks;
-
// returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
int getSocket(const ComboAddress& toaddr, int* fd)
{
if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
int err = errno;
- // returnSocket(*fd);
try {
closesocket(*fd);
}
return -1;
}
- d_socks.insert(*fd);
d_numsocks++;
return 0;
}
- void returnSocket(int fd)
- {
- socks_t::iterator i=d_socks.find(fd);
- if(i==d_socks.end()) {
- throw PDNSException("Trying to return a socket (fd="+std::to_string(fd)+") not in the pool");
- }
- returnSocketLocked(i);
- }
-
// return a socket to the pool, or simply erase it
- void returnSocketLocked(socks_t::iterator& i)
+ void returnSocket(int fd)
{
- if(i==d_socks.end()) {
- throw PDNSException("Trying to return a socket not in the pool");
- }
try {
- t_fdm->removeReadFD(*i);
+ t_fdm->removeReadFD(fd);
}
- catch(FDMultiplexerException& e) {
+ catch(const FDMultiplexerException& e) {
// we sometimes return a socket that has not yet been assigned to t_fdm
}
+
try {
- closesocket(*i);
+ closesocket(fd);
}
catch(const PDNSException& e) {
g_log<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
}
- d_socks.erase(i++);
--d_numsocks;
}
+private:
+
// returns -1 for errors which might go away, throws for ones that won't
static int makeClientSocket(int family)
{
if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
break;
}
- if(!tries)
+
+ if(!tries) {
+ closesocket(ret);
throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
+ }
+
+ try {
+ setReceiveSocketErrors(ret, family);
+ setNonBlocking(ret);
+ }
+ catch(...) {
+ closesocket(ret);
+ throw;
+ }
- setReceiveSocketErrors(ret, family);
- setNonBlocking(ret);
return ret;
}
};
int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
+ /* -1 means error, 0 means timeout, 1 means a result from handleUDPServerResponse() which might still be an error */
if(ret > 0) {
+ /* handleUDPServerResponse() will close the socket for us no matter what */
if(packet.empty()) // means "error"
return -1;
}
}
else {
+ /* getting there means error or timeout, it's up to us to close the socket */
if(fd >= 0)
t_udpclientsocks->returnSocket(fd);
}
return true;
}
+
+#ifdef HAVE_FSTRM
+
+static std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> startFrameStreamServers(const FrameStreamExportConfig& config)
+{
+ auto result = std::make_shared<std::vector<std::unique_ptr<FrameStreamLogger>>>();
+
+ for (const auto& server : config.servers) {
+ try {
+ std::unordered_map<string,unsigned> options;
+ options["bufferHint"] = config.bufferHint;
+ options["flushTimeout"] = config.flushTimeout;
+ options["inputQueueSize"] = config.inputQueueSize;
+ options["outputQueueSize"] = config.outputQueueSize;
+ options["queueNotifyThreshold"] = config.queueNotifyThreshold;
+ options["reopenInterval"] = config.reopenInterval;
+ FrameStreamLogger *fsl = nullptr;
+ try {
+ ComboAddress address(server);
+ fsl = new FrameStreamLogger(address.sin4.sin_family, address.toStringWithPort(), true, options);
+ }
+ catch (const PDNSException& e) {
+ fsl = new FrameStreamLogger(AF_UNIX, server, true, options);
+ }
+ fsl->setLogQueries(config.logQueries);
+ fsl->setLogResponses(config.logResponses);
+ result->emplace_back(fsl);
+ }
+ catch(const std::exception& e) {
+ g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.what()<<endl;
+ }
+ catch(const PDNSException& e) {
+ g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.reason<<endl;
+ }
+ }
+
+ return result;
+}
+
+static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+{
+ if (!luaconfsLocal->frameStreamExportConfig.enabled) {
+ if (t_frameStreamServers) {
+ // dt's take care of cleanup
+ t_frameStreamServers.reset();
+ }
+
+ return false;
+ }
+
+ /* if the server was not running, or if it was running according to a
+ previous configuration */
+ if (!t_frameStreamServers ||
+ t_frameStreamServersGeneration < luaconfsLocal->generation) {
+
+ if (t_frameStreamServers) {
+ // dt's take care of cleanup
+ t_frameStreamServers.reset();
+ }
+
+ t_frameStreamServers = startFrameStreamServers(luaconfsLocal->frameStreamExportConfig);
+ t_frameStreamServersGeneration = luaconfsLocal->generation;
+ }
+
+ return true;
+}
+#endif /* HAVE_FSTRM */
#endif /* HAVE_PROTOBUF */
#ifdef NOD_ENABLED
}
#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
+#endif
+
DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
pw.getHeader()->aa=0;
sr.setInitialRequestId(dc->d_uuid);
sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
#endif
-
+#ifdef HAVE_FSTRM
+ sr.setFrameStreamServers(t_frameStreamServers);
+#endif
sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
bool tracedQuery=false; // we could consider letting Lua know about this too
needECS = true;
}
logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
+#endif /* HAVE_PROTOBUF */
+
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
#endif
if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
}
logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
+#endif
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
#endif
EDNSSubnetOpts ednssubnet;
bool ecsFound = false;
retryWithName:
if(!MT->sendEvent(pident, &packet)) {
+ /* we did not find a match for this response, something is wrong */
+
// we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote && mthread->key.type == pident.type &&
}
}
else if(fd >= 0) {
+ /* we either found a waiter (1) or encountered an issue (-1), it's up to us to clean the socket anyway */
t_udpclientsocks->returnSocket(fd);
}
}
checkProtobufExport(luaconfsLocal);
checkOutgoingProtobufExport(luaconfsLocal);
#endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+ checkFrameStreamExport(luaconfsLocal);
+#endif
PacketID pident;