+static unsigned int getWorkerLoad(size_t workerIdx)
+{
+ const auto mt = s_threadInfos[/* skip handler */ 1 + g_numDistributorThreads + workerIdx].mt;
+ if (mt != nullptr) {
+ return mt->numProcesses();
+ }
+ return 0;
+}
+
+static unsigned int selectWorker(unsigned int hash)
+{
+ if (s_balancingFactor == 0) {
+ return /* skip handler */ 1 + g_numDistributorThreads + (hash % g_numWorkerThreads);
+ }
+
+ /* we start with one, representing the query we are currently handling */
+ double currentLoad = 1;
+ std::vector<unsigned int> load(g_numWorkerThreads);
+ for (size_t idx = 0; idx < g_numWorkerThreads; idx++) {
+ load[idx] = getWorkerLoad(idx);
+ currentLoad += load[idx];
+ // cerr<<"load for worker "<<idx<<" is "<<load[idx]<<endl;
+ }
+
+ double targetLoad = (currentLoad / g_numWorkerThreads) * s_balancingFactor;
+ // cerr<<"total load is "<<currentLoad<<", number of workers is "<<g_numWorkerThreads<<", target load is "<<targetLoad<<endl;
+
+ unsigned int worker = hash % g_numWorkerThreads;
+ /* at least one server has to be at or below the average load */
+ if (load[worker] > targetLoad) {
+ ++g_stats.rebalancedQueries;
+ do {
+ // cerr<<"worker "<<worker<<" is above the target load, selecting another one"<<endl;
+ worker = (worker + 1) % g_numWorkerThreads;
+ }
+ while(load[worker] > targetLoad);
+ }
+
+ return /* skip handler */ 1 + g_numDistributorThreads + worker;
+}
+