}
}
+#define OUTSTANDING(_x) ((_x)->stats.in - (_x)->stats.out)
+
/** Send a message on the "best" channel.
*
* @param nr the network
}
} else if (nr->num_blocked == 0) {
+ int cmp;
uint32_t one, two;
one = fr_rand() % nr->num_workers;
two = fr_rand() % nr->num_workers;
} while (two == one);
- if (fr_time_delta_lt(nr->workers[one]->cpu_time, nr->workers[two]->cpu_time)) {
+ /*
+ * Choose a worker based on minimizing the amount
+ * of future work it's being asked to do.
+ *
+ * If both workers have the same number of
+ * outstanding requests, then choose the worker
+ * which has used the least total CPU time.
+ */
+ cmp = (OUTSTANDING(nr->workers[one]) - OUTSTANDING(nr->workers[two]));
+ if (cmp < 0) {
worker = nr->workers[one];
+
+ } else if (cmp > 0) {
+ worker = nr->workers[two];
+
+ } else if (fr_time_delta_lt(nr->workers[one]->cpu_time, nr->workers[two]->cpu_time)) {
+ worker = nr->workers[one];
+
} else {
worker = nr->workers[two];
}
} else {
int i;
- fr_time_delta_t cpu_time = fr_time_delta_max();
+ uint64_t min_outstanding = UINT64_MAX;
fr_network_worker_t *found = NULL;
/*
- * Some workers are blocked. Pick an active
- * worker with low CPU time.
+ * Some workers are blocked. Pick the worker
+ * with the least amount of future work to do.
*/
for (i = 0; i < nr->num_workers; i++) {
+ uint64_t outstanding;
+
worker = nr->workers[i];
if (worker->blocked) continue;
- if (fr_time_delta_lt(worker->cpu_time, cpu_time)) {
+ outstanding = OUTSTANDING(worker);
+ if (!found) {
+ found = worker;
+ min_outstanding = outstanding;
+
+ } else if (outstanding < min_outstanding) {
found = worker;
- cpu_time = work->cpu_time;
+ min_outstanding = outstanding;
+
+ } else if (outstanding == min_outstanding) {
+ /*
+ * Queue lengths are the same.
+ * Choose this worker if it's
+ * less busy than the previous one we found.
+ */
+ if (fr_time_delta_lt(worker->cpu_time, found->cpu_time)) {
+ found = worker;
+ }
}
}
* Too many outstanding packets for this worker. Drop
* the request.
*
- * @todo - pick another worker? Or maybe keep a
- * local/temporary set of blacklisted workers.
+ * If the worker we've picked has too many outstanding
+ * packets, then we have either only one worker, in which
+ * cae we should drop the packet. Or, we were unable to
+ * find a worker with smaller than max_outstanding
+ * packets. In which case all of the workers are likely
+ * at max_outstanding.
+ *
+ * In both cases, we should just drop the new packet.
*/
fr_assert(worker->stats.in >= worker->stats.out);
if (nr->config.max_outstanding &&
- ((worker->stats.in - worker->stats.out) >= nr->config.max_outstanding)) {
+ (OUTSTANDING(worker) >= nr->config.max_outstanding)) {
RATE_LIMIT_GLOBAL(PERROR, "max_outstanding reached - dropping packet");
goto drop;
}
w->worker = worker;
w->channel = fr_worker_channel_create(worker, w, nr->control);
+ w->predicted = fr_time_delta_from_msec(10);
fr_fatal_assert_msg(w->channel, "Failed creating new channel");
fr_channel_requestor_uctx_add(w->channel, w);