*/
#include "replace.h"
+#include "system/select.h"
#include "system/threads.h"
#include "pthreadpool_tevent.h"
#include "pthreadpool.h"
struct tevent_threaded_context *tctx;
/* Pointer to link object owned by *ev. */
struct pthreadpool_tevent_glue_ev_link *ev_link;
+ /* active jobs */
+ struct pthreadpool_tevent_job_state *states;
};
/*
};
struct pthreadpool_tevent_job_state {
+ struct pthreadpool_tevent_job_state *prev, *next;
+ struct pthreadpool_tevent_glue *glue;
struct tevent_context *ev;
struct tevent_req *req;
struct pthreadpool_tevent_job *job;
static int pthreadpool_tevent_glue_destructor(
struct pthreadpool_tevent_glue *glue)
{
+ struct pthreadpool_tevent_job_state *state = NULL;
+ struct pthreadpool_tevent_job_state *nstate = NULL;
+
+ for (state = glue->states; state != NULL; state = nstate) {
+ nstate = state->next;
+
+ /* The job this removes it from the list */
+ pthreadpool_tevent_job_orphan(state->job);
+ }
+
if (glue->pool->glue_list != NULL) {
DLIST_REMOVE(glue->pool->glue_list, glue);
}
return 0;
}
-static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
- struct tevent_context *ev)
+static int pthreadpool_tevent_register_ev(
+ struct pthreadpool_tevent *pool,
+ struct pthreadpool_tevent_job_state *state)
{
+ struct tevent_context *ev = state->ev;
struct pthreadpool_tevent_glue *glue = NULL;
struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
* pair.
*/
for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
- if (glue->ev == ev) {
+ if (glue->ev == state->ev) {
+ state->glue = glue;
+ DLIST_ADD_END(glue->states, state);
return 0;
}
}
}
#endif
+ state->glue = glue;
+ DLIST_ADD_END(glue->states, state);
+
DLIST_ADD(pool->glue_list, glue);
return 0;
}
/*
* We should never be called with needs_fence.orphaned == false.
* Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
- * after detaching from the request state and pool list.
+ * after detaching from the request state, glue and pool list.
*/
if (!job->needs_fence.orphaned) {
abort();
abort();
}
+ /*
+ * Once we marked the request as 'orphaned'
+ * we spin/loop if it's already marked
+ * as 'finished' (which means that
+ * pthreadpool_tevent_job_signal() was entered.
+ * If it saw 'orphaned' it will exit after setting
+ * 'dropped', otherwise it dereferences
+ * job->state->glue->{tctx,ev} until it exited
+ * after setting 'signaled'.
+ *
+ * We need to close this potential gab before
+ * we can set job->state = NULL.
+ *
+ * This is some kind of spinlock, but with
+ * 1 millisecond sleeps in between, in order
+ * to give the thread more cpu time to finish.
+ */
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ while (job->needs_fence.finished) {
+ if (job->needs_fence.dropped) {
+ break;
+ }
+ if (job->needs_fence.signaled) {
+ break;
+ }
+ poll(NULL, 0, 1);
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ }
+
+ /*
+ * Once the gab is closed, we can remove
+ * the glue link.
+ */
+ DLIST_REMOVE(job->state->glue->states, job->state);
+ job->state->glue = NULL;
+
/*
* We need to reparent to a long term context.
* And detach from the request state.
* The job request is not scheduled in the pool
* yet or anymore.
*/
+ if (state->glue != NULL) {
+ DLIST_REMOVE(state->glue->states, state);
+ state->glue = NULL;
+ }
return;
}
return tevent_req_post(req, ev);
}
- ret = pthreadpool_tevent_register_ev(pool, ev);
+ ret = pthreadpool_tevent_register_ev(pool, state);
if (tevent_req_error(req, ret)) {
return tevent_req_post(req, ev);
}
struct pthreadpool_tevent_job *job =
talloc_get_type_abort(job_private_data,
struct pthreadpool_tevent_job);
- struct pthreadpool_tevent_job_state *state = job->state;
- struct tevent_threaded_context *tctx = NULL;
- struct pthreadpool_tevent_glue *g = NULL;
job->needs_fence.finished = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
return 0;
}
-#ifdef HAVE_PTHREAD
- for (g = job->pool->glue_list; g != NULL; g = g->next) {
- if (g->ev == state->ev) {
- tctx = g->tctx;
- break;
- }
- }
-
- if (tctx == NULL) {
- abort();
- }
-#endif
-
- if (tctx != NULL) {
+ /*
+ * state and state->glue are valid,
+ * see the job->needs_fence.finished
+ * "spinlock" loop in
+ * pthreadpool_tevent_job_orphan()
+ */
+ if (job->state->glue->tctx != NULL) {
/* with HAVE_PTHREAD */
- tevent_threaded_schedule_immediate(tctx, job->im,
+ tevent_threaded_schedule_immediate(job->state->glue->tctx,
+ job->im,
pthreadpool_tevent_job_done,
job);
} else {
/* without HAVE_PTHREAD */
- tevent_schedule_immediate(job->im, state->ev,
+ tevent_schedule_immediate(job->im,
+ job->state->glue->ev,
pthreadpool_tevent_job_done,
job);
}