tosleep.tv_nsec=nanoseconds%1000000000UL;
nanosleep(&tosleep, 0);
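+ // moved inside the branch: presumably so that lastsent only advances when
+ // we actually slept, letting sub-sleep gaps accumulate instead of being lost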
+ lastsent=pr.d_pheader.ts;
}
- lastsent=pr.d_pheader.ts;
+
// cout<<"sending!"<<endl;
s_socket->sendTo(string(pr.d_payload, pr.d_payload + pr.d_len), remote);
}
w.ttd= timeout ? time(0)+timeout : 0;
w.tid=d_tid;
+ if(d_waiters.count(key)) { // there was already an exact same waiter
+ return -1;
+ }
d_waiters[key]=w;
- swapcontext(d_waiters[key].context,&d_kernel); // 'A' will return here when 'key' has arrived, hands over control to kernel first
+ if(swapcontext(d_waiters[key].context,&d_kernel)) { // 'A' will return here when 'key' has arrived, hands over control to kernel first
+ perror("swapcontext");
+ exit(EXIT_FAILURE); // no way we can deal with this
+ }
if(val && d_waitstatus==Answer)
*val=d_waitval;
d_tid=w.tid;
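+ // contract as relied upon by the callers below: -1 if an identical waiter
+ // already existed, 0 on timeout, 1 if the event arrived and *val was filled in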
template<class Key, class Val>void MTasker<Key,Val>::yield()
{
d_runQueue.push(d_tid);
- swapcontext(d_threads[d_tid],&d_kernel); // give control to the kernel
+ if(swapcontext(d_threads[d_tid].first, &d_kernel) < 0) { // give control to the kernel
+ perror("swapcontext in yield");
+ exit(EXIT_FAILURE);
+ }
}
//! reports that an event took place for which threads may be waiting
template<class EventKey, class EventVal>int MTasker<EventKey,EventVal>::sendEvent(const EventKey& key, const EventVal* val)
{
if(!d_waiters.count(key)) {
+ // cout<<"Event sent nobody was waiting for!"<<endl;
return 0;
}
d_tid=d_waiters[key].tid; // set tid
d_waiters.erase(key); // removes the waitpoint
- swapcontext(&d_kernel,userspace); // swaps back to the above point 'A'
-
+ if(swapcontext(&d_kernel,userspace)) { // swaps back to the above point 'A'
+ perror("swapcontext in sendEvent");
+ exit(EXIT_FAILURE);
+ }
delete userspace;
return 1;
}
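+// Usage sketch (names as in pdns_recursor.cc below): the socket loop hands a
+// freshly received packet to whichever thread is parked on the matching key:
+//   string packet(buffer, buffer+len);
+//   if(!MT->sendEvent(pident, &packet)) {
+//     // nobody was waiting: a late or unexpected answer, drop it
+//   }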
\param start Pointer to the function which will form the start of the thread
\param val A void pointer that can be used to pass data to the thread
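+    \param name Optional human-readable label for the new thread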
*/
-template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start, void* val)
+template<class Key, class Val>void MTasker<Key,Val>::makeThread(tfunc_t *start, void* val, const string& name)
{
ucontext_t *uc=new ucontext_t;
getcontext(uc);
#else
makecontext (uc, (void (*)(void))threadWrapper, 4, this, start, d_maxtid, val);
#endif
- d_threads[d_maxtid]=uc;
+ d_threads[d_maxtid]=make_pair(uc, name);
d_runQueue.push(d_maxtid++); // will run at next schedule invocation
}
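+// The new 'name' parameter labels a thread at creation time, as used below:
+//   MT->makeThread(startDoResolve, (void*)new DNSPacket(P), "udp");
+// a running thread can also relabel itself later via setTitle().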
if(!d_runQueue.empty()) {
d_tid=d_runQueue.front();
- swapcontext(&d_kernel, d_threads[d_tid]);
+ if(swapcontext(&d_kernel, d_threads[d_tid].first)) {
+ perror("swapcontext in schedule");
+ exit(EXIT_FAILURE);
+ }
+
d_runQueue.pop();
return true;
}
if(!d_zombiesQueue.empty()) {
- delete[] (char *)d_threads[d_zombiesQueue.front()]->uc_stack.ss_sp;
- delete d_threads[d_zombiesQueue.front()];
+ delete[] (char *)d_threads[d_zombiesQueue.front()].first->uc_stack.ss_sp;
+ delete d_threads[d_zombiesQueue.front()].first;
d_threads.erase(d_zombiesQueue.front());
d_zombiesQueue.pop();
return true;
for(typename waiters_t::const_iterator i=d_waiters.begin();i!=d_waiters.end();++i) {
if(i->second.ttd && i->second.ttd<now) {
d_waitstatus=TimeOut;
- swapcontext(&d_kernel,i->second.context); // swaps back to the above point 'A'
+ if(swapcontext(&d_kernel,i->second.context)) { // swaps back to the above point 'A'
+ perror("swapcontext in schedule2");
+ exit(EXIT_FAILURE);
+ }
delete i->second.context;
d_waiters.erase(i->first); // removes the waitpoint
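+      // XXX FIXME: erase() invalidates 'i', so the ++i in the loop header
+      // is undefined behaviour once a waiter has timed out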
}
typedef std::map<EventKey,Waiter> waiters_t;
waiters_t d_waiters;
- std::map<int,ucontext_t*> d_threads;
+ typedef std::map<int,pair<ucontext_t*,string> > mthreads_t;
+ mthreads_t d_threads;
int d_tid;
int d_maxtid;
size_t d_stacksize;
void yield();
int sendEvent(const EventKey& key, const EventVal* val=0);
void getEvents(std::vector<EventKey>& events);
- void makeThread(tfunc_t *start, void* val);
+ void makeThread(tfunc_t *start, void* val, const string& name="");
bool schedule();
bool noProcesses();
unsigned int numProcesses();
int getTid();
+ void setTitle(const string& name)
+ {
+ d_threads[d_tid].second=name;
+ }
+
private:
static void threadWrapper(MTasker *self, tfunc_t *tf, int tid, void* val);
};
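+
+// Instantiation sketch for the resolver code below (assuming MTasker is
+// default-constructible): events are keyed by PacketID and carry the raw
+// packet in a string:
+//   MTasker<PacketID,string>* MT=new MTasker<PacketID,string>();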
// cerr<<"asendtcp called for "<<data.size()<<" bytes"<<endl;
d_tcpclientwritesocks[sock->getHandle()]=pident;
- if(!MT->waitEvent(pident,&packet,1)) { // timeout
+ int ret=MT->waitEvent(pident,&packet,1);
+ if(ret <= 0) { // timeout or error
d_tcpclientwritesocks.erase(sock->getHandle());
- return 0;
}
- // cerr<<"asendtcp happy"<<endl;
- return 1;
+ return ret;
}
+// -1 is error, 0 is timeout, 1 is success
int arecvtcp(string& data, int len, Socket* sock)
{
data="";
// cerr<<d_tcpclientwritesocks.size()<<" write sockets"<<endl;
d_tcpclientreadsocks[sock->getHandle()]=pident;
- if(!MT->waitEvent(pident,&data,1)) { // timeout
+ int ret=MT->waitEvent(pident,&data,1);
+ if(ret <= 0) { // timeout or error
d_tcpclientreadsocks.erase(sock->getHandle());
- return 0;
}
-
- // cerr<<"arecvtcp happy, data.size(): "<<data.size()<<endl;
- return 1;
+ return ret;
}
/* these two functions are used by LWRes */
+// -1 is error, > 0 is success (the number of bytes sent)
int asendto(const char *data, int len, int flags, struct sockaddr *toaddr, int addrlen, int id)
{
return sendto(d_clientsock, data, len, flags, toaddr, addrlen);
}
+// -1 is error, 0 is timeout, 1 is success
int arecvfrom(char *data, int len, int flags, struct sockaddr *toaddr, Utility::socklen_t *addrlen, int *d_len, int id)
{
PacketID pident;
memcpy(&pident.remote,toaddr,sizeof(pident.remote));
string packet;
- if(!MT->waitEvent(pident,&packet,1)) { // timeout
- return 0;
+ int ret=MT->waitEvent(pident,&packet,1);
+ if(ret > 0) {
+ *d_len=packet.size();
+ memcpy(data,packet.c_str(),min(len,*d_len));
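+ // note: *d_len reports the full packet size even when only 'len' bytes
+ // were copied out, so callers can detect truncation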
}
- *d_len=packet.size();
- memcpy(data,packet.c_str(),min(len,*d_len));
-
- return 1;
+ return ret;
}
DNSPacket P=*(DNSPacket *)p;
delete (DNSPacket *)p;
-
+
vector<DNSResourceRecord>ret;
DNSPacket *R=P.replyPacket();
R->setA(false);
R->setRA(true);
-
+ // MT->setTitle("udp question for "+P.qdomain+"|"+P.qtype.getName());
SyncRes sr;
if(!quiet)
L<<Logger::Error<<"["<<MT->getTid()<<"] " << (R->d_tcp ? "TCP " : "") << "question for '"<<P.qdomain<<"|"<<P.qtype.getName()<<"' from "<<P.getRemote()<<endl;
// XXX FIXME write this writev fallback otherwise
}
+ // MT->setTitle("DONE! udp question for "+P.qdomain+"|"+P.qtype.getName());
if(!quiet) {
L<<Logger::Error<<"["<<MT->getTid()<<"] answer to "<<(P.d.rd?"":"non-rd ")<<"question '"<<P.qdomain<<"|"<<P.qtype.getName();
L<<"': "<<ntohs(R->d.ancount)<<" answers, "<<ntohs(R->d.arcount)<<" additional, took "<<sr.d_outqueries<<" packets, "<<
statsWanted=true;
}
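+// switches SyncRes trace logging on at runtime, e.g. via `kill -USR2 <pid>`;
+// note there is no matching signal here to switch it off again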
+void usr2Handler(int)
+{
+ SyncRes::setLog(true);
+}
+
+
void doStats(void)
{
daemonize();
}
signal(SIGUSR1,usr1Handler);
+ signal(SIGUSR2,usr2Handler);
writePid();
#endif
while(MT->schedule()); // housekeeping, let threads do their thing
if(!((counter++)%100))
- MT->makeThread(houseKeeping,0);
+ MT->makeThread(houseKeeping,0,"housekeeping");
if(statsWanted)
doStats();
++qcounter;
P.setSocket(*i);
P.d_tcp=false;
- MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
+ MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "udp");
}
}
}
}
}
else {
- cerr<<"when reading ret="<<ret<<endl;
+ // cerr<<"when reading ret="<<ret<<endl;
+ // XXX FIXME I think some stuff needs to happen here - like sending an EOF event
}
}
if(!haveErased)
}
else {
- cerr<<"ret="<<ret<<" when writing"<<endl;
+ // cerr<<"ret="<<ret<<" when writing"<<endl;
+ // XXX FIXME I think some stuff needs to happen here - like sending an EOF event
}
}
if(!haveErased)
L<<Logger::Error<<"Ignoring answer on server socket!"<<endl;
else {
++qcounter;
- MT->makeThread(startDoResolve,(void*)new DNSPacket(P));
+ MT->makeThread(startDoResolve,(void*)new DNSPacket(P), "tcp");
}
}
}