]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/pdns_recursor.cc
Merge remote-tracking branch 'origin/master' into trustanchor.server
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
index 66310e50890ac878c62c4d4441e50fabaefcae79..061357220b9bd01bfcf7ed163c5bfd52c2858c0b 100644 (file)
 
 #ifdef HAVE_PROTOBUF
 #include "uuid-utils.hh"
-#endif
+#endif /* HAVE_PROTOBUF */
 
 #include "xpf.hh"
 
@@ -121,6 +121,11 @@ static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>
 static thread_local uint64_t t_outgoingProtobufServersGeneration;
 #endif /* HAVE_PROTOBUF */
 
+#ifdef HAVE_FSTRM
+static thread_local std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> t_frameStreamServers{nullptr};
+static thread_local uint64_t t_frameStreamServersGeneration;
+#endif /* HAVE_FSTRM */
+
 thread_local std::unique_ptr<MT_t> MT; // the big MTasker
 thread_local std::unique_ptr<MemRecursorCache> t_RC;
 thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
@@ -535,9 +540,6 @@ public:
   {
   }
 
-  typedef set<int> socks_t;
-  socks_t d_socks;
-
   // returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
   int getSocket(const ComboAddress& toaddr, int* fd)
   {
@@ -547,7 +549,6 @@ public:
 
     if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
       int err = errno;
-      //      returnSocket(*fd);
       try {
         closesocket(*fd);
       }
@@ -560,43 +561,32 @@ public:
       return -1;
     }
 
-    d_socks.insert(*fd);
     d_numsocks++;
     return 0;
   }
 
-  void returnSocket(int fd)
-  {
-    socks_t::iterator i=d_socks.find(fd);
-    if(i==d_socks.end()) {
-      throw PDNSException("Trying to return a socket (fd="+std::to_string(fd)+") not in the pool");
-    }
-    returnSocketLocked(i);
-  }
-
   // return a socket to the pool, or simply erase it
-  void returnSocketLocked(socks_t::iterator& i)
+  void returnSocket(int fd)
   {
-    if(i==d_socks.end()) {
-      throw PDNSException("Trying to return a socket not in the pool");
-    }
     try {
-      t_fdm->removeReadFD(*i);
+      t_fdm->removeReadFD(fd);
     }
-    catch(FDMultiplexerException& e) {
+    catch(const FDMultiplexerException& e) {
       // we sometimes return a socket that has not yet been assigned to t_fdm
     }
+
     try {
-      closesocket(*i);
+      closesocket(fd);
     }
     catch(const PDNSException& e) {
       g_log<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
     }
 
-    d_socks.erase(i++);
     --d_numsocks;
   }
 
+private:
+
   // returns -1 for errors which might go away, throws for ones that won't
   static int makeClientSocket(int family)
   {
@@ -629,11 +619,21 @@ public:
       if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
         break;
     }
-    if(!tries)
+
+    if(!tries) {
+      closesocket(ret);
       throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
+    }
+
+    try {
+      setReceiveSocketErrors(ret, family);
+      setNonBlocking(ret);
+    }
+    catch(...) {
+      closesocket(ret);
+      throw;
+    }
 
-    setReceiveSocketErrors(ret, family);
-    setNonBlocking(ret);
     return ret;
   }
 };
@@ -703,7 +703,9 @@ int arecvfrom(std::string& packet, int flags, const ComboAddress& fromaddr, size
 
   int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
 
+  /* -1 means error, 0 means timeout, 1 means a result from handleUDPServerResponse() which might still be an error */
   if(ret > 0) {
+    /* handleUDPServerResponse() will close the socket for us no matter what */
     if(packet.empty()) // means "error"
       return -1;
 
@@ -716,6 +718,7 @@ int arecvfrom(std::string& packet, int flags, const ComboAddress& fromaddr, size
     }
   }
   else {
+    /* getting there means error or timeout, it's up to us to close the socket */
     if(fd >= 0)
       t_udpclientsocks->returnSocket(fd);
   }
@@ -964,6 +967,73 @@ static bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luacon
 
   return true;
 }
+
+#ifdef HAVE_FSTRM
+
+static std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> startFrameStreamServers(const FrameStreamExportConfig& config)
+{
+  auto result = std::make_shared<std::vector<std::unique_ptr<FrameStreamLogger>>>();
+
+  for (const auto& server : config.servers) {
+    try {
+      std::unordered_map<string,unsigned> options;
+      options["bufferHint"] = config.bufferHint;
+      options["flushTimeout"] = config.flushTimeout;
+      options["inputQueueSize"] = config.inputQueueSize;
+      options["outputQueueSize"] = config.outputQueueSize;
+      options["queueNotifyThreshold"] = config.queueNotifyThreshold;
+      options["reopenInterval"] = config.reopenInterval;
+      FrameStreamLogger *fsl = nullptr;
+      try {
+        ComboAddress address(server);
+        fsl = new FrameStreamLogger(address.sin4.sin_family, address.toStringWithPort(), true, options);
+      }
+      catch (const PDNSException& e) {
+        fsl = new FrameStreamLogger(AF_UNIX, server, true, options);
+      }
+      fsl->setLogQueries(config.logQueries);
+      fsl->setLogResponses(config.logResponses);
+      result->emplace_back(fsl);
+    }
+    catch(const std::exception& e) {
+      g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.what()<<endl;
+    }
+    catch(const PDNSException& e) {
+      g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.reason<<endl;
+    }
+  }
+
+  return result;
+}
+
+static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+{
+  if (!luaconfsLocal->frameStreamExportConfig.enabled) {
+    if (t_frameStreamServers) {
+      // dt's take care of cleanup
+      t_frameStreamServers.reset();
+    }
+
+    return false;
+  }
+
+  /* if the server was not running, or if it was running according to a
+     previous configuration */
+  if (!t_frameStreamServers ||
+      t_frameStreamServersGeneration < luaconfsLocal->generation) {
+
+    if (t_frameStreamServers) {
+      // dt's take care of cleanup
+      t_frameStreamServers.reset();
+    }
+
+    t_frameStreamServers = startFrameStreamServers(luaconfsLocal->frameStreamExportConfig);
+    t_frameStreamServersGeneration = luaconfsLocal->generation;
+  }
+
+  return true;
+}
+#endif /* HAVE_FSTRM */
 #endif /* HAVE_PROTOBUF */
 
 #ifdef NOD_ENABLED
@@ -1092,6 +1162,10 @@ static void startDoResolve(void *p)
     }
 #endif /* HAVE_PROTOBUF */
 
+#ifdef HAVE_FSTRM
+    checkFrameStreamExport(luaconfsLocal);
+#endif
+
     DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
 
     pw.getHeader()->aa=0;
@@ -1147,7 +1221,9 @@ static void startDoResolve(void *p)
     sr.setInitialRequestId(dc->d_uuid);
     sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
 #endif
-
+#ifdef HAVE_FSTRM
+    sr.setFrameStreamServers(t_frameStreamServers);
+#endif
     sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
 
     bool tracedQuery=false; // we could consider letting Lua know about this too
@@ -1918,6 +1994,10 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
         needECS = true;
       }
       logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
+#endif /* HAVE_PROTOBUF */
+
+#ifdef HAVE_FSTRM
+      checkFrameStreamExport(luaconfsLocal);
 #endif
 
       if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
@@ -2112,6 +2192,9 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   }
   logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
   bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
+#endif
+#ifdef HAVE_FSTRM
+  checkFrameStreamExport(luaconfsLocal);
 #endif
   EDNSSubnetOpts ednssubnet;
   bool ecsFound = false;
@@ -3242,6 +3325,8 @@ static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
 retryWithName:
 
   if(!MT->sendEvent(pident, &packet)) {
+    /* we did not find a match for this response, something is wrong */
+
     // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
     for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
       if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote &&  mthread->key.type == pident.type &&
@@ -3264,6 +3349,7 @@ retryWithName:
     }
   }
   else if(fd >= 0) {
+    /* we either found a waiter (1) or encountered an issue (-1), it's up to us to clean the socket anyway */
     t_udpclientsocks->returnSocket(fd);
   }
 }
@@ -4154,6 +4240,9 @@ try
   checkProtobufExport(luaconfsLocal);
   checkOutgoingProtobufExport(luaconfsLocal);
 #endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+  checkFrameStreamExport(luaconfsLocal);
+#endif
 
   PacketID pident;