]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
add titles to mthreads
authorBert Hubert <bert.hubert@netherlabs.nl>
Thu, 14 Jul 2005 17:40:18 +0000 (17:40 +0000)
committerBert Hubert <bert.hubert@netherlabs.nl>
Thu, 14 Jul 2005 17:40:18 +0000 (17:40 +0000)
fix confusion in mtasker when waiting for duplicated keys
clarified error handling in waiting for events
improve error checking in mtasker
fix dnsreplay timing fix

git-svn-id: svn://svn.powerdns.com/pdns/trunk/pdns@442 d19b8d6e-7fed-0310-83ef-9ca221ded41b

pdns/dnsreplay.cc
pdns/mtasker.cc
pdns/mtasker.hh
pdns/pdns_recursor.cc

index 66b0482fe7e9c9c468792256e2452d703fcae8e1..189ed690dadd4f7d379cdd8ec6c7b8f5124640ec 100644 (file)
@@ -302,8 +302,9 @@ try
          tosleep.tv_nsec=nanoseconds%1000000000UL;
          
          nanosleep(&tosleep, 0);
+         lastsent=pr.d_pheader.ts;
        }
-       lastsent=pr.d_pheader.ts;
+
        //      cout<<"sending!"<<endl;
        s_socket->sendTo(string(pr.d_payload, pr.d_payload + pr.d_len), remote);
       }
index 940341fe90e957a09371b5ddfb08faf9fa72143f..370bf94452031fad7a4987a0864cc46d670870f1 100644 (file)
@@ -166,9 +166,15 @@ template<class EventKey, class EventVal>int MTasker<EventKey,EventVal>::waitEven
   w.ttd= timeout ? time(0)+timeout : 0;
   w.tid=d_tid;
   
+  if(d_waiters.count(key)) { // there was already an exact same waiter
+    return -1;
+  }
   d_waiters[key]=w;
   
-  swapcontext(d_waiters[key].context,&d_kernel); // 'A' will return here when 'key' has arrived, hands over control to kernel first
+  if(swapcontext(d_waiters[key].context,&d_kernel)) { // 'A' will return here when 'key' has arrived, hands over control to kernel first
+    perror("swapcontext");
+    exit(EXIT_FAILURE); // no way we can deal with this 
+  }
   if(val && d_waitstatus==Answer) 
     *val=d_waitval;
   d_tid=w.tid;
@@ -181,7 +187,10 @@ template<class EventKey, class EventVal>int MTasker<EventKey,EventVal>::waitEven
 template<class Key, class Val>void MTasker<Key,Val>::yield()
 {
   d_runQueue.push(d_tid);
-  swapcontext(d_threads[d_tid],&d_kernel); // give control to the kernel
+  if(swapcontext(d_threads[d_tid].first ,&d_kernel) < 0) { // give control to the kernel
+    perror("swapcontext in  yield");
+    exit(EXIT_FAILURE);
+  }
 }
 
 //! reports that an event took place for which threads may be waiting
@@ -193,6 +202,7 @@ template<class Key, class Val>void MTasker<Key,Val>::yield()
 template<class EventKey, class EventVal>int MTasker<EventKey,EventVal>::sendEvent(const EventKey& key, const EventVal* val)
 {
   if(!d_waiters.count(key)) {
+    //    cout<<"Event sent nobody was waiting for!"<<endl;
     return 0;
   }
   
@@ -204,8 +214,10 @@ template<class EventKey, class EventVal>int MTasker<EventKey,EventVal>::sendEven
   d_tid=d_waiters[key].tid;         // set tid 
   
   d_waiters.erase(key);             // removes the waitpoint 
-  swapcontext(&d_kernel,userspace); // swaps back to the above point 'A'
-  
+  if(swapcontext(&d_kernel,userspace)) { // swaps back to the above point 'A'
+    perror("swapcontext in sendEvent");
+    exit(EXIT_FAILURE);
+  }
   delete userspace;
   return 1;
 }
@@ -215,7 +227,7 @@ template<class EventKey, class EventVal>int MTasker<EventKey,EventVal>::sendEven
     \param start Pointer to the function which will form the start of the thread
     \param val A void pointer that can be used to pass data to the thread
 */
-template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start, void* val)
+template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start, void* val, const string& name)
 {
   ucontext_t *uc=new ucontext_t;
   getcontext(uc);
@@ -229,7 +241,7 @@ template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start,
 #else
   makecontext (uc, (void (*)(void))threadWrapper, 4, this, start, d_maxtid, val);
 #endif
-  d_threads[d_maxtid]=uc;
+  d_threads[d_maxtid]=make_pair(uc, name);
   d_runQueue.push(d_maxtid++); // will run at next schedule invocation
 }
 
@@ -249,13 +261,17 @@ template<class Key, class Val>bool MTasker<Key,Val>::schedule()
 
   if(!d_runQueue.empty()) {
     d_tid=d_runQueue.front();
-    swapcontext(&d_kernel, d_threads[d_tid]);
+    if(swapcontext(&d_kernel, d_threads[d_tid].first)) {
+      perror("swapcontext in schedule");
+      exit(EXIT_FAILURE);
+    }
+      
     d_runQueue.pop();
     return true;
   }
   if(!d_zombiesQueue.empty()) {
-    delete[] (char *)d_threads[d_zombiesQueue.front()]->uc_stack.ss_sp;
-    delete d_threads[d_zombiesQueue.front()];
+    delete[] (char *)d_threads[d_zombiesQueue.front()].first->uc_stack.ss_sp;
+    delete d_threads[d_zombiesQueue.front()].first;
     d_threads.erase(d_zombiesQueue.front());
     d_zombiesQueue.pop();
     return true;
@@ -265,7 +281,10 @@ template<class Key, class Val>bool MTasker<Key,Val>::schedule()
     for(typename waiters_t::const_iterator i=d_waiters.begin();i!=d_waiters.end();++i) {
       if(i->second.ttd && i->second.ttd<now) {
        d_waitstatus=TimeOut;
-       swapcontext(&d_kernel,i->second.context); // swaps back to the above point 'A'
+       if(swapcontext(&d_kernel,i->second.context)) { // swaps back to the above point 'A'
+         perror("swapcontext in schedule2");
+         exit(EXIT_FAILURE);
+       }
        delete i->second.context;              
        d_waiters.erase(i->first);                  // removes the waitpoint 
       }
index 3f3c1be14ac3113a7b5f17d280c5b0d87c5f1b49..643f82725790752f353021dab4c41d1d1e2a34ed 100644 (file)
@@ -52,7 +52,8 @@ private:
 
   typedef std::map<EventKey,Waiter> waiters_t;
   waiters_t d_waiters;
-  std::map<int,ucontext_t*> d_threads;
+  typedef std::map<int,pair<ucontext_t*,string> > mthreads_t;
+  mthreads_t d_threads;
   int d_tid;
   int d_maxtid;
   size_t d_stacksize;
@@ -76,11 +77,16 @@ public:
   void yield();
   int sendEvent(const EventKey& key, const EventVal* val=0);
   void getEvents(std::vector<EventKey>& events);
-  void makeThread(tfunc_t *start, void* val);
+  void makeThread(tfunc_t *start, void* val, const string& name="");
   bool schedule();
   bool noProcesses();
   unsigned int numProcesses();
   int getTid(); 
+  void setTitle(const string& name)
+  {
+    d_threads[d_tid].second=name;
+  }
+
 private:
   static void threadWrapper(MTasker *self, tfunc_t *tf, int tid, void* val);
 };
index eb1fa71bd5379f49368699be7c681eef3eca4199..5fa60954f03a1ee22d739f4cd8b96f1dd53b039f 100644 (file)
@@ -122,14 +122,14 @@ int asendtcp(const string& data, Socket* sock)
   //  cerr<<"asendtcp called for "<<data.size()<<" bytes"<<endl;
   d_tcpclientwritesocks[sock->getHandle()]=pident;
 
-  if(!MT->waitEvent(pident,&packet,1)) { // timeout
+  int ret=MT->waitEvent(pident,&packet,1);
+  if(!ret || ret==-1) { // timeout
     d_tcpclientwritesocks.erase(sock->getHandle());
-    return 0; 
   }
-  //  cerr<<"asendtcp happy"<<endl;
-  return 1;
+  return ret;
 }
 
+// -1 is error, 0 is timeout, 1 is success
 int arecvtcp(string& data, int len, Socket* sock) 
 {
   data="";
@@ -141,22 +141,22 @@ int arecvtcp(string& data, int len, Socket* sock)
   // cerr<<d_tcpclientwritesocks.size()<<" write sockets"<<endl;
   d_tcpclientreadsocks[sock->getHandle()]=pident;
 
-  if(!MT->waitEvent(pident,&data,1)) { // timeout
+  int ret=MT->waitEvent(pident,&data,1);
+  if(!ret || ret==-1) { // timeout
     d_tcpclientreadsocks.erase(sock->getHandle());
-    return 0; 
   }
-
-  // cerr<<"arecvtcp happy, data.size(): "<<data.size()<<endl;
-  return 1;
+  return ret;
 }
 
 
 /* these two functions are used by LWRes */
+// -1 is error, > 1 is success
 int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id) 
 {
   return sendto(d_clientsock, data, len, flags, toaddr, addrlen);
 }
 
+// -1 is error, 0 is timeout, 1 is success
 int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id)
 {
   PacketID pident;
@@ -164,14 +164,13 @@ int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::
   memcpy(&pident.remote,toaddr,sizeof(pident.remote));
 
   string packet;
-  if(!MT->waitEvent(pident,&packet,1)) { // timeout
-    return 0; 
+  int ret=MT->waitEvent(pident,&packet,1);
+  if(ret > 0) {
+    *d_len=packet.size();
+    memcpy(data,packet.c_str(),min(len,*d_len));
   }
 
-  *d_len=packet.size();
-  memcpy(data,packet.c_str(),min(len,*d_len));
-
-  return 1;
+  return ret;
 }
 
 
@@ -220,12 +219,12 @@ void startDoResolve(void *p)
     DNSPacket P=*(DNSPacket *)p;
 
     delete (DNSPacket *)p;
-
+    
     vector<DNSResourceRecord>ret;
     DNSPacket *R=P.replyPacket();
     R->setA(false);
     R->setRA(true);
-
+    //    MT->setTitle("udp question for "+P.qdomain+"|"+P.qtype.getName());
     SyncRes sr;
     if(!quiet)
       L<<Logger::Error<<"["<<MT->getTid()<<"] " << (R->d_tcp ? "TCP " : "") << "question for '"<<P.qdomain<<"|"<<P.qtype.getName()<<"' from "<<P.getRemote()<<endl;
@@ -273,6 +272,7 @@ void startDoResolve(void *p)
       //  XXX FIXME write this writev fallback otherwise
     }
 
+    //    MT->setTitle("DONE! udp question for "+P.qdomain+"|"+P.qtype.getName());
     if(!quiet) {
       L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName();
       L<<"': "<<ntohs(R->d.ancount)<<" answers, "<<ntohs(R->d.arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
@@ -419,6 +419,12 @@ void usr1Handler(int)
   statsWanted=true;
 }
 
+void usr2Handler(int)
+{
+  SyncRes::setLog(true);
+}
+
+
 
 void doStats(void)
 {
@@ -557,6 +563,7 @@ int main(int argc, char **argv)
       daemonize();
     }
     signal(SIGUSR1,usr1Handler);
+    signal(SIGUSR2,usr2Handler);
 
     writePid();
 #endif
@@ -586,7 +593,7 @@ int main(int argc, char **argv)
       while(MT->schedule()); // housekeeping, let threads do their thing
       
       if(!((counter++)%100)) 
-       MT->makeThread(houseKeeping,0);
+       MT->makeThread(houseKeeping,0,"housekeeping");
       if(statsWanted)
        doStats();
 
@@ -689,7 +696,7 @@ int main(int argc, char **argv)
              ++qcounter;
              P.setSocket(*i);
              P.d_tcp=false;
-             MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
+             MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "udp");
            }
          }
        }
@@ -737,7 +744,8 @@ int main(int argc, char **argv)
            }
          }
          else {
-           cerr<<"when reading ret="<<ret<<endl;
+           //      cerr<<"when reading ret="<<ret<<endl;
+           // XXX FIXME I think some stuff needs to happen here - like send an EOF event
          }
        }
        if(!haveErased)
@@ -762,7 +770,8 @@ int main(int argc, char **argv)
 
          }
          else { 
-           cerr<<"ret="<<ret<<" when writing"<<endl;
+           //      cerr<<"ret="<<ret<<" when writing"<<endl;
+           // XXX FIXME I think some stuff needs to happen here - like send an EOF event
          }
        }
        if(!haveErased)
@@ -828,7 +837,7 @@ int main(int argc, char **argv)
                  L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
                else {
                  ++qcounter;
-                 MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
+                 MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "tcp");
                }
              }
            }