/** the Distributor template class enables you to multithread slow question/answer
processes.
- Questions are posed to the Distributor, which can either hand back the answer,
- or give it directly to a callback. Only the latter mode of operation is used in
- PowerDNS.
+ Questions are posed to the Distributor, which returns the answer via a callback.
The Distributor takes care that there are enough Backends alive at any one
time and will try to spawn additional ones should they die.
If an exception escapes a Backend, the distributor retires it.
*/
+template<class Answer>struct AnswerData
+{
+ Answer *A;
+ time_t created;
+};
+
template<class Answer, class Question, class Backend> class Distributor
{
public:
- Distributor(int n=10); //!< Create a new Distributor with \param n threads
- struct AnswerData
+ static Distributor *Create(int n=1); //!< Create a new Distributor with \param n threads
+
+ virtual void cleanup();
+ virtual int question(Question *, void (*)(const AnswerData<Answer> &)) {return 0;}; //!< Submit a question to the Distributor
+ virtual void getQueueSizes(int &questions, int &answers) {}; //!< Returns length of question queue
+
+ virtual int getNumBusy() {return 0;};
+
+ virtual bool isOverloaded() {return false;};
+
+private:
+};
+
+template<class Answer, class Question, class Backend> class SingleThreadDistributor
+ : public Distributor<Answer, Question, Backend>
+{
+public:
+ SingleThreadDistributor();
+ int question(Question *, void (*)(const AnswerData<Answer> &)); //!< Submit a question to the Distributor
+ void getQueueSizes(int &questions, int &answers) {
+ questions = 0;
+ answers = 0;
+ }
+
+ int getNumBusy()
+ {
+ return 0;
+ }
+
+ bool isOverloaded()
{
- Answer *A;
- time_t created;
- };
- int question(Question *, void (*)(const AnswerData &)=0); //!< Submit a question to the Distributor
- Answer *answer(void); //!< Wait for any answer from the Distributor
- Answer *wait(Question *); //!< wait for an answer to a specific question
- int timeoutWait(int id, Answer *, int); //!< wait for a specific answer, with timeout
+ return false;
+ }
+
+private:
+ Backend *b;
+};
+
+template<class Answer, class Question, class Backend> class MultiThreadDistributor
+ : public Distributor<Answer, Question, Backend>
+{
+public:
+ MultiThreadDistributor(int n=1);
+ int question(Question *, void (*)(const AnswerData<Answer> &)); //!< Submit a question to the Distributor
static void* makeThread(void *); //!< helper function to create our n threads
- void getQueueSizes(int &questions, int &answers); //!< Returns length of question queue
+ void getQueueSizes(int &questions, int &answers) {
+ numquestions.getValue( &questions );
+ answers = 0;
+ }
int getNumBusy()
{
struct QuestionData
{
Question *Q;
- void (*callback)(const AnswerData &);
+ void (*callback)(const AnswerData<Answer> &);
int id;
};
- typedef pair<QuestionData, AnswerData> tuple_t;
bool isOverloaded()
{
return d_overloaded;
bool d_overloaded;
std::queue<QuestionData> questions;
pthread_mutex_t q_lock;
-
- deque<tuple_t> answers;
- pthread_mutex_t a_lock;
Semaphore numquestions;
- Semaphore numanswers;
pthread_mutex_t to_mut;
pthread_cond_t to_cond;
time_t d_last_started;
int d_num_threads;
AtomicCounter d_idle_threads;
- Backend *b;
};
-
//template<class Answer, class Question, class Backend>::nextid;
+template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>* Distributor<Answer,Question,Backend>::Create(int n)
+{
+ if( n == 1 )
+ return new SingleThreadDistributor<Answer,Question,Backend>();
+ else
+ return new MultiThreadDistributor<Answer,Question,Backend>( n );
+}
+
+template<class Answer, class Question, class Backend>SingleThreadDistributor<Answer,Question,Backend>::SingleThreadDistributor()
+{
+ L<<Logger::Error<<"Only asked for 1 backend thread - operating unthreaded"<<endl;
+ b=new Backend;
+}
-template<class Answer, class Question, class Backend>Distributor<Answer,Question,Backend>::Distributor(int n)
+template<class Answer, class Question, class Backend>MultiThreadDistributor<Answer,Question,Backend>::MultiThreadDistributor(int n)
{
- b=0;
+ d_num_threads=n;
d_overloaded = false;
+
nextid=0;
// d_idle_threads=0;
d_last_started=time(0);
// sem_init(&numquestions,0,0);
pthread_mutex_init(&q_lock,0);
-// sem_init(&numanswers,0,0);
- pthread_mutex_init(&a_lock,0);
-
pthread_mutex_init(&to_mut,0);
pthread_cond_init(&to_cond,0);
pthread_t tid;
- d_num_threads=n;
-
L<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
-
for(int i=0;i<n;i++) {
pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
Utility::usleep(50000); // we've overloaded mysql in the past :-)
}
-
L<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
}
+template<class Answer, class Question, class Backend>void Distributor<Answer,Question,Backend>::cleanup()
+{
+ L<<Logger::Error<< "Cleaning up distributor" <<endl;
+}
+
// start of a new thread
-template<class Answer, class Question, class Backend>void *Distributor<Answer,Question,Backend>::makeThread(void *p)
+template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
{
pthread_detach(pthread_self());
try {
Backend *b=new Backend(); // this will answer our questions
- Distributor *us=static_cast<Distributor *>(p);
+ MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
int qcount;
// this is so gross
return 0;
}
- AnswerData AD;
+ AnswerData<Answer> AD;
AD.A=a;
AD.created=time(0);
- tuple_t tuple(QD,AD);
- if(QD.callback) {
- QD.callback(AD);
- }
- else {
- pthread_mutex_lock(&us->a_lock);
-
- us->answers.push_back(tuple);
- pthread_mutex_unlock(&us->a_lock);
-
- // L<<"We have an answer to send! Trying to get to to_mut lock"<<endl;
- pthread_mutex_lock(&us->to_mut);
- // L<<"Yes, we got the lock, we can transmit! First we post"<<endl;
- us->numanswers.post();
- // L<<"And now we broadcast!"<<endl;
- pthread_cond_broadcast(&us->to_cond); // for timeoutWait();
- pthread_mutex_unlock(&us->to_mut);
- }
+ QD.callback(AD);
}
delete b;
return 0;
}
-template<class Answer, class Question, class Backend>int Distributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData &))
+template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData<Answer> &))
{
- if(d_num_threads==1 && callback) { // short circuit
- if(!b) {
- L<<Logger::Error<<"Engaging bypass - now operating unthreaded"<<endl;
- b=new Backend;
- }
- Answer *a;
-
- try {
- a=b->question(q); // a can be NULL!
- }
- catch(const PDNSException &e) {
- L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
- delete b;
- b=0;
- return 0;
- }
- catch(...) {
- L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
- delete b;
- b=0;
- return 0;
- }
-
-
- AnswerData AD;
- AD.A=a;
- AD.created=time(0);
- callback(AD);
+ Answer *a;
+ try {
+ a=b->question(q); // a can be NULL!
+ }
+ catch(const AhuException &e) {
+ L<<Logger::Error<<"Backend error: "<<e.reason<<endl;
+ delete b;
+ b=new Backend;
return 0;
}
- else {
- q=new Question(*q);
+ catch(...) {
+ L<<Logger::Error<<Logger::NTLog<<"Caught unknown exception in Distributor thread "<<(unsigned long)pthread_self()<<endl;
+ delete b;
+ b=new Backend;
+ return 0;
}
+ AnswerData<Answer> AD;
+ AD.A=a;
+ AD.created=time(0);
+ callback(AD);
+ return 0;
+}
+
+template<class Answer, class Question, class Backend>int MultiThreadDistributor<Answer,Question,Backend>::question(Question* q, void (*callback)(const AnswerData<Answer> &))
+{
+ // XXX assert callback
+ q=new Question(*q);
DLOG(L<<"Distributor has "<<Backend::numRunning()<<" threads available"<<endl);
return QD.id;
}
-template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::answer()
-{
- numanswers.wait();
- tuple_t tuple;
-
- pthread_mutex_lock(&a_lock);
- tuple=answers.front();
- answers.pop_front();
- pthread_mutex_unlock(&a_lock);
- return tuple.second.A;
-}
-
-//! Wait synchronously for the answer of the question just asked. For this to work, no answer() functions must be called
-template<class Answer, class Question,class Backend>Answer* Distributor<Answer,Question,Backend>::wait(Question *q)
-{
- for(;;)
- {
- numanswers.wait();
- pthread_mutex_lock(&a_lock);
-
- // search if the answer is there
- tuple_t tuple=answers.front();
- if(tuple.first==q)
- {
- answers.pop_front();
- pthread_mutex_unlock(&a_lock);
- return tuple.second.A;
- }
- // if not, loop again
- pthread_mutex_unlock(&a_lock);
- numanswers.post();
- }
- // FIXME: write this
-}
-
-template<class Answer, class Question,class Backend>void Distributor<Answer,Question,Backend>::getQueueSizes(int &questions, int &answers)
-{
- numquestions.getValue( &questions );
- numanswers.getValue( &answers );
-}
-
#endif // DISTRIBUTOR_HH