#endif /* PDNS_USE_VALGRIND */
/** \page MTasker
- Simple system for implementing cooperative multitasking of functions, with
+ Simple system for implementing cooperative multitasking of functions, with
support for waiting on events which can return values.
\section copyright Copyright and License
MTasker is (c) 2002 - 2009 by bert hubert. It is licensed to you under the terms of the GPL version 2.
\section overview High level overview
- MTasker is designed to support very simple cooperative multitasking to facilitate writing
- code that would ordinarily require a statemachine, for which the author does not consider
+ MTasker is designed to support very simple cooperative multitasking to facilitate writing
+ code that would ordinarily require a statemachine, for which the author does not consider
himself smart enough.
- This class does not perform any magic it only makes calls to makecontext() and swapcontext().
+ This class does not perform any magic it only makes calls to makecontext() and swapcontext().
Getting the details right however is complicated and MTasker does that for you.
If preemptive multitasking or more advanced concepts such as semaphores, locks or mutexes
This function is now free to do whatever it wants, but realise that MTasker implements cooperative
multitasking, which means that the coder has the responsibility of not taking the CPU overly long.
- Other threads can only get the CPU if MTasker::yield() is called or if a thread sleeps to wait for an event,
+ Other threads can only get the CPU if MTasker::yield() is called or if a thread sleeps to wait for an event,
using the MTasker::waitEvent() method.
\section kernel The Kernel
- The Kernel consists of functions that do housekeeping, but also of code that the client coder
+ The Kernel consists of functions that do housekeeping, but also of code that the client coder
can call to report events. A minimal kernel loop looks like this:
\code
for(;;) {
\section events Events
By default, Events are recognized by an int and their value is also an int.
This can be overridden by specifying the EventKey and EventVal template parameters.
-
+
An event can be a keypress, but also a UDP packet, or a bit of data from a TCP socket. The
sample code provided works with keypresses, but that is just a not very useful example.
- A thread can also wait for an event only for a limited time, and receive a timeout of that
+ A thread can also wait for an event only for a limited time, and receive a timeout of that
event did not occur within the specified timeframe.
\section example A simple menu system
{
int num=(int)p;
cout<<"Key handler for key "<<num<<" launched"<<endl;
-
+
MT.waitEvent(num);
cout<<"Key "<<num<<" was pressed!"<<endl;
}
{
char line[10];
- for(int i=0;i<10;++i)
+ for(int i=0;i<10;++i)
MT.makeThread(menuHandler,(void *)i);
-
+
for(;;) {
while(MT.schedule()); // do everything we can do
if(MT.noProcesses()) // exit if no processes are left
if(!fgets(line,sizeof(line),stdin))
break;
-
+
MT.sendEvent(*line-'0');
}
}
//! puts a thread to sleep waiting until a specified event arrives
/** Threads can call waitEvent to register that they are waiting on an event with a certain key.
If so desired, the event can carry data which is returned in val in case that is non-zero.
-
+
Furthermore, a timeout can be specified in seconds.
-
+
Only one thread can be waiting on a key, results of trying to have more threads
waiting on the same key are undefined.
-
+
\param key Event key to wait for. Needs to match up to a key reported to sendEvent
\param val If non-zero, the value of the event will be stored in *val
\param timeout If non-zero, number of seconds to wait for an event.
-
- \return returns -1 in case of error, 0 in case of timeout, 1 in case of an answer
+
+ \return returns -1 in case of error, 0 in case of timeout, 1 in case of an answer
*/
-template<class EventKey, class EventVal, class Cmp>int MTasker<EventKey,EventVal,Cmp>::waitEvent(EventKey &key, EventVal *val, unsigned int timeoutMsec, const struct timeval* now)
+template <class EventKey, class EventVal, class Cmp>
+int MTasker<EventKey, EventVal, Cmp>::waitEvent(EventKey& key, EventVal* val, unsigned int timeoutMsec, const struct timeval* now)
{
- if(d_waiters.count(key)) { // there was already an exact same waiter
+ if (d_waiters.count(key)) { // there was already an exact same waiter
return -1;
}
Waiter w;
- w.context=std::make_shared<pdns_ucontext_t>();
- w.ttd.tv_sec = 0; w.ttd.tv_usec = 0;
- if(timeoutMsec) {
+ w.context = std::make_shared<pdns_ucontext_t>();
+ w.ttd.tv_sec = 0;
+ w.ttd.tv_usec = 0;
+ if (timeoutMsec) {
struct timeval increment;
increment.tv_sec = timeoutMsec / 1000;
increment.tv_usec = 1000 * (timeoutMsec % 1000);
- if(now)
+ if (now)
w.ttd = increment + *now;
else {
struct timeval realnow;
}
}
- w.tid=d_tid;
- w.key=key;
+ w.tid = d_tid;
+ w.key = key;
d_waiters.insert(w);
#ifdef MTASKERTIMING
- unsigned int diff=d_threads[d_tid].dt.ndiff()/1000;
- d_threads[d_tid].totTime+=diff;
+ unsigned int diff = d_threads[d_tid].dt.ndiff() / 1000;
+ d_threads[d_tid].totTime += diff;
#endif
notifyStackSwitchToKernel();
- pdns_swapcontext(*d_waiters.find(key)->context,d_kernel); // 'A' will return here when 'key' has arrived, hands over control to kernel first
+ pdns_swapcontext(*d_waiters.find(key)->context, d_kernel); // 'A' will return here when 'key' has arrived, hands over control to kernel first
notifyStackSwitchDone();
#ifdef MTASKERTIMING
d_threads[d_tid].dt.start();
#endif
- if(val && d_waitstatus==Answer)
- *val=d_waitval;
- d_tid=w.tid;
- if((char*)&w < d_threads[d_tid].highestStackSeen) {
+ if (val && d_waitstatus == Answer)
+ *val = d_waitval;
+ d_tid = w.tid;
+ if ((char*)&w < d_threads[d_tid].highestStackSeen) {
d_threads[d_tid].highestStackSeen = (char*)&w;
}
- key=d_eventkey;
+ key = d_eventkey;
return d_waitstatus;
}
//! yields control to the kernel or other threads
/** Hands over control to the kernel, allowing other processes to run, or events to arrive */
-template<class Key, class Val, class Cmp>void MTasker<Key,Val,Cmp>::yield()
+template <class Key, class Val, class Cmp>
+void MTasker<Key, Val, Cmp>::yield()
{
d_runQueue.push(d_tid);
notifyStackSwitchToKernel();
- pdns_swapcontext(*d_threads[d_tid].context ,d_kernel); // give control to the kernel
+ pdns_swapcontext(*d_threads[d_tid].context, d_kernel); // give control to the kernel
notifyStackSwitchDone();
}
WARNING: when passing val as zero, d_waitval is undefined, and hence waitEvent will return undefined!
*/
-template<class EventKey, class EventVal, class Cmp>int MTasker<EventKey,EventVal,Cmp>::sendEvent(const EventKey& key, const EventVal* val)
+template <class EventKey, class EventVal, class Cmp>
+int MTasker<EventKey, EventVal, Cmp>::sendEvent(const EventKey& key, const EventVal* val)
{
- typename waiters_t::iterator waiter=d_waiters.find(key);
+ typename waiters_t::iterator waiter = d_waiters.find(key);
- if(waiter == d_waiters.end()) {
- //cerr<<"Event sent nobody was waiting for! " <<key << endl;
+ if (waiter == d_waiters.end()) {
+ // cerr<<"Event sent nobody was waiting for! " <<key << endl;
return 0;
}
- d_waitstatus=Answer;
- if(val)
- d_waitval=*val;
-
- d_tid=waiter->tid; // set tid
- d_eventkey=waiter->key; // pass waitEvent the exact key it was woken for
- auto userspace=std::move(waiter->context);
- d_waiters.erase(waiter); // removes the waitpoint
+ d_waitstatus = Answer;
+ if (val)
+ d_waitval = *val;
+
+ d_tid = waiter->tid; // set tid
+ d_eventkey = waiter->key; // pass waitEvent the exact key it was woken for
+ auto userspace = std::move(waiter->context);
+ d_waiters.erase(waiter); // removes the waitpoint
notifyStackSwitch(d_threads[d_tid].startOfStack, d_stacksize);
try {
- pdns_swapcontext(d_kernel,*userspace); // swaps back to the above point 'A'
+ pdns_swapcontext(d_kernel, *userspace); // swaps back to the above point 'A'
}
catch (...) {
notifyStackSwitchDone();
return 1;
}
-template<class Key, class Val, class Cmp> std::shared_ptr<pdns_ucontext_t> MTasker<Key,Val,Cmp>::getUContext()
+template <class Key, class Val, class Cmp>
+std::shared_ptr<pdns_ucontext_t> MTasker<Key, Val, Cmp>::getUContext()
{
auto uc = std::make_shared<pdns_ucontext_t>();
if (d_cachedStacks.empty()) {
#ifdef PDNS_USE_VALGRIND
uc->valgrind_id = VALGRIND_STACK_REGISTER(&uc->uc_stack[0],
- &uc->uc_stack[uc->uc_stack.size()-1]);
+ &uc->uc_stack[uc->uc_stack.size() - 1]);
#endif /* PDNS_USE_VALGRIND */
return uc;
\param start Pointer to the function which will form the start of the thread
\param val A void pointer that can be used to pass data to the thread
*/
-template<class Key, class Val, class Cmp>void MTasker<Key,Val,Cmp>::makeThread(tfunc_t *start, void* val)
+template <class Key, class Val, class Cmp>
+void MTasker<Key, Val, Cmp>::makeThread(tfunc_t* start, void* val)
{
auto uc = getUContext();
auto mt = this;
// we will get a better approximation when the task is executed, but that prevents notifying a stack at nullptr
// on the first invocation
- d_threads[d_maxtid].startOfStack = &uc->uc_stack[uc->uc_stack.size()-1];
+ d_threads[d_maxtid].startOfStack = &uc->uc_stack[uc->uc_stack.size() - 1];
thread.start = [start, val, mt]() {
- char dummy;
- mt->d_threads[mt->d_tid].startOfStack = mt->d_threads[mt->d_tid].highestStackSeen = &dummy;
- auto const tid = mt->d_tid;
- start (val);
- mt->d_zombiesQueue.push(tid);
+ char dummy;
+ mt->d_threads[mt->d_tid].startOfStack = mt->d_threads[mt->d_tid].highestStackSeen = &dummy;
+ auto const tid = mt->d_tid;
+ start(val);
+ mt->d_zombiesQueue.push(tid);
};
- pdns_makecontext (*uc, thread.start);
+ pdns_makecontext(*uc, thread.start);
thread.context = std::move(uc);
d_runQueue.push(d_maxtid++); // will run at next schedule invocation
}
-
//! needs to be called periodically so threads can run and housekeeping can be performed
/** The kernel should call this function every once in a while. It makes sense
to call this function if you:
- reported an event
- called makeThread
- - want to have threads running waitEvent() to get a timeout if enough time passed
-
+ - want to have threads running waitEvent() to get a timeout if enough time passed
+
\return Returns if there is more work scheduled and recalling schedule now would be useful
-
+
*/
-template<class Key, class Val, class Cmp>bool MTasker<Key,Val,Cmp>::schedule(const struct timeval* now)
+template <class Key, class Val, class Cmp>
+bool MTasker<Key, Val, Cmp>::schedule(const struct timeval* now)
{
- if(!d_runQueue.empty()) {
- d_tid=d_runQueue.front();
+ if (!d_runQueue.empty()) {
+ d_tid = d_runQueue.front();
#ifdef MTASKERTIMING
d_threads[d_tid].dt.start();
#endif
d_zombiesQueue.pop();
return true;
}
- if(!d_waiters.empty()) {
+ if (!d_waiters.empty()) {
struct timeval rnow;
- if(!now)
+ if (!now)
gettimeofday(&rnow, 0);
else
rnow = *now;
typedef typename waiters_t::template index<KeyTag>::type waiters_by_ttd_index_t;
// waiters_by_ttd_index_t& ttdindex=d_waiters.template get<KeyTag>();
- waiters_by_ttd_index_t& ttdindex=boost::multi_index::get<KeyTag>(d_waiters);
+ waiters_by_ttd_index_t& ttdindex = boost::multi_index::get<KeyTag>(d_waiters);
- for(typename waiters_by_ttd_index_t::iterator i=ttdindex.begin(); i != ttdindex.end(); ) {
- if(i->ttd.tv_sec && i->ttd < rnow) {
- d_waitstatus=TimeOut;
- d_eventkey=i->key; // pass waitEvent the exact key it was woken for
+ for (typename waiters_by_ttd_index_t::iterator i = ttdindex.begin(); i != ttdindex.end();) {
+ if (i->ttd.tv_sec && i->ttd < rnow) {
+ d_waitstatus = TimeOut;
+ d_eventkey = i->key; // pass waitEvent the exact key it was woken for
auto uc = i->context;
d_tid = i->tid;
- ttdindex.erase(i++); // removes the waitpoint
+ ttdindex.erase(i++); // removes the waitpoint
notifyStackSwitch(d_threads[d_tid].startOfStack, d_stacksize);
try {
}
notifyStackSwitchDone();
}
- else if(i->ttd.tv_sec)
+ else if (i->ttd.tv_sec)
break;
else
- ++i;
+ ++i;
}
}
return false;
/** Call this to check if no processes are running anymore
\return true if no processes are left
*/
-template<class Key, class Val, class Cmp>bool MTasker<Key,Val,Cmp>::noProcesses() const
+template <class Key, class Val, class Cmp>
+bool MTasker<Key, Val, Cmp>::noProcesses() const
{
return d_threadsCount == 0;
}
/** Call this to perhaps limit activities if too many threads are running
\return number of processes running
*/
-template<class Key, class Val, class Cmp>unsigned int MTasker<Key,Val,Cmp>::numProcesses() const
+template <class Key, class Val, class Cmp>
+unsigned int MTasker<Key, Val, Cmp>::numProcesses() const
{
return d_threadsCount;
}
\param events Vector which is to be filled with keys threads are waiting for
*/
-template<class Key, class Val, class Cmp>void MTasker<Key,Val,Cmp>::getEvents(std::vector<Key>& events)
+template <class Key, class Val, class Cmp>
+void MTasker<Key, Val, Cmp>::getEvents(std::vector<Key>& events)
{
events.clear();
- for(typename waiters_t::const_iterator i=d_waiters.begin();i!=d_waiters.end();++i) {
+ for (typename waiters_t::const_iterator i = d_waiters.begin(); i != d_waiters.end(); ++i) {
events.push_back(i->first);
}
}
/** Processes can call this to get a numerical representation of their current thread ID.
This can be useful for logging purposes.
*/
-template<class Key, class Val, class Cmp>int MTasker<Key,Val,Cmp>::getTid() const
+template <class Key, class Val, class Cmp>
+int MTasker<Key, Val, Cmp>::getTid() const
{
return d_tid;
}
//! Returns the maximum stack usage so far of this MThread
-template<class Key, class Val, class Cmp>uint64_t MTasker<Key,Val,Cmp>::getMaxStackUsage()
+template <class Key, class Val, class Cmp>
+uint64_t MTasker<Key, Val, Cmp>::getMaxStackUsage()
{
return d_threads[d_tid].startOfStack - d_threads[d_tid].highestStackSeen;
}
//! Returns the maximum stack usage so far of this MThread
-template<class Key, class Val, class Cmp>unsigned int MTasker<Key,Val,Cmp>::getUsec()
+template <class Key, class Val, class Cmp>
+unsigned int MTasker<Key, Val, Cmp>::getUsec()
{
#ifdef MTASKERTIMING
- return d_threads[d_tid].totTime + d_threads[d_tid].dt.ndiff()/1000;
-#else
+ return d_threads[d_tid].totTime + d_threads[d_tid].dt.ndiff() / 1000;
+#else
return 0;
#endif
}
// #define MTASKERTIMING 1
-//! The main MTasker class
+//! The main MTasker class
/** The main MTasker class. See the main page for more information.
\tparam EventKey Type of the key with which events are to be identified. Defaults to int.
\tparam EventVal Type of the content or value of an event. Defaults to int. Cannot be set to void.
\note The EventKey needs to have an operator< defined because it is used as the key of an associative array
*/
-template<class EventKey=int, class EventVal=int, class Cmp = std::less<EventKey>> class MTasker
+template <class EventKey = int, class EventVal = int, class Cmp = std::less<EventKey>>
+class MTasker
{
private:
pdns_ucontext_t d_kernel;
struct ThreadInfo
{
- std::shared_ptr<pdns_ucontext_t> context;
- std::function<void(void)> start;
- const char* startOfStack;
- const char* highestStackSeen;
+ std::shared_ptr<pdns_ucontext_t> context;
+ std::function<void(void)> start;
+ const char* startOfStack;
+ const char* highestStackSeen;
#ifdef MTASKERTIMING
- CPUTime dt;
- unsigned int totTime;
+ CPUTime dt;
+ unsigned int totTime;
#endif
};
int d_maxtid{0};
EventVal d_waitval;
- enum waitstatusenum : int8_t {Error=-1,TimeOut=0,Answer} d_waitstatus;
+ enum waitstatusenum : int8_t
+ {
+ Error = -1,
+ TimeOut = 0,
+ Answer
+ } d_waitstatus;
public:
struct Waiter
struct timeval ttd;
int tid;
};
- struct KeyTag {};
+ struct KeyTag
+ {
+ };
typedef multi_index_container<
Waiter,
- indexed_by <
- ordered_unique<member<Waiter,EventKey,&Waiter::key>, Cmp>,
- ordered_non_unique<tag<KeyTag>, member<Waiter,struct timeval,&Waiter::ttd> >
- >
- > waiters_t;
+ indexed_by<
+ ordered_unique<member<Waiter, EventKey, &Waiter::key>, Cmp>,
+ ordered_non_unique<tag<KeyTag>, member<Waiter, struct timeval, &Waiter::ttd>>>>
+ waiters_t;
waiters_t d_waiters;
}
//! Constructor
- /** Constructor with a small default stacksize. If any of your threads exceeds this stack, your application will crash.
+ /** Constructor with a small default stacksize. If any of your threads exceeds this stack, your application will crash.
This limit applies solely to the stack, the heap is not limited in any way. If threads need to allocate a lot of data,
- the use of new/delete is suggested.
+ the use of new/delete is suggested.
*/
- MTasker(size_t stacksize=16*8192, size_t stackCacheSize=0) : d_stacksize(stacksize), d_maxCachedStacks(stackCacheSize), d_waitstatus(Error)
+ MTasker(size_t stacksize = 16 * 8192, size_t stackCacheSize = 0) :
+ d_stacksize(stacksize), d_maxCachedStacks(stackCacheSize), d_waitstatus(Error)
{
initMainStackBounds();
d_stacksize = d_stacksize >> 4 << 4;
}
- typedef void tfunc_t(void *); //!< type of the pointer that starts a thread
- int waitEvent(EventKey &key, EventVal *val=nullptr, unsigned int timeoutMsec=0, const struct timeval* now=nullptr);
+ typedef void tfunc_t(void*); //!< type of the pointer that starts a thread
+ int waitEvent(EventKey& key, EventVal* val = nullptr, unsigned int timeoutMsec = 0, const struct timeval* now = nullptr);
void yield();
- int sendEvent(const EventKey& key, const EventVal* val=nullptr);
+ int sendEvent(const EventKey& key, const EventVal* val = nullptr);
void getEvents(std::vector<EventKey>& events);
- void makeThread(tfunc_t *start, void* val);
- bool schedule(const struct timeval* now=nullptr);
+ void makeThread(tfunc_t* start, void* val);
+ bool schedule(const struct timeval* now = nullptr);
bool noProcesses() const;
unsigned int numProcesses() const;
int getTid() const;
private:
std::shared_ptr<pdns_ucontext_t> getUContext();
- EventKey d_eventkey; // for waitEvent, contains exact key it was awoken for
+ EventKey d_eventkey; // for waitEvent, contains exact key it was awoken for
};
#include "mtasker.cc"