*/
if (fctx->state != fetchstate_init) {
cevent = &fctx->control_event;
- isc_task_send(fctx->res->buckets[fctx->bucketnum].task,
- &cevent);
+ isc_task_sendto(fctx->res->buckets[fctx->bucketnum].task,
+ &cevent, fctx->bucketnum);
}
}
isc_mutex_init(&res->buckets[i].lock);
res->buckets[i].task = NULL;
- result = isc_task_create(taskmgr, 0, &res->buckets[i].task);
+ /*
+		 * Since we have a pool of tasks, we bind them to task queues
+		 * to spread the load evenly.
+ */
+ result = isc_task_create_bound(taskmgr, 0,
+ &res->buckets[i].task, i);
if (result != ISC_R_SUCCESS) {
isc_mutex_destroy(&res->buckets[i].lock);
goto cleanup_buckets;
 * state it will stay on the runner it's currently on; if a task is in the
 * idle state it can be woken up on a specific runner with isc_task_sendto,
 * which helps with data locality on the CPU.
+ *
+ * To spread the load evenly, some tasks (from task pools) are bound to
+ * specific queues using isc_task_create_bound. This way, load balancing
+ * between CPUs/queues happens at a higher layer.
*/
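+/*
+ * A minimal usage sketch (hypothetical caller code, not part of this
+ * change), assuming a task manager `taskmgr` and a worker index `i`:
+ *
+ *	isc_task_t *t = NULL;
+ *
+ *	// Unbound: the task's queue is chosen at the first send,
+ *	// round-robin or via the cpu argument of isc_task_sendto().
+ *	result = isc_task_create(taskmgr, 0, &t);
+ *
+ *	// Bound: the task is pinned to queue (i % workers) for its
+ *	// lifetime; cpu hints passed to isc_task_sendto() are ignored.
+ *	result = isc_task_create_bound(taskmgr, 0, &t, i);
+ */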
#ifdef ISC_TASK_TRACE
char name[16];
void * tag;
unsigned int threadid;
+	bool			bound;	/* true if pinned to a specific queue */
/* Locked by task manager lock. */
LINK(isc__task_t) link;
LINK(isc__task_t) ready_link;
isc__taskmgr_resume(isc_taskmgr_t *manager0);
-#define DEFAULT_TASKMGR_QUANTUM 10
-#define DEFAULT_DEFAULT_QUANTUM 5
+#define DEFAULT_DEFAULT_QUANTUM 25
#define FINISHED(m) ((m)->exiting && EMPTY((m)->tasks))
/*%
isc_result_t
isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum,
- isc_task_t **taskp)
+ isc_task_t **taskp)
+{
+ return (isc_task_create_bound(manager0, quantum, taskp, -1));
+}
+
+isc_result_t
+isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum,
+ isc_task_t **taskp, int threadid)
{
isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
isc__task_t *task;
return (ISC_R_NOMEMORY);
XTRACE("isc_task_create");
task->manager = manager;
- task->threadid = atomic_fetch_add_explicit(&manager->curq, 1,
- memory_order_relaxed)
- % manager->workers;
- isc_mutex_init(&task->lock);
+ if (threadid == -1) {
+		/*
+		 * The task is not pinned to a queue; its threadid will be
+		 * chosen when the first event is sent to it, either
+		 * round-robin or as specified by isc_task_sendto.
+		 */
+ task->bound = false;
+ task->threadid = 0;
+ } else {
+		/*
+		 * The task is pinned to a queue and will always be run
+		 * by a specific thread.
+		 */
+ task->bound = true;
+ task->threadid = threadid % manager->workers;
+ }
+
+ isc_mutex_init(&task->lock);
task->state = task_state_idle;
task->references = 1;
INIT_LIST(task->events);
INIT_LIST(task->on_shutdown);
task->nevents = 0;
- task->quantum = quantum;
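+	/* A quantum of 0 means "use the manager's default quantum". */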
+ task->quantum = (quantum > 0) ? quantum : manager->default_quantum;
task->flags = 0;
task->now = 0;
isc_time_settoepoch(&task->tnow);
exiting = false;
LOCK(&manager->lock);
if (!manager->exiting) {
- if (task->quantum == 0)
- task->quantum = manager->default_quantum;
APPEND(manager->tasks, task, link);
- } else
+ } else {
exiting = true;
+ }
UNLOCK(&manager->lock);
if (exiting) {
REQUIRE(VALID_TASK(task));
XTRACE("isc_task_send");
- if (c < 0) {
+	/* If the task is bound, ignore the provided cpu. */
+ if (task->bound) {
+ c = task->threadid;
+ } else if (c < 0) {
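+		/* No cpu requested: pick the next queue round-robin. */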
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
memory_order_relaxed);
}
REQUIRE(VALID_TASK(task));
XTRACE("isc_task_sendanddetach");
- if (c < 0) {
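+	/* If the task is bound, ignore the provided cpu. */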
+ if (task->bound) {
+ c = task->threadid;
+ } else if (c < 0) {
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
memory_order_relaxed);
}