#include "misc.hh"
#include "mtasker_context.hh"
-using namespace ::boost::multi_index;
-
// #define MTASKERTIMING 1
//! The main MTasker class
template <class EventKey = int, class EventVal = int, class Cmp = std::less<EventKey>>
class MTasker
{
+public:
+ struct Waiter
+ {
+ EventKey key;
+ std::shared_ptr<pdns_ucontext_t> context;
+ struct timeval ttd
+ {
+ };
+ int tid{};
+ };
+ struct KeyTag
+ {
+ };
+
+ using waiters_t = boost::multi_index::multi_index_container<
+ Waiter,
+ boost::multi_index::indexed_by<
+ boost::multi_index::ordered_unique<boost::multi_index::member<Waiter, EventKey, &Waiter::key>, Cmp>,
+ boost::multi_index::ordered_non_unique<boost::multi_index::tag<KeyTag>, boost::multi_index::member<Waiter, struct timeval, &Waiter::ttd>>>>;
+
+ //! Constructor
+ /** Constructor with a small default stacksize. If any of your threads exceeds this stack, your application will crash.
+ This limit applies solely to the stack, the heap is not limited in any way. If threads need to allocate a lot of data,
+ the use of new/delete is suggested.
+ */
+ MTasker(size_t stacksize = static_cast<size_t>(16 * 8192), size_t stackCacheSize = 0) :
+ d_stacksize(stacksize), d_maxCachedStacks(stackCacheSize), d_waitstatus(Error)
+ {
+ initMainStackBounds();
+
+ // make sure our stack is 16-byte aligned to make all the architectures happy
+ d_stacksize = d_stacksize >> 4 << 4;
+ }
+
+ using tfunc_t = void(void*); //!< type of the pointer that starts a thread
+ int waitEvent(EventKey& key, EventVal* val = nullptr, unsigned int timeoutMsec = 0, const struct timeval* now = nullptr);
+ void yield();
+ int sendEvent(const EventKey& key, const EventVal* val = nullptr);
+ void makeThread(tfunc_t* start, void* val);
+ bool schedule(const struct timeval* now = nullptr);
+
+ const waiters_t& getWaiters() const
+ {
+ return d_waiters;
+ }
+
+ //! gives access to the list of Events threads are waiting for
+ /** The kernel can call this to get a list of Events threads are waiting for. This is very useful
+ to setup 'select' or 'poll' or 'aio' events needed to satisfy these requests.
+ getEvents clears the events parameter before filling it.
+
+ \param events Vector which is to be filled with keys threads are waiting for
+ */
+ void getEvents(std::vector<EventKey>& events) const
+ {
+ events.clear();
+ for (const auto& waiter : d_waiters) {
+ events.emplace_back(waiter.key);
+ }
+ }
+
+ //! returns true if there are no processes
+ /** Call this to check if no processes are running anymore
+ \return true if no processes are left
+ */
+ [[nodiscard]] bool noProcesses() const
+ {
+ return d_threadsCount == 0;
+ }
+
+ //! returns the number of processes running
+ /** Call this to perhaps limit activities if too many threads are running
+ \return number of processes running
+ */
+ [[nodiscard]] unsigned int numProcesses() const
+ {
+ return d_threadsCount;
+ }
+
+ //! Returns the current Thread ID (tid)
+ /** Processes can call this to get a numerical representation of their current thread ID.
+ This can be useful for logging purposes.
+ */
+ [[nodiscard]] int getTid() const
+ {
+ return d_tid;
+ }
+
+ //! Returns the maximum stack usage so far of this MThread
+ [[nodiscard]] uint64_t getMaxStackUsage() const
+ {
+ return d_threads.at(d_tid).startOfStack - d_threads.at(d_tid).highestStackSeen;
+ }
+
+ //! Returns the maximum stack usage so far of this MThread
+ [[nodiscard]] unsigned int getUsec() const
+ {
+#ifdef MTASKERTIMING
+ return d_threads.at(d_tid).totTime + d_threads.at(d_tid).dt.ndiff() / 1000;
+#else
+ return 0;
+#endif
+ }
+
private:
+ EventKey d_eventkey; // for waitEvent, contains exact key it was awoken for
+ EventVal d_waitval;
+
pdns_ucontext_t d_kernel;
std::queue<int> d_runQueue;
std::queue<int> d_zombiesQueue;
mthreads_t d_threads;
std::stack<pdns_mtasker_stack_t> d_cachedStacks;
+ waiters_t d_waiters;
size_t d_stacksize;
size_t d_threadsCount{0};
size_t d_maxCachedStacks{0};
int d_tid{0};
int d_maxtid{0};
- EventVal d_waitval;
enum waitstatusenum : int8_t
{
Error = -1,
Answer
} d_waitstatus;
-public:
- struct Waiter
- {
- EventKey key;
- std::shared_ptr<pdns_ucontext_t> context;
- struct timeval ttd{};
- int tid{};
- };
- struct KeyTag
- {
- };
-
- using waiters_t = multi_index_container<
- Waiter,
- indexed_by<
- ordered_unique<member<Waiter, EventKey, &Waiter::key>, Cmp>,
- ordered_non_unique<tag<KeyTag>, member<Waiter, struct timeval, &Waiter::ttd>>>>;
-
- waiters_t d_waiters;
+ std::shared_ptr<pdns_ucontext_t> getUContext();
void initMainStackBounds()
{
#endif /* HAVE_FIBER_SANITIZER */
}
-
- //! Constructor
- /** Constructor with a small default stacksize. If any of your threads exceeds this stack, your application will crash.
- This limit applies solely to the stack, the heap is not limited in any way. If threads need to allocate a lot of data,
- the use of new/delete is suggested.
- */
- MTasker(size_t stacksize = static_cast<size_t>(16 * 8192), size_t stackCacheSize = 0) :
- d_stacksize(stacksize), d_maxCachedStacks(stackCacheSize), d_waitstatus(Error)
- {
- initMainStackBounds();
-
- // make sure our stack is 16-byte aligned to make all the architectures happy
- d_stacksize = d_stacksize >> 4 << 4;
- }
-
- using tfunc_t = void (void *); //!< type of the pointer that starts a thread
- int waitEvent(EventKey& key, EventVal* val = nullptr, unsigned int timeoutMsec = 0, const struct timeval* now = nullptr);
- void yield();
- int sendEvent(const EventKey& key, const EventVal* val = nullptr);
- void getEvents(std::vector<EventKey>& events);
- void makeThread(tfunc_t* start, void* val);
- bool schedule(const struct timeval* now = nullptr);
- [[nodiscard]] bool noProcesses() const;
- [[nodiscard]] unsigned int numProcesses() const;
- [[nodiscard]] int getTid() const;
- uint64_t getMaxStackUsage();
- unsigned int getUsec();
-
-private:
- std::shared_ptr<pdns_ucontext_t> getUContext();
-
- EventKey d_eventkey; // for waitEvent, contains exact key it was awoken for
};
#ifdef PDNS_USE_VALGRIND
#include <valgrind/valgrind.h>
#endif /* PDNS_USE_VALGRIND */
-/** \page MTasker
- Simple system for implementing cooperative multitasking of functions, with
- support for waiting on events which can return values.
-
- \section copyright Copyright and License
- MTasker is (c) 2002 - 2009 by bert hubert. It is licensed to you under the terms of the GPL version 2.
-
- \section overview High level overview
- MTasker is designed to support very simple cooperative multitasking to facilitate writing
- code that would ordinarily require a statemachine, for which the author does not consider
- himself smart enough.
-
- This class does not perform any magic it only makes calls to makecontext() and swapcontext().
- Getting the details right however is complicated and MTasker does that for you.
-
- If preemptive multitasking or more advanced concepts such as semaphores, locks or mutexes
- are required, the use of POSIX threads is advised.
-
- MTasker is designed to offer the performance of statemachines while maintaining simple thread semantics. It is not
- a replacement for a full threading system.
-
- \section compatibility Compatibility
- MTasker is only guaranteed to work on Linux with glibc 2.2.5 and higher. It does not work on FreeBSD and notably,
- not on Red Hat 6.0. It may work on Solaris, please test.
-
- \section concepts Concepts
-
- There are two important concepts, the 'kernel' and the 'thread'. Each thread starts out as a function,
- which is passed to MTasker::makeThread(), together with a possible argument.
-
- This function is now free to do whatever it wants, but realise that MTasker implements cooperative
- multitasking, which means that the coder has the responsibility of not taking the CPU overly long.
- Other threads can only get the CPU if MTasker::yield() is called or if a thread sleeps to wait for an event,
- using the MTasker::waitEvent() method.
-
- \section kernel The Kernel
- The Kernel consists of functions that do housekeeping, but also of code that the client coder
- can call to report events. A minimal kernel loop looks like this:
- \code
- for(;;) {
- MT.schedule();
- if(MT.noProcesses()) // exit if no processes are left
- break;
- }
- \endcode
-
- The kernel typically starts from the main() function of your program. New threads are also
- created from the kernel. This can also happen before entering the main loop. To start a thread,
- the method MTasker::makeThread is provided.
-
- \section events Events
- By default, Events are recognized by an int and their value is also an int.
- This can be overridden by specifying the EventKey and EventVal template parameters.
-
- An event can be a keypress, but also a UDP packet, or a bit of data from a TCP socket. The
- sample code provided works with keypresses, but that is just a not very useful example.
-
- A thread can also wait for an event only for a limited time, and receive a timeout of that
- event did not occur within the specified timeframe.
-
- \section example A simple menu system
- \code
-MTasker<> MT;
-
-void menuHandler(void *p)
-{
- int num=(int)p;
- cout<<"Key handler for key "<<num<<" launched"<<endl;
-
- MT.waitEvent(num);
- cout<<"Key "<<num<<" was pressed!"<<endl;
-}
-
-
-int main()
-{
- char line[10];
-
- for(int i=0;i<10;++i)
- MT.makeThread(menuHandler,(void *)i);
-
- for(;;) {
- while(MT.schedule()); // do everything we can do
- if(MT.noProcesses()) // exit if no processes are left
- break;
-
- if(!fgets(line,sizeof(line),stdin))
- break;
-
- MT.sendEvent(*line-'0');
- }
-}
-\endcode
-
-\section example2 Canonical multitasking example
-This implements the canonical multitasking example, alternately printing an 'A' and a 'B'. The Linux kernel
- started this way too.
-\code
-void printer(void *p)
-{
- char c=(char)p;
- for(;;) {
- cout<<c<<endl;
- MT.yield();
- }
-
-}
-
-int main()
-{
- MT.makeThread(printer,(void*)'a');
- MT.makeThread(printer,(void*)'b');
-
- for(;;) {
- while(MT.schedule()); // do everything we can do
- if(MT.noProcesses()) // exit if no processes are left
- break;
- }
-}
-\endcode
-
-*/
-
//! puts a thread to sleep waiting until a specified event arrives
/** Threads can call waitEvent to register that they are waiting on an event with a certain key.
If so desired, the event can carry data which is returned in val in case that is non-zero.
\return returns -1 in case of error, 0 in case of timeout, 1 in case of an answer
*/
-
template <class EventKey, class EventVal, class Cmp>
int MTasker<EventKey, EventVal, Cmp>::waitEvent(EventKey& key, EventVal* val, unsigned int timeoutMsec, const struct timeval* now)
{
waiter.ttd.tv_sec = 0;
waiter.ttd.tv_usec = 0;
if (timeoutMsec != 0) {
- struct timeval increment{};
+ struct timeval increment
+ {
+ };
increment.tv_sec = timeoutMsec / 1000;
increment.tv_usec = static_cast<decltype(increment.tv_usec)>(1000 * (timeoutMsec % 1000));
if (now != nullptr) {
waiter.ttd = increment + *now;
}
else {
- struct timeval realnow{};
+ struct timeval realnow
+ {
+ };
gettimeofday(&realnow, nullptr);
waiter.ttd = increment + realnow;
}
return true;
}
if (!d_waiters.empty()) {
- struct timeval rnow{};
+ struct timeval rnow
+ {
+ };
if (now != nullptr) {
gettimeofday(&rnow, nullptr);
}
}
return false;
}
-
-//! returns true if there are no processes
-/** Call this to check if no processes are running anymore
- \return true if no processes are left
- */
-template <class Key, class Val, class Cmp>
-bool MTasker<Key, Val, Cmp>::noProcesses() const
-{
- return d_threadsCount == 0;
-}
-
-//! returns the number of processes running
-/** Call this to perhaps limit activities if too many threads are running
- \return number of processes running
- */
-template <class Key, class Val, class Cmp>
-unsigned int MTasker<Key, Val, Cmp>::numProcesses() const
-{
- return d_threadsCount;
-}
-
-//! gives access to the list of Events threads are waiting for
-/** The kernel can call this to get a list of Events threads are waiting for. This is very useful
- to setup 'select' or 'poll' or 'aio' events needed to satisfy these requests.
- getEvents clears the events parameter before filling it.
-
- \param events Vector which is to be filled with keys threads are waiting for
-*/
-template <class Key, class Val, class Cmp>
-void MTasker<Key, Val, Cmp>::getEvents(std::vector<Key>& events)
-{
- events.clear();
- for (typename waiters_t::const_iterator i = d_waiters.begin(); i != d_waiters.end(); ++i) {
- events.push_back(i->first);
- }
-}
-
-//! Returns the current Thread ID (tid)
-/** Processes can call this to get a numerical representation of their current thread ID.
- This can be useful for logging purposes.
-*/
-template <class Key, class Val, class Cmp>
-int MTasker<Key, Val, Cmp>::getTid() const
-{
- return d_tid;
-}
-
-//! Returns the maximum stack usage so far of this MThread
-template <class Key, class Val, class Cmp>
-uint64_t MTasker<Key, Val, Cmp>::getMaxStackUsage()
-{
- return d_threads[d_tid].startOfStack - d_threads[d_tid].highestStackSeen;
-}
-
-//! Returns the maximum stack usage so far of this MThread
-template <class Key, class Val, class Cmp>
-unsigned int MTasker<Key, Val, Cmp>::getUsec()
-{
-#ifdef MTASKERTIMING
- return d_threads[d_tid].totTime + d_threads[d_tid].dt.ndiff() / 1000;
-#else
- return 0;
-#endif
-}
-
// See if there is an existing outstanding request we can chain on to, using partial equivalence
// function looking for the same query (qname and qtype) to the same host, but with a different
// message ID.
- auto chain = g_multiTasker->d_waiters.equal_range(pident, PacketIDBirthdayCompare());
+ auto chain = g_multiTasker->getWaiters().equal_range(pident, PacketIDBirthdayCompare());
for (; chain.first != chain.second; chain.first++) {
// Line below detected an issue with the two ways of ordering PacketIDs (birthday and non-birthday)
t_udpclientsocks->returnSocket(fileDesc);
PacketBuffer empty;
- MT_t::waiters_t::iterator iter = g_multiTasker->d_waiters.find(pid);
- if (iter != g_multiTasker->d_waiters.end()) {
+ auto iter = g_multiTasker->getWaiters().find(pid);
+ if (iter != g_multiTasker->getWaiters().end()) {
doResends(iter, pid, empty);
}
g_multiTasker->sendEvent(pid, &empty); // this denotes error (does retry lookup using other NS)
}
if (!pident->domain.empty()) {
- MT_t::waiters_t::iterator iter = g_multiTasker->d_waiters.find(pident);
- if (iter != g_multiTasker->d_waiters.end()) {
+ auto iter = g_multiTasker->getWaiters().find(pident);
+ if (iter != g_multiTasker->getWaiters().end()) {
doResends(iter, pident, packet);
}
}
/* we did not find a match for this response, something is wrong */
// we do a full scan for outstanding queries on unexpected answers. not too bad since we only accept them on the right port number, which is hard enough to guess
- for (const auto& d_waiter : g_multiTasker->d_waiters) {
+ for (const auto& d_waiter : g_multiTasker->getWaiters()) {
if (pident->fd == d_waiter.key->fd && d_waiter.key->remote == pident->remote && d_waiter.key->type == pident->type && pident->domain == d_waiter.key->domain) {
/* we are expecting an answer from that exact source, on that exact port (since we are using connected sockets), for that qname/qtype,
but with a different message ID. That smells like a spoofing attempt. For now we will just increase the counter and will deal with
}
t_Counters.at(rec::Counter::unexpectedCount)++; // if we made it here, it really is an unexpected answer
if (g_logCommonErrors) {
- SLOG(g_log << Logger::Warning << "Discarding unexpected packet from " << fromaddr.toStringWithPort() << ": " << (pident->domain.empty() ? "<empty>" : pident->domain.toString()) << ", " << pident->type << ", " << g_multiTasker->d_waiters.size() << " waiters" << endl,
+ SLOG(g_log << Logger::Warning << "Discarding unexpected packet from " << fromaddr.toStringWithPort() << ": " << (pident->domain.empty() ? "<empty>" : pident->domain.toString()) << ", " << pident->type << ", " << g_multiTasker->getWaiters().size() << " waiters" << endl,
g_slogudpin->info(Logr::Warning, "Discarding unexpected packet", "from", Logging::Loggable(fromaddr),
"qname", Logging::Loggable(pident->domain),
"qtype", Logging::Loggable(QType(pident->type)),
- "waiters", Logging::Loggable(g_multiTasker->d_waiters.size())));
+ "waiters", Logging::Loggable(g_multiTasker->getWaiters().size())));
}
}
else if (fileDesc >= 0) {