return 0;
}
+_Bool
+birdloop_in_this_thread(struct birdloop *loop)
+{
+ return pthread_equal(pthread_self(), loop->thread_id);
+}
+
void
birdloop_flag(struct birdloop *loop, u32 flag)
{
while (1) {
rv = write(p->fd[1], &v, sizeof(v));
- if ((rv >= 0) || (errno == EAGAIN))
+ if ((rv >= 0) || (errno == EAGAIN))
return;
if (errno != EINTR)
bug("wakeup write: %m");
{
init_list(&loop->sock_list);
loop->sock_num = 0;
+ atomic_store_explicit(&loop->sock_close_requests, 0, memory_order_relaxed);
+ sem_init(&loop->sock_close_sem, 0, 0);
- BUFFER_INIT(loop->poll_sk, loop->pool, 4);
BUFFER_INIT(loop->poll_fd, loop->pool, 4);
loop->poll_changed = 1; /* add wakeup fd */
+ loop->poll_domain = DOMAIN_NEW(resource, "Poll");
}
static void
rem_node(&s->n);
loop->sock_num--;
- if (s->index >= 0)
+ if (birdloop_in_this_thread(loop))
{
- loop->poll_sk.data[s->index] = NULL;
- s->index = -1;
- loop->poll_changed = 1;
- loop->close_scheduled = 1;
- birdloop_ping(loop);
+ if (s->index >= 0)
+ {
+ s->index = -1;
+ loop->poll_changed = 1;
+ }
+ close(s->fd);
}
else
+ {
+ atomic_fetch_add_explicit(&loop->sock_close_requests, 1, memory_order_acq_rel);
+ wakeup_do_kick(loop);
+ LOCK_DOMAIN(resource, loop->poll_domain);
+ s->index = -1;
close(s->fd);
+ UNLOCK_DOMAIN(resource, loop->poll_domain);
+ sem_post(&loop->sock_close_sem);
+ }
}
void
static inline uint sk_want_events(sock *s)
{ return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
-/*
-FIXME: this should be called from sock code
-
-static void
-sockets_update(struct birdloop *loop, sock *s)
-{
- if (s->index >= 0)
- loop->poll_fd.data[s->index].events = sk_want_events(s);
-}
-*/
-
static void
sockets_prepare(struct birdloop *loop)
{
- BUFFER_SET(loop->poll_sk, loop->sock_num + 1);
+ LOCK_DOMAIN(resource, loop->poll_domain);
BUFFER_SET(loop->poll_fd, loop->sock_num + 1);
struct pollfd *pfd = loop->poll_fd.data;
- sock **psk = loop->poll_sk.data;
uint i = 0;
node *n;
ASSERT(i < loop->sock_num);
s->index = i;
- *psk = s;
pfd->fd = s->fd;
pfd->events = sk_want_events(s);
pfd->revents = 0;
pfd++;
- psk++;
i++;
}
ASSERT(i == loop->sock_num);
/* Add internal wakeup fd */
- *psk = NULL;
pipe_pollin(&loop->wakeup, pfd);
loop->poll_changed = 0;
-}
-
-static void
-sockets_close_fds(struct birdloop *loop)
-{
- struct pollfd *pfd = loop->poll_fd.data;
- sock **psk = loop->poll_sk.data;
- int poll_num = loop->poll_fd.used - 1;
-
- int i;
- for (i = 0; i < poll_num; i++)
- if (psk[i] == NULL)
- close(pfd[i].fd);
-
- loop->close_scheduled = 0;
+ UNLOCK_DOMAIN(resource, loop->poll_domain);
}
int sk_read(sock *s, int revents);
sockets_fire(struct birdloop *loop)
{
struct pollfd *pfd = loop->poll_fd.data;
- sock **psk = loop->poll_sk.data;
int poll_num = loop->poll_fd.used - 1;
times_update();
if (pfd[poll_num].revents & POLLIN)
wakeup_drain(loop);
- int i;
- for (i = 0; i < poll_num; pfd++, psk++, i++)
+ sock *s; node *n, *nxt;
+ WALK_LIST2_DELSAFE(s, n, nxt, loop->sock_list, n)
{
- int e = 1;
+ if (s->index < 0)
+ continue;
+
+ LOCK_DOMAIN(resource, loop->poll_domain);
+ int rev = loop->poll_fd.data[s->index].revents;
+ UNLOCK_DOMAIN(resource, loop->poll_domain);
- if (! pfd->revents)
+ if (! rev)
continue;
- if (pfd->revents & POLLNVAL)
- bug("poll: invalid fd %d", pfd->fd);
+ if (rev & POLLNVAL)
+ bug("poll: invalid fd %d", s->fd);
- if (pfd->revents & POLLIN)
- while (e && *psk && (*psk)->rx_hook)
- e = sk_read(*psk, pfd->revents);
+ int e = 1;
+
+ if (rev & POLLIN)
+ while (e && s->rx_hook)
+ e = sk_read(s, rev);
- e = 1;
- if (pfd->revents & POLLOUT)
+ if (rev & POLLOUT)
{
loop->poll_changed = 1;
- while (e && *psk)
- e = sk_write(*psk);
+ while (e = sk_write(s))
+ ;
}
}
}
birdloop_free(struct birdloop *loop)
{
ASSERT_DIE(loop->links == 0);
- ASSERT_DIE(pthread_equal(pthread_self(), loop->thread_id));
+ ASSERT_DIE(birdloop_in_this_thread(loop));
rcu_birdloop_stop(&loop->rcu);
pthread_attr_destroy(&loop->thread_attr);
birdloop_leave(loop);
+ LOCK_DOMAIN(resource, loop->poll_domain);
try:
rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout);
if (rv < 0)
goto try;
bug("poll: %m");
}
+ UNLOCK_DOMAIN(resource, loop->poll_domain);
- birdloop_enter(loop);
+ /* Wait until remote requestors close their sockets */
+ int close_count = atomic_exchange_explicit(&loop->sock_close_requests, 0, memory_order_acq_rel);
+ while (close_count--)
+ sem_wait(&loop->sock_close_sem);
- if (loop->close_scheduled)
- sockets_close_fds(loop);
+ birdloop_enter(loop);
if (loop->stopped)
break;