return JOB_REQUEUE_NONE;
}
+/**
+ * Cancel the GLib Main Event Loop
+ */
+static bool cancel(nm_backend_t *this)
+{
+ if (this->loop)
+ {
+ if (g_main_loop_is_running(this->loop))
+ {
+ g_main_loop_quit(this->loop);
+ }
+ g_main_loop_unref(this->loop);
+ }
+ return TRUE;
+}
+
/*
* see header file
*/
{
return;
}
- if (this->loop)
- {
- if (g_main_loop_is_running(this->loop))
- {
- g_main_loop_quit(this->loop);
- }
- g_main_loop_unref(this->loop);
- }
if (this->plugin)
{
g_object_unref(this->plugin);
charon->keep_cap(charon, CAP_DAC_OVERRIDE);
lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)run,
- this, NULL, NULL, JOB_PRIO_CRITICAL));
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)run, this,
+ NULL, (callback_job_cancel_t)cancel, JOB_PRIO_CRITICAL));
return TRUE;
}
{
this->public.sender->flush(this->public.sender);
}
+
+ /* cancel all threads and wait for their termination */
+ lib->processor->cancel(lib->processor);
+
DESTROY_IF(this->public.receiver);
#ifdef ME
DESTROY_IF(this->public.connect_manager);
#endif /* ME */
/* make sure the cache is clear before unloading plugins */
lib->credmgr->flush_cache(lib->credmgr, CERT_ANY);
- /* unload plugins to release threads */
lib->plugins->unload(lib->plugins);
#ifdef CAPABILITIES_LIBCAP
cap_free(this->caps);
*/
receiver_t public;
- /**
- * Threads job receiving packets
- */
- callback_job_t *job;
-
/**
* current secret to use for cookie calculation
*/
status = charon->socket->receive(charon->socket, &packet);
if (status == NOT_SUPPORTED)
{
- /* the processor destroys this job */
- this->job = NULL;
return JOB_REQUEUE_NONE;
}
else if (status != SUCCESS)
METHOD(receiver_t, destroy, void,
private_receiver_t *this)
{
- if (this->job)
- {
- this->job->cancel(this->job);
- }
this->rng->destroy(this->rng);
this->hasher->destroy(this->hasher);
free(this);
this->rng->get_bytes(this->rng, SECRET_LENGTH, this->secret);
memcpy(this->secret_old, this->secret, SECRET_LENGTH);
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive_packets,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_packets,
+ this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
*/
sender_t public;
- /**
- * Sender threads job.
- */
- callback_job_t *job;
-
/**
* The packets are stored in a linked list
*/
METHOD(sender_t, destroy, void,
private_sender_t *this)
{
- this->job->cancel(this->job);
this->list->destroy_offset(this->list, offsetof(packet_t, destroy));
this->got->destroy(this->got);
this->sent->destroy(this->sent);
.mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.got = condvar_create(CONDVAR_TYPE_DEFAULT),
.sent = condvar_create(CONDVAR_TYPE_DEFAULT),
- .job = callback_job_create_with_prio((callback_job_cb_t)send_packets,
- this, NULL, NULL, JOB_PRIO_CRITICAL),
.send_delay = lib->settings->get_int(lib->settings,
"%s.send_delay", 0, charon->name),
.send_delay_type = lib->settings->get_int(lib->settings,
"%s.send_delay_response", TRUE, charon->name),
);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)send_packets,
+ this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
*/
ike_sa_t *ike_sa;
- /**
- * job that handles requests from the Android control socket
- */
- callback_job_t *job;
-
/**
* android credentials
*/
}
charon->bus->add_listener(charon->bus, &this->public.listener);
- this->job = callback_job_create((callback_job_cb_t)initiate, this,
- NULL, NULL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create((callback_job_cb_t)initiate, this,
+ NULL, NULL));
return &this->public;
}
* DHCP server address, or broadcast
*/
host_t *dst;
-
- /**
- * Callback job receiving DHCP responses
- */
- callback_job_t *job;
};
/**
METHOD(dhcp_socket_t, destroy, void,
private_dhcp_socket_t *this)
{
- if (this->job)
- {
- this->job->cancel(this->job);
- }
while (this->waiting)
{
this->condvar->signal(this->condvar);
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive_dhcp,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_dhcp,
+ this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
*/
duplicheck_notify_t public;
- /**
- * Callback job dispatching connections
- */
- callback_job_t *job;
-
/**
* Mutex to lock list
*/
enumerator_t *enumerator;
uintptr_t fd;
- if (this->job)
- {
- this->job->cancel(this->job);
- }
enumerator = this->connected->create_enumerator(this->connected);
while (enumerator->enumerate(enumerator, &fd))
{
destroy(this);
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
*/
int fd;
- /**
- * Listen job
- */
- callback_job_t *job;
-
/**
* RADIUS shared secret for DAE exchanges
*/
METHOD(eap_radius_dae_t, destroy, void,
private_eap_radius_dae_t *this)
{
- if (this->job)
- {
- this->job->cancel(this->job);
- }
if (this->fd != -1)
{
close(this->fd);
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive,
+ this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
*/
farp_listener_t *listener;
- /**
- * Callback job to read ARP requests
- */
- callback_job_t *job;
-
/**
* RAW socket for ARP requests
*/
METHOD(farp_spoofer_t, destroy, void,
private_farp_spoofer_t *this)
{
- this->job->cancel(this->job);
close(this->skt);
free(this);
}
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive_arp,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_arp,
+ this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
* Resynchronization message cache
*/
ha_cache_t *cache;
-
- /**
- * FIFO reader thread
- */
- callback_job_t *job;
};
/**
METHOD(ha_ctl_t, destroy, void,
private_ha_ctl_t *this)
{
- this->job->cancel(this->job);
free(this);
}
strerror(errno));
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)dispatch_fifo,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)dispatch_fifo,
+ this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
* HA enabled pool
*/
ha_attribute_t *attr;
-
- /**
- * Dispatcher job
- */
- callback_job_t *job;
};
/**
METHOD(ha_dispatcher_t, destroy, void,
private_ha_dispatcher_t *this)
{
- this->job->cancel(this->job);
free(this);
}
.kernel = kernel,
.attr = attr,
);
- this->job = callback_job_create_with_prio((callback_job_cb_t)dispatch,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)dispatch, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
*/
condvar_t *condvar;
- /**
- * Job checking for heartbeats
- */
- callback_job_t *job;
-
/**
* Total number of ClusterIP segments
*/
*/
u_int node;
+ /**
+ * Are we checking for heartbeats?
+ */
+ bool heartbeat_active;
+
/**
* Interval we send hearbeats
*/
{
if (alert == ALERT_SHUTDOWN_SIGNAL)
{
- if (this->job)
+ if (this->heartbeat_active)
{
DBG1(DBG_CFG, "HA heartbeat active, dropping all segments");
deactivate(this, 0, TRUE);
DBG1(DBG_CFG, "no heartbeat received, taking all segments");
activate(this, 0, TRUE);
/* disable heartbeat detection util we get one */
- this->job = NULL;
+ this->heartbeat_active = FALSE;
return JOB_REQUEUE_NONE;
}
return JOB_REQUEUE_DIRECT;
*/
static void start_watchdog(private_ha_segments_t *this)
{
- this->job = callback_job_create_with_prio((callback_job_cb_t)watchdog,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ this->heartbeat_active = TRUE;
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)watchdog, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
}
METHOD(ha_segments_t, handle_status, void,
}
}
- this->mutex->unlock(this->mutex);
this->condvar->signal(this->condvar);
+ this->mutex->unlock(this->mutex);
- if (!this->job)
+ if (!this->heartbeat_active)
{
DBG1(DBG_CFG, "received heartbeat, reenabling watchdog");
start_watchdog(this);
METHOD(ha_segments_t, destroy, void,
private_ha_segments_t *this)
{
- if (this->job)
- {
- this->job->cancel(this->job);
- }
this->mutex->destroy(this->mutex);
this->condvar->destroy(this->condvar);
free(this);
return JOB_REQUEUE_NONE;
}
-METHOD(maemo_service_t, destroy, void,
- private_maemo_service_t *this)
+/**
+ * Cancel the GLib Main Event Loop
+ */
+static bool cancel(private_maemo_service_t *this)
{
if (this->loop)
{
}
g_main_loop_unref(this->loop);
}
+ return TRUE;
+}
+
+METHOD(maemo_service_t, destroy, void,
+ private_maemo_service_t *this)
+{
if (this->context)
{
osso_rpc_unset_cb_f(this->context,
}
lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio((callback_job_cb_t)run,
- this, NULL, NULL, JOB_PRIO_CRITICAL));
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)run, this,
+ NULL, (callback_job_cancel_t)cancel, JOB_PRIO_CRITICAL));
return &this->public;
}
* XML unix socket fd
*/
int socket;
-
- /**
- * job accepting stroke messages
- */
- callback_job_t *job;
};
ENUM(ike_sa_state_lower_names, IKE_CREATED, IKE_DELETING,
fdp = malloc_thing(int);
*fdp = fd;
- job = callback_job_create((callback_job_cb_t)process, fdp, free, this->job);
+ job = callback_job_create((callback_job_cb_t)process, fdp, free,
+ (callback_job_cancel_t)return_false);
lib->processor->queue_job(lib->processor, (job_t*)job);
return JOB_REQUEUE_DIRECT;
METHOD(plugin_t, destroy, void,
private_smp_t *this)
{
- this->job->cancel(this->job);
close(this->socket);
free(this);
}
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)dispatch,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)dispatch, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public.plugin;
}
*/
int socket;
- /**
- * job accepting stroke messages
- */
- callback_job_t *receiver;
-
- /**
- * job handling stroke messages
- */
- callback_job_t *handler;
-
/**
* queued stroke commands
*/
this->handling++;
thread_cleanup_pop(TRUE);
job = callback_job_create_with_prio((callback_job_cb_t)process, ctx,
- (void*)stroke_job_context_destroy, this->handler, JOB_PRIO_HIGH);
+ (void*)stroke_job_context_destroy, NULL, JOB_PRIO_HIGH);
lib->processor->queue_job(lib->processor, (job_t*)job);
return JOB_REQUEUE_DIRECT;
}
METHOD(stroke_socket_t, destroy, void,
private_stroke_socket_t *this)
{
- this->handler->cancel(this->handler);
- this->receiver->cancel(this->receiver);
this->commands->destroy_function(this->commands, (void*)stroke_job_context_destroy);
this->condvar->destroy(this->condvar);
this->mutex->destroy(this->mutex);
charon->backends->add_backend(charon->backends, &this->config->backend);
hydra->attributes->add_provider(hydra->attributes, &this->attribute->provider);
- this->receiver = callback_job_create_with_prio((callback_job_cb_t)receive,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->receiver);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
- this->handler = callback_job_create_with_prio((callback_job_cb_t)handle,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->handler);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)handle, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
*/
int ipv6;
- /**
- * Callback job dispatching commands
- */
- callback_job_t *job;
-
/**
* RADIUS shared secret
*/
METHOD(tnc_pdp_t, destroy, void,
private_tnc_pdp_t *this)
{
- if (this->job)
- {
- this->job->cancel(this->job);
- }
if (this->ipv4)
{
close(this->ipv4);
}
DBG1(DBG_IKE, "eap method %N selected", eap_type_names, this->type);
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
* Public part
*/
uci_control_t public;
-
- /**
- * Job
- */
- callback_job_t *job;
};
/**
METHOD(uci_control_t, destroy, void,
private_uci_control_t *this)
{
- this->job->cancel(this->job);
unlink(FIFO_FILE);
free(this);
}
}
else
{
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive,
+ this, NULL, (callback_job_cancel_t)return_false,
+ JOB_PRIO_CRITICAL));
}
return &this->public;
}
* Whitelist unix socket file descriptor
*/
int socket;
-
- /**
- * Callback job dispatching commands
- */
- callback_job_t *job;
};
/**
METHOD(whitelist_control_t, destroy, void,
private_whitelist_control_t *this)
{
- this->job->cancel(this->job);
close(this->socket);
free(this);
}
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
+ NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
&checklist->connect_id);
callback_data_t *data = callback_data_create(this, checklist->connect_id);
- job_t *job = (job_t*)callback_job_create((callback_job_cb_t)initiator_finish, data, (callback_job_cleanup_t)callback_data_destroy, NULL);
- lib->scheduler->schedule_job_ms(lib->scheduler, job, ME_WAIT_TO_FINISH);
+ lib->scheduler->schedule_job_ms(lib->scheduler,
+ (job_t*)callback_job_create((callback_job_cb_t)initiator_finish,
+ data, (callback_job_cleanup_t)callback_data_destroy, NULL),
+ ME_WAIT_TO_FINISH);
checklist->is_finishing = TRUE;
}
*/
static void queue_retransmission(private_connect_manager_t *this, check_list_t *checklist, endpoint_pair_t *pair)
{
- callback_data_t *data = retransmit_data_create(this, checklist->connect_id, pair->id);
- job_t *job = (job_t*)callback_job_create((callback_job_cb_t)retransmit, data, (callback_job_cleanup_t)callback_data_destroy, NULL);
+ callback_data_t *data;
+ job_t *job;
+
+ data = retransmit_data_create(this, checklist->connect_id, pair->id);
+ job = (job_t*)callback_job_create((callback_job_cb_t)retransmit, data,
+ (callback_job_cleanup_t)callback_data_destroy, NULL);
u_int32_t retransmission = pair->retransmitted + 1;
u_int32_t rto = ME_INTERVAL;
/**
* Schedules checks for a checklist (time in ms)
*/
-static void schedule_checks(private_connect_manager_t *this, check_list_t *checklist, u_int32_t time)
+static void schedule_checks(private_connect_manager_t *this,
+ check_list_t *checklist, u_int32_t time)
{
callback_data_t *data = callback_data_create(this, checklist->connect_id);
- checklist->sender = (job_t*)callback_job_create((callback_job_cb_t)sender, data, (callback_job_cleanup_t)callback_data_destroy, NULL);
+ checklist->sender = (job_t*)callback_job_create((callback_job_cb_t)sender,
+ data, (callback_job_cleanup_t)callback_data_destroy, NULL);
lib->scheduler->schedule_job_ms(lib->scheduler, checklist->sender, time);
}
if (get_initiated_by_ids(this, checklist->initiator.id,
checklist->responder.id, &initiated) == SUCCESS)
{
+ callback_job_t *job;
+
remove_checklist(this, checklist);
remove_initiated(this, initiated);
initiate_data_t *data = initiate_data_create(checklist, initiated);
- job_t *job = (job_t*)callback_job_create((callback_job_cb_t)initiate_mediated, data, (callback_job_cleanup_t)initiate_data_destroy, NULL);
- lib->processor->queue_job(lib->processor, job);
+ job = callback_job_create((callback_job_cb_t)initiate_mediated,
+ data, (callback_job_cleanup_t)initiate_data_destroy, NULL);
+ lib->processor->queue_job(lib->processor, (job_t*)job);
return;
}
else
*/
linked_list_t *ipsec_devices;
- /**
- * job receiving PF_KEY events
- */
- callback_job_t *job;
-
/**
* mutex to lock access to the PF_KEY socket
*/
METHOD(kernel_ipsec_t, destroy, void,
private_kernel_klips_ipsec_t *this)
{
- if (this->job)
- {
- this->job->cancel(this->job);
- }
if (this->socket > 0)
{
close(this->socket);
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive_events,
+ this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
return &this->public;
}
*/
hashtable_t *sas;
- /**
- * Job receiving netlink events
- */
- callback_job_t *job;
-
/**
* Netlink xfrm socket (IPsec)
*/
enumerator_t *enumerator;
policy_entry_t *policy;
- if (this->job)
- {
- this->job->cancel(this->job);
- }
if (this->socket_xfrm_events > 0)
{
close(this->socket_xfrm_events);
destroy(this);
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio(
+ (callback_job_cb_t)receive_events, this, NULL,
+ (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
}
return &this->public;
*/
linked_list_t *ifaces;
- /**
- * job receiving netlink events
- */
- callback_job_t *job;
-
/**
* netlink rt socket (routing)
*/
manage_rule(this, RTM_DELRULE, AF_INET6, this->routing_table,
this->routing_table_prio);
}
- if (this->job)
- {
- this->job->cancel(this->job);
- }
if (this->socket_events > 0)
{
close(this->socket_events);
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio(
+ (callback_job_cb_t)receive_events, this, NULL,
+ (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
}
if (init_address_list(this) != SUCCESS)
*/
bool install_routes;
- /**
- * job receiving PF_KEY events
- */
- callback_job_t *job;
-
/**
* mutex to lock access to the PF_KEY socket
*/
METHOD(kernel_ipsec_t, destroy, void,
private_kernel_pfkey_ipsec_t *this)
{
- if (this->job)
- {
- this->job->cancel(this->job);
- }
if (this->socket > 0)
{
close(this->socket);
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio(
+ (callback_job_cb_t)receive_events, this, NULL,
+ (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
}
return &this->public;
*/
linked_list_t *ifaces;
- /**
- * job receiving PF_ROUTE events
- */
- callback_job_t *job;
-
/**
* mutex to lock access to the PF_ROUTE socket
*/
METHOD(kernel_net_t, destroy, void,
private_kernel_pfroute_net_t *this)
{
- if (this->job)
- {
- this->job->cancel(this->job);
- }
if (this->socket > 0)
{
close(this->socket);
return NULL;
}
- this->job = callback_job_create_with_prio((callback_job_cb_t)receive_events,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio(
+ (callback_job_cb_t)receive_events, this, NULL,
+ (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
}
if (init_address_list(this) != SUCCESS)
char *path;
/* loaded library */
pkcs11_library_t *lib;
- /* event dispatcher job */
- callback_job_t *job;
} lib_entry_t;
/**
*/
static void lib_entry_destroy(lib_entry_t *entry)
{
- if (entry->job)
- {
- entry->job->cancel(entry->job);
- }
entry->lib->destroy(entry->lib);
free(entry);
}
return JOB_REQUEUE_DIRECT;
}
-/**
- * End dispatching, unset job
- */
-static void end_dispatch(lib_entry_t *entry)
-{
- entry->job = NULL;
-}
-
/**
* Get the slot list of a library
*/
while (enumerator->enumerate(enumerator, &entry))
{
query_slots(entry);
- entry->job = callback_job_create_with_prio((void*)dispatch_slot_events,
- entry, (void*)end_dispatch, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)entry->job);
+ lib->processor->queue_job(lib->processor,
+ (job_t*)callback_job_create_with_prio((void*)dispatch_slot_events,
+ entry, NULL, (void*)return_false, JOB_PRIO_CRITICAL));
}
enumerator->destroy(enumerator);
callback_job_cleanup_t cleanup;
/**
- * thread of the job, if running
+ * cancel function
*/
- thread_t *thread;
-
- /**
- * mutex to access private job data
- */
- mutex_t *mutex;
-
- /**
- * list of associated child jobs
- */
- linked_list_t *children;
-
- /**
- * parent of this job, or NULL
- */
- private_callback_job_t *parent;
-
- /**
- * TRUE if the job got canceled
- */
- bool canceled;
-
- /**
- * condvar to synchronize the cancellation/destruction of the job
- */
- condvar_t *destroyable;
-
- /**
- * semaphore to synchronize the termination of the assigned thread.
- *
- * separately created during cancellation, so that we can wait on it
- * without risking that it gets destroyed too early during destruction.
- */
- semaphore_t *terminated;
+ callback_job_cancel_t cancel;
/**
* Priority of this job
job_priority_t prio;
};
-/**
- * unregister a child from its parent, if any.
- * note: this->mutex has to be locked
- */
-static void unregister(private_callback_job_t *this)
-{
- if (this->parent)
- {
- this->parent->mutex->lock(this->parent->mutex);
- if (this->parent->canceled && !this->canceled)
- {
- /* if the parent has been canceled but we have not yet, we do not
- * unregister until we got canceled by the parent. */
- this->parent->mutex->unlock(this->parent->mutex);
- this->destroyable->wait(this->destroyable, this->mutex);
- this->parent->mutex->lock(this->parent->mutex);
- }
- this->parent->children->remove(this->parent->children, this, NULL);
- this->parent->mutex->unlock(this->parent->mutex);
- this->parent = NULL;
- }
-}
-
METHOD(job_t, destroy, void,
private_callback_job_t *this)
{
- this->mutex->lock(this->mutex);
- unregister(this);
if (this->cleanup)
{
this->cleanup(this->data);
}
- if (this->terminated)
- {
- this->terminated->post(this->terminated);
- }
- this->children->destroy(this->children);
- this->destroyable->destroy(this->destroyable);
- this->mutex->unlock(this->mutex);
- this->mutex->destroy(this->mutex);
free(this);
}
-METHOD(callback_job_t, cancel, void,
+METHOD(job_t, execute, job_requeue_t,
private_callback_job_t *this)
{
- callback_job_t *child;
- semaphore_t *terminated = NULL;
-
- this->mutex->lock(this->mutex);
- this->canceled = TRUE;
- /* terminate children */
- while (this->children->get_first(this->children, (void**)&child) == SUCCESS)
- {
- this->mutex->unlock(this->mutex);
- child->cancel(child);
- this->mutex->lock(this->mutex);
- }
- if (this->thread)
- {
- /* terminate the thread, if there is currently one executing the job.
- * we wait for its termination using a semaphore */
- this->thread->cancel(this->thread);
- terminated = this->terminated = semaphore_create(0);
- }
- else
- {
- /* if the job is currently queued, it gets terminated later.
- * we can't wait, because it might not get executed at all.
- * we also unregister the queued job manually from its parent (the
- * others get unregistered during destruction) */
- unregister(this);
- }
- this->destroyable->signal(this->destroyable);
- this->mutex->unlock(this->mutex);
-
- if (terminated)
- {
- terminated->wait(terminated);
- terminated->destroy(terminated);
- }
+ return this->callback(this->data);
}
-METHOD(job_t, execute, job_requeue_t,
+METHOD(job_t, cancel, bool,
private_callback_job_t *this)
{
- bool requeue = FALSE;
-
- this->mutex->lock(this->mutex);
- this->thread = thread_current();
- this->mutex->unlock(this->mutex);
-
- while (TRUE)
- {
- this->mutex->lock(this->mutex);
- if (this->canceled)
- {
- this->mutex->unlock(this->mutex);
- break;
- }
- this->mutex->unlock(this->mutex);
- switch (this->callback(this->data))
- {
- case JOB_REQUEUE_DIRECT:
- continue;
- case JOB_REQUEUE_FAIR:
- {
- requeue = TRUE;
- break;
- }
- case JOB_REQUEUE_NONE:
- default:
- {
- break;
- }
- }
- break;
- }
- this->mutex->lock(this->mutex);
- this->thread = NULL;
- this->mutex->unlock(this->mutex);
- /* manually create a cancellation point to avoid that a canceled thread
- * goes back into the thread pool at all */
- thread_cancellation_point();
- return requeue ? JOB_REQUEUE_FAIR : JOB_REQUEUE_NONE;
+ return this->cancel(this->data);
}
METHOD(job_t, get_priority, job_priority_t,
* Described in header.
*/
callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
- callback_job_cleanup_t cleanup, callback_job_t *parent,
- job_priority_t prio)
+ callback_job_cleanup_t cleanup, callback_job_cancel_t cancel,
+ job_priority_t prio)
{
private_callback_job_t *this;
.get_priority = _get_priority,
.destroy = _destroy,
},
- .cancel = _cancel,
},
- .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
.callback = cb,
.data = data,
.cleanup = cleanup,
- .children = linked_list_create(),
- .parent = (private_callback_job_t*)parent,
- .destroyable = condvar_create(CONDVAR_TYPE_DEFAULT),
+ .cancel = cancel,
.prio = prio,
);
- /* register us at parent */
- if (parent)
+ if (cancel)
{
- this->parent->mutex->lock(this->parent->mutex);
- this->parent->children->insert_last(this->parent->children, this);
- this->parent->mutex->unlock(this->parent->mutex);
+ this->public.job.cancel = _cancel;
}
return &this->public;
*/
callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
callback_job_cleanup_t cleanup,
- callback_job_t *parent)
+ callback_job_cancel_t cancel)
{
- return callback_job_create_with_prio(cb, data, cleanup, parent,
+ return callback_job_create_with_prio(cb, data, cleanup, cancel,
JOB_PRIO_MEDIUM);
}
/*
+ * Copyright (C) 2012 Tobias Brunner
* Copyright (C) 2007-2011 Martin Willi
* Copyright (C) 2011 revosec AG
* Hochschule fuer Technik Rapperswil
* to supply to the constructor.
*
* @param data param supplied to job
- * @return requeing policy how to requeue the job
*/
typedef void (*callback_job_cleanup_t)(void *data);
+/**
+ * Cancellation function to use for the callback job.
+ *
+ * Optional function to be called when a job has to be canceled.
+ *
+ * See job_t.cancel() for details on the return value.
+ *
+ * @param data param supplied to job
+ * @return TRUE if canceled, FALSE to explicitly cancel the thread
+ */
+typedef bool (*callback_job_cancel_t)(void *data);
+
/**
* Class representing an callback Job.
*
*/
job_t job;
- /**
- * Cancel the job's thread and wait for its termination.
- *
- * This only works reliably for jobs that always use JOB_REQUEUE_FAIR or
- * JOB_REQUEUE_DIRECT, otherwise the job may already be destroyed when
- * cancel is called.
- */
- void (*cancel)(callback_job_t *this);
};
/**
*
* The cleanup function is called when the job gets destroyed to destroy
* the associated data.
- * If parent is not NULL, the specified job gets an association. Whenever
- * the parent gets cancelled (or runs out), all of its children are cancelled,
- * too.
+ *
+ * The cancel function is optional and should only be provided if the callback
+ * function calls potentially blocking functions and/or always returns
+ * JOB_REQUEUE_DIRECT.
*
* @param cb callback to call from the processor
* @param data user data to supply to callback
* @param cleanup destructor for data on destruction, or NULL
- * @param parent parent of this job
+ * @param cancel function to cancel the job, or NULL
* @return callback_job_t object
*/
callback_job_t *callback_job_create(callback_job_cb_t cb, void *data,
callback_job_cleanup_t cleanup,
- callback_job_t *parent);
+ callback_job_cancel_t cancel);
/**
* Creates a callback job, with priority.
* @param cb callback to call from the processor
* @param data user data to supply to callback
* @param cleanup destructor for data on destruction, or NULL
- * @param parent parent of this job
+ * @param cancel function to cancel the job, or NULL
* @param prio job priority
* @return callback_job_t object
*/
callback_job_t *callback_job_create_with_prio(callback_job_cb_t cb, void *data,
- callback_job_cleanup_t cleanup, callback_job_t *parent,
- job_priority_t prio);
+ callback_job_cleanup_t cleanup, callback_job_cancel_t cancel,
+ job_priority_t prio);
#endif /** CALLBACK_JOB_H_ @}*/
*/
job_requeue_t (*execute) (job_t *this);
+ /**
+ * Cancel a job.
+ *
+ * Implementing this method is optional. It allows potentially blocking
+ * jobs to be canceled during shutdown.
+ *
+ * If no special action is to be taken simply return FALSE then the thread
+ * executing the job will be canceled. If TRUE is returned the job is
+ * expected to return from execute() itself (i.e. the thread won't be
+ * canceled explicitly and can still be joined later).
+ * Jobs that return FALSE have to make sure they provide the appropriate
+ * cancellation points.
+ *
+ * @note Regular jobs that do not block MUST NOT implement this method.
+ * @note This method could be called even before execute() has been called.
+ *
+ * @return FALSE to cancel the thread, TRUE if canceled otherwise
+ */
+ bool (*cancel)(job_t *this);
+
/**
* Get the priority of a job.
*
*
* Use the status of a job to decide what to do during destruction.
*/
- void (*destroy) (job_t *this);
+ void (*destroy)(job_t *this);
};
#endif /** JOB_H_ @}*/
condvar_t *thread_terminated;
};
-
/**
* Worker thread
*/
{
break;
}
+ else if (!worker->job->cancel)
+ { /* only allow cancelable jobs to requeue directly */
+ requeue = JOB_REQUEUE_FAIR;
+ break;
+ }
}
thread_cleanup_pop(FALSE);
this->mutex->lock(this->mutex);
this->working_threads[i]--;
- switch (requeue)
+ if (worker->job->status == JOB_STATUS_CANCELED)
+ { /* job was canceled via a custom cancel() method or did not
+ * use JOB_REQUEUE_TYPE_DIRECT */
+ worker->job->destroy(worker->job);
+ }
+ else
{
- case JOB_REQUEUE_NONE:
- worker->job->status = JOB_STATUS_DONE;
- worker->job->destroy(worker->job);
- break;
- case JOB_REQUEUE_FAIR:
- worker->job->status = JOB_STATUS_QUEUED;
- this->jobs[i]->insert_last(this->jobs[i], worker->job);
- this->job_added->signal(this->job_added);
- break;
- case JOB_REQUEUE_SCHEDULED:
- worker->job->status = JOB_STATUS_QUEUED;
- /* fall-through */
- default:
- break;
+ switch (requeue)
+ {
+ case JOB_REQUEUE_NONE:
+ worker->job->status = JOB_STATUS_DONE;
+ worker->job->destroy(worker->job);
+ break;
+ case JOB_REQUEUE_FAIR:
+ worker->job->status = JOB_STATUS_QUEUED;
+ this->jobs[i]->insert_last(this->jobs[i],
+ worker->job);
+ this->job_added->signal(this->job_added);
+ break;
+ case JOB_REQUEUE_SCHEDULED:
+ default:
+ break;
+ }
}
break;
}
this->mutex->unlock(this->mutex);
}
-METHOD(processor_t, destroy, void,
+METHOD(processor_t, cancel, void,
private_processor_t *this)
{
+ enumerator_t *enumerator;
worker_thread_t *worker;
- int i;
- set_threads(this, 0);
this->mutex->lock(this->mutex);
+ this->desired_threads = 0;
+ /* cancel potentially blocking jobs */
+ enumerator = this->threads->create_enumerator(this->threads);
+ while (enumerator->enumerate(enumerator, (void**)&worker))
+ {
+ if (worker->job && worker->job->cancel)
+ {
+ worker->job->status = JOB_STATUS_CANCELED;
+ if (!worker->job->cancel(worker->job))
+ { /* job requests to be canceled explicitly, otherwise we assume
+ * the thread terminates itself and can be joined */
+ worker->thread->cancel(worker->thread);
+ }
+ }
+ }
+ enumerator->destroy(enumerator);
while (this->total_threads > 0)
{
this->job_added->broadcast(this->job_added);
free(worker);
}
this->mutex->unlock(this->mutex);
+}
+
+METHOD(processor_t, destroy, void,
+ private_processor_t *this)
+{
+ int i;
+
+ cancel(this);
this->thread_terminated->destroy(this->thread_terminated);
this->job_added->destroy(this->job_added);
this->mutex->destroy(this->mutex);
.get_job_load = _get_job_load,
.queue_job = _queue_job,
.set_threads = _set_threads,
+ .cancel = _cancel,
.destroy = _destroy,
},
.threads = linked_list_create(),
/*
+ * Copyright (C) 2012 Tobias Brunner
* Copyright (C) 2005-2007 Martin Willi
* Copyright (C) 2005 Jan Hutter
* Hochschule fuer Technik Rapperswil
*
* If the number of threads is smaller than number of currently running
* threads, thread count is decreased. Use 0 to disable the processor.
- * This call blocks if it decreases thread count until threads have
- * terminated, so make sure there are not too many blocking jobs.
+ *
+ * This call does not block and wait for threads to terminate if the number
+ * of threads is reduced. Instead use cancel() for that during shutdown.
*
* @param count number of threads to allocate
*/
void (*set_threads)(processor_t *this, u_int count);
+ /**
+ * Sets the number of threads to 0 and cancels all blocking jobs, then waits
+ * for all threads to be terminated.
+ */
+ void (*cancel)(processor_t *this);
+
/**
* Destroy a processor object.
*/
*/
scheduler_t public;
- /**
- * Job which queues scheduled jobs to the processor.
- */
- callback_job_t *job;
-
/**
* The heap in which the events are stored.
*/
private_scheduler_t *this)
{
event_t *event;
- this->job->cancel(this->job);
this->condvar->destroy(this->condvar);
this->mutex->destroy(this->mutex);
while ((event = remove_event(this)) != NULL)
scheduler_t * scheduler_create()
{
private_scheduler_t *this;
+ callback_job_t *job;
INIT(this,
.public = {
this->heap = (event_t**)calloc(this->heap_size + 1, sizeof(event_t*));
- this->job = callback_job_create_with_prio((callback_job_cb_t)schedule,
- this, NULL, NULL, JOB_PRIO_CRITICAL);
- lib->processor->queue_job(lib->processor, (job_t*)this->job);
+ job = callback_job_create_with_prio((callback_job_cb_t)schedule, this,
+ NULL, return_false, JOB_PRIO_CRITICAL);
+ lib->processor->queue_job(lib->processor, (job_t*)job);
return &this->public;
}