return ret;
}
+  if (*chained) {
+    // This answer came in over a chain of identical outstanding queries, so a
+    // whole set of mthreads is about to be woken up. If the upstream answer
+    // took a sizeable part of the network timeout, sleep a random amount (up
+    // to the time already spent) so the chained mthreads do not all resume
+    // and generate follow-up work at the same instant.
+    auto msec = lwr->d_usec / 1000;
+    if (msec > g_networkTimeoutMsec * 2 / 3) {
+      auto jitterMsec = dns_random(msec);
+      if (jitterMsec > 0) {
+        mthreadSleep(jitterMsec);
+      }
+    }
+  }
+
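A quick worked instance of the condition above, as a standalone sketch (not part of the patch; 1500 ms is only an assumed value for g_networkTimeoutMsec, and std::rand() stands in for dns_random()):

#include <cstdint>
#include <cstdio>
#include <cstdlib>

int main()
{
  const uint64_t networkTimeoutMsec = 1500; // assumed default; configurable in the recursor
  const uint64_t elapsedUsec = 1200000;     // suppose the chained query took 1.2 s
  const uint64_t msec = elapsedUsec / 1000; // 1200 ms
  if (msec > networkTimeoutMsec * 2 / 3) {  // 1200 > 1000: jitter kicks in
    const uint64_t jitterMsec = static_cast<uint64_t>(std::rand()) % msec; // 0..1199 ms
    std::printf("would sleep %llu ms before resuming\n", static_cast<unsigned long long>(jitterMsec));
  }
  return 0;
}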
buf.resize(len);
#ifdef HAVE_FSTRM
}
using tfunc_t = void(void*); //!< type of the pointer that starts a thread
+  uint64_t nextWaiterDelayUsec(uint64_t defusecs); //!< usec until the earliest waiter deadline, at most defusecs; 0 if already overdue
int waitEvent(EventKey& key, EventVal* val = nullptr, unsigned int timeoutMsec = 0, const struct timeval* now = nullptr);
void yield();
int sendEvent(const EventKey& key, const EventVal* val = nullptr);
#include <valgrind/valgrind.h>
#endif /* PDNS_USE_VALGRIND */
+template<class EventKey, class EventVal, class Cmp>
+uint64_t MTasker<EventKey, EventVal, Cmp>::nextWaiterDelayUsec(uint64_t defusecs)
+{
+  if (d_waiters.empty()) {
+    // no waiters
+    return defusecs;
+  }
+  auto& ttdindex = boost::multi_index::get<KeyTag>(d_waiters);
+  auto iter = ttdindex.begin();
+  timeval rnow{};
+  gettimeofday(&rnow, nullptr);
+  if (iter->ttd.tv_sec != 0) {
+    // we have a waiter with a timeout specified
+    if (rnow < iter->ttd) {
+      // we should not wait longer than the default timeout
+      return std::min(defusecs, uSec(iter->ttd - rnow));
+    }
+    // already expired
+    return 0;
+  }
+  return defusecs;
+}
+
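In short, nextWaiterDelayUsec() clamps the wait to the earliest waiter deadline: with defusecs = 500000 and the first waiter due in 200 ms it returns 200000; a waiter whose deadline has already passed yields 0 (poll without blocking); and if the earliest entry in the deadline index carries no deadline at all (ttd.tv_sec == 0), the default is returned unchanged.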
//! puts a thread to sleep waiting until a specified event arrives
/** Threads can call waitEvent to register that they are waiting on an event with a certain key.
If so desired, the event can carry data which is returned in val in case that is non-zero.
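A minimal usage sketch of this waiter/notifier pair (the tasker instance, the key/value names and the exact return-value conventions are assumptions, not shown in this hunk):

// in the waiting mthread: park until data arrives for `key`, or 1000 ms pass
EventVal val{};
int res = tasker.waitEvent(key, &val, 1000);
// res tells the caller whether the event was delivered (val filled in) or the
// wait timed out; these parked waiters are what nextWaiterDelayUsec() inspects

// elsewhere, typically from an I/O handler: wake the waiter and hand it the data
tasker.sendEvent(key, &answer);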
thread_local std::unique_ptr<UDPClientSocks> t_udpclientsocks;
// If we have plenty of mthreads slot left, use default timeout.
-// Othwerwise reduce the timeout to be between g_networkTimeoutMsec/10 and g_networkTimeoutMsec
+// Otherwise reduce the timeout to be between g_networkTimeoutMsec/10 and g_networkTimeoutMsec
unsigned int authWaitTimeMSec(const std::unique_ptr<MT_t>& mtasker)
{
const auto max = g_maxMThreads;
t_Counters.at(rec::Counter::maxChainWeight) = weight;
}
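The body of authWaitTimeMSec() is truncated in this excerpt. Purely as an illustration of the comment above it, a scaling rule like the following would match that description; the half-full threshold, the linear ramp and the function name are assumptions, not the recursor's actual implementation:

#include <algorithm>
#include <cstdint>

// Hypothetical sketch: "plenty of slots left -> default timeout, otherwise
// somewhere between networkTimeoutMsec/10 and networkTimeoutMsec".
static unsigned int authWaitTimeSketchMSec(uint64_t usedMThreads, uint64_t maxMThreads, unsigned int networkTimeoutMsec)
{
  const uint64_t half = maxMThreads / 2;
  if (half == 0 || usedMThreads < half) {
    return networkTimeoutMsec; // plenty of mthread slots left
  }
  // Half or more of the slots are busy: shrink the wait linearly with the
  // remaining free slots, but never below one tenth of the default.
  const unsigned int floorMsec = networkTimeoutMsec / 10;
  const uint64_t freeSlots = std::min(maxMThreads - std::min(usedMThreads, maxMThreads), half);
  return floorMsec + static_cast<unsigned int>(static_cast<uint64_t>(networkTimeoutMsec - floorMsec) * freeSlots / half);
}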
- for (auto qid: iter->key->authReqChain) {
+ for (auto qid : iter->key->authReqChain) {
auto packetID = std::make_shared<PacketID>(*resend);
packetID->fd = -1;
packetID->id = qid;
}
}
+void mthreadSleep(unsigned int jitterMsec)
+{
+  // Sleep this mthread by waiting for an event that can never arrive: the key
+  // uses a closed fd (-1), a random id/type and a random port on an address in
+  // the RFC 6666 discard-only prefix (100::/64), so no real response can ever
+  // match it and waitEvent() simply returns once jitterMsec has passed.
+  auto neverHappens = std::make_shared<PacketID>();
+  neverHappens->id = dns_random_uint16();
+  neverHappens->type = dns_random_uint16();
+  neverHappens->remote = ComboAddress("100::"); // discard-only
+  neverHappens->remote.setPort(dns_random_uint16());
+  neverHappens->fd = -1;
+  // Keep the call itself out of assert() so NDEBUG builds still sleep.
+  auto ret = g_multiTasker->waitEvent(neverHappens, nullptr, jitterMsec);
+  (void)ret;
+  assert(ret != -1); // NOLINT
+}
+
static void handleUDPServerResponse(int fileDesc, FDMultiplexer::funcparam_t& var)
{
auto pid = boost::any_cast<std::shared_ptr<PacketID>>(var);
}
runLuaMaintenance(threadInfo, last_lua_maintenance, luaMaintenanceInterval);
- t_fdm->run(&g_now);
+  // Wake up no later than the earliest mthread deadline, and at most after 500 ms.
+  auto timeoutUsec = g_multiTasker->nextWaiterDelayUsec(500000);
+  t_fdm->run(&g_now, static_cast<int>(timeoutUsec / 1000));
// 'run' updates g_now for us
runTCPMaintenance(threadInfo, listenOnTCP, maxTcpClients);
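Unit note on the new run() call: nextWaiterDelayUsec() returns microseconds while the multiplexer timeout argument is in milliseconds, so the division by 1000 truncates. A deadline less than 1 ms away therefore becomes a 0 ms (non-blocking) poll, and the 500000 µs default keeps the maximum wait per loop iteration at 500 ms.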
/* external functions, opaque to us */
LWResult::Result asendtcp(const PacketBuffer& data, shared_ptr<TCPIOHandler>&);
LWResult::Result arecvtcp(PacketBuffer& data, size_t len, shared_ptr<TCPIOHandler>&, bool incompleteOkay);
+void mthreadSleep(unsigned int jitterMsec);
enum TCPAction : uint8_t
{