From: Bert Hubert
Date: Thu, 14 Jul 2005 17:40:18 +0000 (+0000)
Subject: add titles to mthreads
X-Git-Tag: pdns-2.9.18~7
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9170fbaf7ea29d888c345eea4d7a52aaa9145df0;p=thirdparty%2Fpdns.git

add titles to mthreads
fix confusion in mtasker when waiting for duplicated keys
clarified error handling in waiting for events
improve error checking in mtasker
fix dnsreplay timing

git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@442 d19b8d6e-7fed-0310-83ef-9ca221ded41b
---
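Usage note, not part of the patch itself: the sketch below shows how the new thread-title API is meant to be used. The MTasker instantiation, the worker function and the title strings are invented for illustration; only makeThread()'s new optional name parameter and the setTitle() method come from the diff that follows.

#include <string>
#include "mtasker.hh"

static MTasker<std::string, std::string> MT;  // illustrative instantiation

static void worker(void *)                    // matches MTasker's tfunc_t
{
  // a running mthread can retitle itself to reflect what it is doing;
  // the title is stored next to the ucontext_t in the d_threads map
  MT.setTitle("worker: busy");
}

int main()
{
  // the new third argument names the mthread at creation time; it
  // defaults to "", so existing two-argument callers keep compiling
  MT.makeThread(worker, 0, "worker");
  while(MT.schedule())                        // run until nothing is runnable
    ;
  return 0;
}
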
diff --git a/pdns/dnsreplay.cc b/pdns/dnsreplay.cc
index 66b0482fe7..189ed690da 100644
--- a/pdns/dnsreplay.cc
+++ b/pdns/dnsreplay.cc
@@ -302,8 +302,9 @@ try
       tosleep.tv_nsec=nanoseconds%1000000000UL;
 
       nanosleep(&tosleep, 0);
+      lastsent=pr.d_pheader.ts;
     }
-    lastsent=pr.d_pheader.ts;
 
+    // cout<<"sending!"<<endl;
     s->sendTo(string(pr.d_payload, pr.d_payload + pr.d_len), remote);
   }
diff --git a/pdns/mtasker.cc b/pdns/mtasker.cc
index 940341fe90..370bf94452 100644
--- a/pdns/mtasker.cc
+++ b/pdns/mtasker.cc
@@ -166,9 +166,15 @@ template<class EventKey, class EventVal>int MTasker<EventKey,EventVal>::waitEven
   w.ttd= timeout ? time(0)+timeout : 0;
   w.tid=d_tid;
 
+  if(d_waiters.count(key)) { // there was already an exact same waiter
+    return -1;
+  }
   d_waiters[key]=w;
 
-  swapcontext(d_waiters[key].context,&d_kernel); // 'A' will return here when 'key' has arrived, hands over control to kernel first
+  if(swapcontext(d_waiters[key].context,&d_kernel)) { // 'A' will return here when 'key' has arrived, hands over control to kernel first
+    perror("swapcontext");
+    exit(EXIT_FAILURE); // no way we can deal with this
+  }
   if(val && d_waitstatus==Answer)
     *val=d_waitval;
   d_tid=w.tid;
@@ -181,7 +187,10 @@ template<class Key, class Val>void MTasker<Key,Val>::yield()
 {
   d_runQueue.push(d_tid);
-  swapcontext(d_threads[d_tid],&d_kernel); // give control to the kernel
+  if(swapcontext(d_threads[d_tid].first ,&d_kernel) < 0) { // give control to the kernel
+    perror("swapcontext in yield");
+    exit(EXIT_FAILURE);
+  }
 }
 
 //! reports that an event took place for which threads may be waiting
@@ -193,6 +202,7 @@ template<class Key, class Val>void MTasker<Key,Val>::yield()
 template<class EventKey, class EventVal>int MTasker<EventKey,EventVal>::sendEvent(const EventKey& key, const EventVal* val)
 {
   if(!d_waiters.count(key)) {
+    // cout<<"Event sent nobody was waiting for!"<<endl;
     return 0;
   }
 
@@ -200,8 +210,10 @@ template<class EventKey, class EventVal>int MTasker<EventKey,EventVal>::sendEven
   d_tid=d_waiters[key].tid;         // set tid
   d_waiters.erase(key);             // removes the waitpoint
-  swapcontext(&d_kernel,userspace); // swaps back to the above point 'A'
-
+  if(swapcontext(&d_kernel,userspace)) { // swaps back to the above point 'A'
+    perror("swapcontext in sendEvent");
+    exit(EXIT_FAILURE);
+  }
   delete userspace;
   return 1;
 }
 
@@ -215,7 +227,7 @@ template<class EventKey, class EventVal>int MTasker<EventKey,EventVal>::sendEven
   \param start Pointer to the function which will form the start of the thread
   \param val A void pointer that can be used to pass data to the thread
 */
-template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start, void* val)
+template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start, void* val, const string& name)
 {
   ucontext_t *uc=new ucontext_t;
   getcontext(uc);
@@ -229,7 +241,7 @@ template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start,
 #else
   makecontext (uc, (void (*)(void))threadWrapper, 4, this, start, d_maxtid, val);
 #endif
-  d_threads[d_maxtid]=uc;
+  d_threads[d_maxtid]=make_pair(uc, name);
   d_runQueue.push(d_maxtid++); // will run at next schedule invocation
 }
 
@@ -249,13 +261,17 @@ template<class Key, class Val>bool MTasker<Key,Val>::schedule()
   if(!d_runQueue.empty()) {
     d_tid=d_runQueue.front();
-    swapcontext(&d_kernel, d_threads[d_tid]);
+    if(swapcontext(&d_kernel, d_threads[d_tid].first)) {
+      perror("swapcontext in schedule");
+      exit(EXIT_FAILURE);
+    }
+
     d_runQueue.pop();
     return true;
   }
   if(!d_zombiesQueue.empty()) {
-    delete[] (char *)d_threads[d_zombiesQueue.front()]->uc_stack.ss_sp;
-    delete d_threads[d_zombiesQueue.front()];
+    delete[] (char *)d_threads[d_zombiesQueue.front()].first->uc_stack.ss_sp;
+    delete d_threads[d_zombiesQueue.front()].first;
     d_threads.erase(d_zombiesQueue.front());
     d_zombiesQueue.pop();
     return true;
@@ -265,7 +281,10 @@ template<class Key, class Val>bool MTasker<Key,Val>::schedule()
   for(typename waiters_t::const_iterator i=d_waiters.begin();i!=d_waiters.end();++i) {
     if(i->second.ttd && i->second.ttd<now) {
       d_waitstatus=TimeOut;
-      swapcontext(&d_kernel,i->second.context); // swaps back to the above point 'A'
+      if(swapcontext(&d_kernel,i->second.context)) { // swaps back to the above point 'A'
+        perror("swapcontext in schedule2");
+        exit(EXIT_FAILURE);
+      }
       delete i->second.context;
       d_waiters.erase(i->first);                  // removes the waitpoint
     }
diff --git a/pdns/mtasker.hh b/pdns/mtasker.hh
index 3f3c1be14a..643f827257 100644
--- a/pdns/mtasker.hh
+++ b/pdns/mtasker.hh
@@ -52,7 +52,8 @@ private:
   typedef std::map<EventKey,Waiter> waiters_t;
   waiters_t d_waiters;
 
-  std::map<int,ucontext_t*> d_threads;
+  typedef std::map<int, std::pair<ucontext_t*, string> > mthreads_t;
+  mthreads_t d_threads;
   int d_tid;
   int d_maxtid;
   size_t d_stacksize;
@@ -76,11 +77,16 @@ public:
   void yield();
   int sendEvent(const EventKey& key, const EventVal* val=0);
   void getEvents(std::vector<EventKey>& events);
-  void makeThread(tfunc_t *start, void* val);
+  void makeThread(tfunc_t *start, void* val, const string& name="");
   bool schedule();
   bool noProcesses();
   unsigned int numProcesses();
   int getTid();
+  void setTitle(const string& name)
+  {
+    d_threads[d_tid].second=name;
+  }
+
 private:
   static void threadWrapper(MTasker *self, tfunc_t *tf, int tid, void* val);
 };
diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc
index eb1fa71bd5..5fa60954f0 100644
--- a/pdns/pdns_recursor.cc
+++ b/pdns/pdns_recursor.cc
@@ -122,14 +122,14 @@ int asendtcp(const string& data, Socket* sock)
   // cerr<<"asendtcp called for "<<data.size()<<" bytes"<<endl;
   d_tcpclientwritesocks[sock->getHandle()]=pident;
 
-  if(!MT->waitEvent(pident,&packet,1)) { // timeout
+  int ret=MT->waitEvent(pident,&packet,1);
+  if(!ret || ret==-1) { // timeout
     d_tcpclientwritesocks.erase(sock->getHandle());
-    return 0;
   }
-  // cerr<<"asendtcp happy"<<endl;
-  return 1;
+  return ret;
 }
@@ ... @@ int arecvtcp(string& data, int len, Socket *sock)
   d_tcpclientreadsocks[sock->getHandle()]=pident;
 
-  if(!MT->waitEvent(pident,&data,1)) { // timeout
+  int ret=MT->waitEvent(pident,&data,1);
+  if(!ret || ret==-1) { // timeout
     d_tcpclientreadsocks.erase(sock->getHandle());
-    return 0;
   }
-
-  // cerr<<"arecvtcp happy, data.size(): "<<data.size()<<endl;
-  return 1;
+  return ret;
 }
 
+// -1 is error, 0 is timeout, > 1 is success
 int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id)
 {
   return sendto(d_clientsock, data, len, flags, toaddr, addrlen);
 }
 
+// -1 is error, 0 is timeout, 1 is success
 int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id)
 {
   PacketID pident;
@@ -164,14 +164,13 @@ int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::
   memcpy(&pident.remote,toaddr,sizeof(pident.remote));
 
   string packet;
-  if(!MT->waitEvent(pident,&packet,1)) { // timeout
-    return 0;
+  int ret=MT->waitEvent(pident,&packet,1);
+  if(ret > 0) {
+    *d_len=packet.size();
+    memcpy(data,packet.c_str(),min(len,*d_len));
   }
-  *d_len=packet.size();
-  memcpy(data,packet.c_str(),min(len,*d_len));
-
-  return 1;
+  return ret;
 }
@@ -220,12 +219,12 @@ void startDoResolve(void *p)
   DNSPacket P=*(DNSPacket *)p;
   delete (DNSPacket *)p;
-
+
   vector<DNSResourceRecord>ret;
   DNSPacket *R=P.replyPacket();
   R->setA(false);
   R->setRA(true);
-
+  // MT->setTitle("udp question for "+P.qdomain+"|"+P.qtype.getName());
   SyncRes sr;
   if(!quiet)
     L<<Logger::Error<<"["<<MT->getTid()<<"] " << (R->d_tcp ? "TCP " : "") << "question for '"<<P.qdomain<<"|"<<P.qtype.getName()<<"' from "<<P.getRemote()<<endl;
@@ ... @@ void startDoResolve(void *p)
+  // MT->setTitle("DONE! udp question for "+P.qdomain+"|"+P.qtype.getName());
   if(!quiet) {
     L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName();
     L<<"': "<<ntohs(R->d.ancount)<<" answers, "<<ntohs(R->d.arcount)<<" additional, took "<<sr.d_outqueries<<" packets"<<endl;
   }
@@ ... @@ int main(int argc, char **argv)
     while(MT->schedule()); // housekeeping, let threads do their thing
 
     if(!((counter++)%100))
-      MT->makeThread(houseKeeping,0);
+      MT->makeThread(houseKeeping,0,"housekeeping");
 
     if(statsWanted)
       doStats();
@@ -689,7 +696,7 @@ int main(int argc, char **argv)
         ++qcounter;
         P.setSocket(*i);
         P.d_tcp=false;
-        MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
+        MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "udp");
       }
     }
   }
@@ -737,7 +744,8 @@ int main(int argc, char **argv)
       }
     }
     else {
-      cerr<<"when reading ret="<<ret<<endl;
-      MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
+      MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "tcp");
     }
   }
 }
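
Postscript, not part of the patch: the error-handling changes above settle on one return convention for waitEvent() and its wrappers, spelled out by the new comments in pdns_recursor.cc: -1 is an error (notably a second mthread trying to wait on an exact key that already has a waiter), 0 is a timeout, and a positive value is success. A sketch of a call site under that convention follows; the instantiation, the functions and the key are invented for illustration.

#include <string>
#include "mtasker.hh"

static MTasker<std::string, std::string> MT;

static void waiter(void *)
{
  std::string key="reply", answer;
  int ret=MT.waitEvent(key, &answer, 1);  // sleep until 'reply', max 1 second
  if(ret==-1) {
    // error: another mthread was already waiting on this exact key
  }
  else if(!ret) {
    // timeout: no 'reply' event was sent within a second
  }
  else {
    // success: 'answer' now holds the value handed to sendEvent()
  }
}

int main()
{
  MT.makeThread(waiter, 0, "waiter");
  while(MT.schedule())                    // waiter runs, blocks in waitEvent
    ;
  std::string key="reply", val="hello";
  MT.sendEvent(key, &val);                // 'kernel' side wakes the waiter
  while(MT.schedule())                    // let the woken thread finish
    ;
  return 0;
}

Note that sendEvent() is called from the kernel side of the loop, as pdns_recursor.cc does; it returns 0 when nobody waits on the key and 1 after handing control to the waiter.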