]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/pdns_recursor.cc
Merge pull request #7879 from pieterlexis/rec-start-as-nonroot
[thirdparty/pdns.git] / pdns / pdns_recursor.cc
index d0b8b121c28f3c73a155e3cb430d7e43589893d0..547ca340d64baf1a9f0152d2704d419164bd7dad 100644 (file)
 
 #ifdef HAVE_PROTOBUF
 #include "uuid-utils.hh"
-#endif
+#endif /* HAVE_PROTOBUF */
 
 #include "xpf.hh"
 
@@ -121,6 +121,11 @@ static thread_local std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>>
 static thread_local uint64_t t_outgoingProtobufServersGeneration;
 #endif /* HAVE_PROTOBUF */
 
+#ifdef HAVE_FSTRM
+static thread_local std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> t_frameStreamServers{nullptr};
+static thread_local uint64_t t_frameStreamServersGeneration;
+#endif /* HAVE_FSTRM */
+
 thread_local std::unique_ptr<MT_t> MT; // the big MTasker
 thread_local std::unique_ptr<MemRecursorCache> t_RC;
 thread_local std::unique_ptr<RecursorPacketCache> t_packetCache;
@@ -535,9 +540,6 @@ public:
   {
   }
 
-  typedef set<int> socks_t;
-  socks_t d_socks;
-
   // returning -2 means: temporary OS error (ie, out of files), -1 means error related to remote
   int getSocket(const ComboAddress& toaddr, int* fd)
   {
@@ -547,7 +549,6 @@ public:
 
     if(connect(*fd, (struct sockaddr*)(&toaddr), toaddr.getSocklen()) < 0) {
       int err = errno;
-      //      returnSocket(*fd);
       try {
         closesocket(*fd);
       }
@@ -560,43 +561,32 @@ public:
       return -1;
     }
 
-    d_socks.insert(*fd);
     d_numsocks++;
     return 0;
   }
 
-  void returnSocket(int fd)
-  {
-    socks_t::iterator i=d_socks.find(fd);
-    if(i==d_socks.end()) {
-      throw PDNSException("Trying to return a socket (fd="+std::to_string(fd)+") not in the pool");
-    }
-    returnSocketLocked(i);
-  }
-
   // return a socket to the pool, or simply erase it
-  void returnSocketLocked(socks_t::iterator& i)
+  void returnSocket(int fd)
   {
-    if(i==d_socks.end()) {
-      throw PDNSException("Trying to return a socket not in the pool");
-    }
     try {
-      t_fdm->removeReadFD(*i);
+      t_fdm->removeReadFD(fd);
     }
-    catch(FDMultiplexerException& e) {
+    catch(const FDMultiplexerException& e) {
       // we sometimes return a socket that has not yet been assigned to t_fdm
     }
+
     try {
-      closesocket(*i);
+      closesocket(fd);
     }
     catch(const PDNSException& e) {
       g_log<<Logger::Error<<"Error closing returned UDP socket: "<<e.reason<<endl;
     }
 
-    d_socks.erase(i++);
     --d_numsocks;
   }
 
+private:
+
   // returns -1 for errors which might go away, throws for ones that won't
   static int makeClientSocket(int family)
   {
@@ -629,11 +619,21 @@ public:
       if (::bind(ret, (struct sockaddr *)&sin, sin.getSocklen()) >= 0)
         break;
     }
-    if(!tries)
+
+    if(!tries) {
+      closesocket(ret);
       throw PDNSException("Resolver binding to local query client socket on "+sin.toString()+": "+stringerror());
+    }
+
+    try {
+      setReceiveSocketErrors(ret, family);
+      setNonBlocking(ret);
+    }
+    catch(...) {
+      closesocket(ret);
+      throw;
+    }
 
-    setReceiveSocketErrors(ret, family);
-    setNonBlocking(ret);
     return ret;
   }
 };
@@ -703,7 +703,9 @@ int arecvfrom(std::string& packet, int flags, const ComboAddress& fromaddr, size
 
   int ret=MT->waitEvent(pident, &packet, g_networkTimeoutMsec, now);
 
+  /* -1 means error, 0 means timeout, 1 means a result from handleUDPServerResponse() which might still be an error */
   if(ret > 0) {
+    /* handleUDPServerResponse() will close the socket for us no matter what */
     if(packet.empty()) // means "error"
       return -1;
 
@@ -716,6 +718,7 @@ int arecvfrom(std::string& packet, int flags, const ComboAddress& fromaddr, size
     }
   }
   else {
+    /* getting there means error or timeout, it's up to us to close the socket */
     if(fd >= 0)
       t_udpclientsocks->returnSocket(fd);
   }
@@ -964,6 +967,73 @@ static bool checkOutgoingProtobufExport(LocalStateHolder<LuaConfigItems>& luacon
 
   return true;
 }
+
+#ifdef HAVE_FSTRM
+
+static std::shared_ptr<std::vector<std::unique_ptr<FrameStreamLogger>>> startFrameStreamServers(const FrameStreamExportConfig& config)
+{
+  auto result = std::make_shared<std::vector<std::unique_ptr<FrameStreamLogger>>>();
+
+  for (const auto& server : config.servers) {
+    try {
+      std::unordered_map<string,unsigned> options;
+      options["bufferHint"] = config.bufferHint;
+      options["flushTimeout"] = config.flushTimeout;
+      options["inputQueueSize"] = config.inputQueueSize;
+      options["outputQueueSize"] = config.outputQueueSize;
+      options["queueNotifyThreshold"] = config.queueNotifyThreshold;
+      options["reopenInterval"] = config.reopenInterval;
+      FrameStreamLogger *fsl = nullptr;
+      try {
+        ComboAddress address(server);
+        fsl = new FrameStreamLogger(address.sin4.sin_family, address.toStringWithPort(), true, options);
+      }
+      catch (const PDNSException& e) {
+        fsl = new FrameStreamLogger(AF_UNIX, server, true, options);
+      }
+      fsl->setLogQueries(config.logQueries);
+      fsl->setLogResponses(config.logResponses);
+      result->emplace_back(fsl);
+    }
+    catch(const std::exception& e) {
+      g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.what()<<endl;
+    }
+    catch(const PDNSException& e) {
+      g_log<<Logger::Error<<"Error while starting dnstap framestream logger to '"<<server<<": "<<e.reason<<endl;
+    }
+  }
+
+  return result;
+}
+
+static bool checkFrameStreamExport(LocalStateHolder<LuaConfigItems>& luaconfsLocal)
+{
+  if (!luaconfsLocal->frameStreamExportConfig.enabled) {
+    if (t_frameStreamServers) {
+      // dt's take care of cleanup
+      t_frameStreamServers.reset();
+    }
+
+    return false;
+  }
+
+  /* if the server was not running, or if it was running according to a
+     previous configuration */
+  if (!t_frameStreamServers ||
+      t_frameStreamServersGeneration < luaconfsLocal->generation) {
+
+    if (t_frameStreamServers) {
+      // dt's take care of cleanup
+      t_frameStreamServers.reset();
+    }
+
+    t_frameStreamServers = startFrameStreamServers(luaconfsLocal->frameStreamExportConfig);
+    t_frameStreamServersGeneration = luaconfsLocal->generation;
+  }
+
+  return true;
+}
+#endif /* HAVE_FSTRM */
 #endif /* HAVE_PROTOBUF */
 
 #ifdef NOD_ENABLED
@@ -1092,6 +1162,10 @@ static void startDoResolve(void *p)
     }
 #endif /* HAVE_PROTOBUF */
 
+#ifdef HAVE_FSTRM
+    checkFrameStreamExport(luaconfsLocal);
+#endif
+
     DNSPacketWriter pw(packet, dc->d_mdp.d_qname, dc->d_mdp.d_qtype, dc->d_mdp.d_qclass);
 
     pw.getHeader()->aa=0;
@@ -1147,7 +1221,9 @@ static void startDoResolve(void *p)
     sr.setInitialRequestId(dc->d_uuid);
     sr.setOutgoingProtobufServers(t_outgoingProtobufServers);
 #endif
-
+#ifdef HAVE_FSTRM
+    sr.setFrameStreamServers(t_frameStreamServers);
+#endif
     sr.setQuerySource(dc->d_remote, g_useIncomingECS && !dc->d_ednssubnet.source.empty() ? boost::optional<const EDNSSubnetOpts&>(dc->d_ednssubnet) : boost::none);
 
     bool tracedQuery=false; // we could consider letting Lua know about this too
@@ -1918,6 +1994,10 @@ static void handleRunningTCPQuestion(int fd, FDMultiplexer::funcparam_t& var)
         needECS = true;
       }
       logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
+#endif /* HAVE_PROTOBUF */
+
+#ifdef HAVE_FSTRM
+      checkFrameStreamExport(luaconfsLocal);
 #endif
 
       if(needECS || needXPF || (t_pdl && (t_pdl->d_gettag_ffi || t_pdl->d_gettag))) {
@@ -2112,6 +2192,9 @@ static string* doProcessUDPQuestion(const std::string& question, const ComboAddr
   }
   logQuery = t_protobufServers && luaconfsLocal->protobufExportConfig.logQueries;
   bool logResponse = t_protobufServers && luaconfsLocal->protobufExportConfig.logResponses;
+#endif
+#ifdef HAVE_FSTRM
+  checkFrameStreamExport(luaconfsLocal);
 #endif
   EDNSSubnetOpts ednssubnet;
   bool ecsFound = false;
@@ -2781,6 +2864,11 @@ static void houseKeeping(void *)
 
 static void makeThreadPipes()
 {
+  auto pipeBufferSize = ::arg().asNum("distribution-pipe-buffer-size");
+  if (pipeBufferSize > 0) {
+    g_log<<Logger::Info<<"Resizing the buffer of the distribution pipe to "<<pipeBufferSize<<endl;
+  }
+
   /* thread 0 is the handler / SNMP, we start at 1 */
   for(unsigned int n = 1; n <= (g_numWorkerThreads + g_numDistributorThreads); ++n) {
     auto& threadInfos = s_threadInfos.at(n);
@@ -2804,6 +2892,16 @@ static void makeThreadPipes()
     threadInfos.pipes.readQueriesToThread = fd[0];
     threadInfos.pipes.writeQueriesToThread = fd[1];
 
+    if (pipeBufferSize > 0) {
+      if (!setPipeBufferSize(threadInfos.pipes.writeQueriesToThread, pipeBufferSize)) {
+        g_log<<Logger::Warning<<"Error resizing the buffer of the distribution pipe for thread "<<n<<" to "<<pipeBufferSize<<": "<<strerror(errno)<<endl;
+        auto existingSize = getPipeBufferSize(threadInfos.pipes.writeQueriesToThread);
+        if (existingSize > 0) {
+          g_log<<Logger::Warning<<"The current size of the distribution pipe's buffer for thread "<<n<<" is "<<existingSize<<endl;
+        }
+      }
+    }
+
     if (!setNonBlocking(threadInfos.pipes.writeQueriesToThread)) {
       unixDie("Making pipe for inter-thread communications non-blocking");
     }
@@ -3227,6 +3325,8 @@ static void handleUDPServerResponse(int fd, FDMultiplexer::funcparam_t& var)
 retryWithName:
 
   if(!MT->sendEvent(pident, &packet)) {
+    /* we did not find a match for this response, something is wrong */
+
     // we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
     for(MT_t::waiters_t::iterator mthread=MT->d_waiters.begin(); mthread!=MT->d_waiters.end(); ++mthread) {
       if(pident.fd==mthread->key.fd && mthread->key.remote==pident.remote &&  mthread->key.type == pident.type &&
@@ -3249,6 +3349,7 @@ retryWithName:
     }
   }
   else if(fd >= 0) {
+    /* we either found a waiter (1) or encountered an issue (-1), it's up to us to clean the socket anyway */
     t_udpclientsocks->returnSocket(fd);
   }
 }
@@ -3745,6 +3846,8 @@ static int serviceMain(int argc, char*argv[])
   SyncRes::s_ecsipv6cachelimit = ::arg().asNum("ecs-ipv6-cache-bits");
   SyncRes::s_ecscachelimitttl = ::arg().asNum("ecs-cache-limit-ttl");
 
+  SyncRes::s_qnameminimization = ::arg().mustDo("qname-minimization");
+
   if (!::arg().isEmpty("ecs-scope-zero-address")) {
     ComboAddress scopeZero(::arg()["ecs-scope-zero-address"]);
     SyncRes::setECSScopeZeroAddress(Netmask(scopeZero, scopeZero.isIPv4() ? 32 : 128));
@@ -3919,10 +4022,10 @@ static int serviceMain(int argc, char*argv[])
 
   int newgid=0;
   if(!::arg()["setgid"].empty())
-    newgid=Utility::makeGidNumeric(::arg()["setgid"]);
+    newgid = strToGID(::arg()["setgid"]);
   int newuid=0;
   if(!::arg()["setuid"].empty())
-    newuid=Utility::makeUidNumeric(::arg()["setuid"]);
+    newuid = strToUID(::arg()["setuid"]);
 
   Utility::dropGroupPrivs(newuid, newgid);
 
@@ -4139,6 +4242,9 @@ try
   checkProtobufExport(luaconfsLocal);
   checkOutgoingProtobufExport(luaconfsLocal);
 #endif /* HAVE_PROTOBUF */
+#ifdef HAVE_FSTRM
+  checkFrameStreamExport(luaconfsLocal);
+#endif
 
   PacketID pident;
 
@@ -4335,7 +4441,7 @@ int main(int argc, char **argv)
     ::arg().set("socket-group","Group of socket")="";
     ::arg().set("socket-mode", "Permissions for socket")="";
 
-    ::arg().set("socket-dir",string("Where the controlsocket will live, ")+LOCALSTATEDIR+" when unset and not chrooted" )="";
+    ::arg().set("socket-dir",string("Where the controlsocket will live, ")+LOCALSTATEDIR+"/pdns-recursor when unset and not chrooted" )="";
     ::arg().set("delegation-only","Which domains we only accept delegations from")="";
     ::arg().set("query-local-address","Source IP address for sending queries")="0.0.0.0";
     ::arg().set("query-local-address6","Source IPv6 address for sending queries. IF UNSET, IPv6 WILL NOT BE USED FOR OUTGOING QUERIES")="";
@@ -4402,6 +4508,7 @@ int main(int argc, char **argv)
     ::arg().set("max-recursion-depth", "Maximum number of internal recursion calls per query, 0 for unlimited")="40";
     ::arg().set("max-udp-queries-per-round", "Maximum number of UDP queries processed per recvmsg() round, before returning back to normal processing")="10000";
     ::arg().set("protobuf-use-kernel-timestamp", "Compute the latency of queries in protobuf messages by using the timestamp set by the kernel when the query was received (when available)")="";
+    ::arg().set("distribution-pipe-buffer-size", "Size in bytes of the internal buffer of the pipe used by the distributor to pass incoming queries to a worker thread")="0";
 
     ::arg().set("include-dir","Include *.conf files from this directory")="";
     ::arg().set("security-poll-suffix","Domain name from which to query security update notifications")="secpoll.powerdns.com.";
@@ -4439,6 +4546,7 @@ int main(int argc, char **argv)
     ::arg().set("rng", "Specify random number generator to use. Valid values are auto,sodium,openssl,getrandom,arc4random,urandom.")="auto";
     ::arg().set("public-suffix-list-file", "Path to the Public Suffix List file, if any")="";
     ::arg().set("distribution-load-factor", "The load factor used when PowerDNS is distributing queries to worker threads")="0.0";
+    ::arg().setSwitch("qname-minimization", "Use Query Name Minimization")="no";
 #ifdef NOD_ENABLED
     ::arg().set("new-domain-tracking", "Track newly observed domains (i.e. never seen before).")="no";
     ::arg().set("new-domain-log", "Log newly observed domains.")="yes";
@@ -4488,7 +4596,7 @@ int main(int argc, char **argv)
 
     if (::arg()["socket-dir"].empty()) {
       if (::arg()["chroot"].empty())
-        ::arg().set("socket-dir") = LOCALSTATEDIR;
+        ::arg().set("socket-dir") = std::string(LOCALSTATEDIR) + "/pdns-recursor";
       else
         ::arg().set("socket-dir") = "/";
     }