/**
* Receives PF_ROUTE messages from kernel
*/
-static job_requeue_t receive_events(private_kernel_pfroute_net_t *this)
+static bool receive_events(private_kernel_pfroute_net_t *this, int fd,
+ watcher_event_t event)
{
struct {
union {
char buf[sizeof(struct sockaddr_storage) * RTAX_MAX];
} msg;
int len, hdrlen;
- bool oldstate;
-
- oldstate = thread_cancelability(TRUE);
- len = recv(this->socket, &msg, sizeof(msg), 0);
- thread_cancelability(oldstate);
+ len = recv(this->socket, &msg, sizeof(msg), MSG_DONTWAIT);
if (len < 0)
{
switch (errno)
{
case EINTR:
case EAGAIN:
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
default:
DBG1(DBG_KNL, "unable to receive from PF_ROUTE event socket");
sleep(1);
- return JOB_REQUEUE_FAIR;
+ return TRUE;
}
}
if (len < offsetof(struct rt_msghdr, rtm_flags) || len < msg.rtm.rtm_msglen)
{
DBG1(DBG_KNL, "received invalid PF_ROUTE message");
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
if (msg.rtm.rtm_version != RTM_VERSION)
{
DBG1(DBG_KNL, "received PF_ROUTE message with unsupported version: %d",
msg.rtm.rtm_version);
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
switch (msg.rtm.rtm_type)
{
hdrlen = sizeof(msg.rtm);
break;
default:
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
if (msg.rtm.rtm_msglen < hdrlen)
{
DBG1(DBG_KNL, "ignoring short PF_ROUTE message");
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
switch (msg.rtm.rtm_type)
{
this->condvar->broadcast(this->condvar);
this->mutex->unlock(this->mutex);
- return JOB_REQUEUE_DIRECT;
+ return TRUE;
}
if (this->socket != -1)
{
+ lib->watcher->remove(lib->watcher, this->socket);
close(this->socket);
}
enumerator = this->addrs->create_enumerator(this->addrs);
}
else
{
- lib->processor->queue_job(lib->processor,
- (job_t*)callback_job_create_with_prio(
- (callback_job_cb_t)receive_events, this, NULL,
- (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+ lib->watcher->add(lib->watcher, this->socket, WATCHER_READ,
+ (watcher_cb_t)receive_events, this);
}
if (init_address_list(this) != SUCCESS)
{