#include <deque>
#include <queue>
#include <vector>
+#include <thread>
#include <pthread.h>
#include "threadname.hh"
#include <unistd.h>
MultiThreadDistributor(int n);
typedef std::function<void(std::unique_ptr<Answer>&)> callback_t;
int question(Question&, callback_t callback) override; //!< Submit a question to the Distributor
- static void* makeThread(void *); //!< helper function to create our n threads
+ void distribute(int n);
int getQueueSize() override {
return d_queued;
}
time_t d_last_started;
unsigned int d_overloadQueueLength, d_maxQueueLength;
int d_num_threads;
- std::atomic<unsigned int> d_queued{0}, d_running{0};
+ std::atomic<unsigned int> d_queued{0};
std::vector<std::pair<int,int>> d_pipes;
};
g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
_exit(1);
}
+ catch(const std::exception& e) {
+ g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
+ _exit(1);
+ }
catch(...) {
g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
_exit(1);
nextid=0;
d_last_started=time(0);
- pthread_t tid;
-
-
for(int i=0; i < n; ++i) {
int fds[2];
if(pipe(fds) < 0)
g_log<<Logger::Warning<<"About to create "<<n<<" backend threads for UDP"<<endl;
for(int i=0;i<n;i++) {
- pthread_create(&tid,0,&makeThread,static_cast<void *>(this));
+ std::thread t(std::bind(&MultiThreadDistributor<Answer,Question,Backend>::distribute, this, i));
+ t.detach();
Utility::usleep(50000); // we've overloaded mysql in the past :-)
}
g_log<<Logger::Warning<<"Done launching threads, ready to distribute questions"<<endl;
// start of a new thread
-template<class Answer, class Question, class Backend>void *MultiThreadDistributor<Answer,Question,Backend>::makeThread(void *p)
+template<class Answer, class Question, class Backend>void MultiThreadDistributor<Answer,Question,Backend>::distribute(int ournum)
{
setThreadName("pdns/distributo");
- pthread_detach(pthread_self());
- MultiThreadDistributor *us=static_cast<MultiThreadDistributor *>(p);
- int ournum=us->d_running++;
try {
std::unique_ptr<Backend> b= make_unique<Backend>(); // this will answer our questions
for(;;) {
QuestionData* tempQD = nullptr;
- if(read(us->d_pipes[ournum].first, &tempQD, sizeof(tempQD)) != sizeof(tempQD))
+ if(read(d_pipes.at(ournum).first, &tempQD, sizeof(tempQD)) != sizeof(tempQD))
unixDie("read");
- --us->d_queued;
+ --d_queued;
std::unique_ptr<QuestionData> QD = std::unique_ptr<QuestionData>(tempQD);
tempQD = nullptr;
std::unique_ptr<Answer> a = nullptr;
g_log<<Logger::Error<<"Distributor caught fatal exception: "<<AE.reason<<endl;
_exit(1);
}
+ catch(const std::exception& e) {
+ g_log<<Logger::Error<<"Distributor caught fatal exception: "<<e.what()<<endl;
+ _exit(1);
+ }
catch(...) {
g_log<<Logger::Error<<"Caught an unknown exception when creating backend, probably"<<endl;
_exit(1);
}
- return 0;
}
template<class Answer, class Question, class Backend>int SingleThreadDistributor<Answer,Question,Backend>::question(Question& q, callback_t callback)
QD->callback=callback;
++d_queued;
- if(write(d_pipes[QD->id % d_pipes.size()].second, &QD, sizeof(QD)) != sizeof(QD)) {
+ if(write(d_pipes.at(QD->id % d_pipes.size()).second, &QD, sizeof(QD)) != sizeof(QD)) {
--d_queued;
delete QD;
unixDie("write");