void pipe_drain(int fd); /* implementation in io.c */
void pipe_kick(int fd); /* implementation in io.c */
+ssize_t
+pipe_drain_data(int fd, void *data, size_t size)
+{
+ int ret_val;
+
+ try:
+ ret_val = read(fd, data, size);
+ if (ret_val < 0)
+ {
+ if (errno == EINTR)
+ goto try;
+ if (errno == EAGAIN)
+ return 0;
+ die("wakeup read: %m");
+ }
+// if (ret_val == 64)
+// goto try;
+ return ret_val;
+}
+
+void
+pipe_kick_data(int fd, void *data, size_t size)
+{
+ u64 v = 1;
+ int ret_val;
+
+ try:
+ ret_val = write(fd, data, size);
+ if (ret_val < 0)
+ {
+ if (errno == EINTR)
+ goto try;
+ if (errno == EAGAIN)
+ return;
+ die("wakeup write: %m");
+ }
+}
+
static list rpki_proto_list;
static pthread_mutex_t rpki_proto_list_lock;
rtrlib = NULL;
}
-static const char *rtr_socket_states[] = {
+static const char *mgr_str_status_descript[] = {
+ [RTR_MGR_CLOSED] = "RTR sockets are disconnected",
+ [RTR_MGR_CONNECTING] = "RTR sockets trying to establish a connection.",
+ [RTR_MGR_ESTABLISHED] = "All RTR sockets of the group are synchronized with the rtr servers",
+ [RTR_MGR_ERROR] = "Error occured on at least one RTR socket",
+};
+
+static const char *rtr_socket_states_descript[] = {
[RTR_CONNECTING] = "Socket is establishing the transport connection",
[RTR_ESTABLISHED] = "Connection is established, socket is waiting for a Serial Notify or expiration of the refresh_interval timer",
[RTR_RESET] = "Resetting RTR connection",
};
static void
-rtr_mgr_thread_status_hook(const struct rtr_mgr_group *group, enum rtr_mgr_status status, const struct rtr_socket *socket, void *data)
+status_update_rtrlib_thread_hook(const struct rtr_mgr_group *group, enum rtr_mgr_status mgr_status, const struct rtr_socket *socket, void *data)
{
struct rpki_proto *p = data;
- switch (status)
- {
- case RTR_MGR_ERROR:
- RPKI_CACHE_ERROR(p, socket, "%s", rtr_socket_states[socket->state]);
- break;
- default:
- RPKI_CACHE_TRACE(p, socket, "[%s] %s", rtr_state_to_str_x(socket->state), rtr_socket_states[socket->state]);
- }
-
- switch (status)
- {
- case RTR_MGR_CONNECTING:
- proto_notify_state(&p->p, PS_START); // TODO: must be in main BIRD thread
- break;
- case RTR_MGR_ESTABLISHED: // BIRD is synchronized with all cache servers within the same preference cache group
- proto_notify_state(&p->p, PS_UP); // TODO: must be in main BIRD thread
- break;
- }
-}
-
-/* This seems useless, TODO: Remove it */
-static void
-rtr_thread_status_hook(const struct rtr_socket *socket, const enum rtr_socket_state status, void *data)
-{
- struct rpki_proto *p = data;
-
- RPKI_CACHE_TRACE(p, socket, "[%s == %s] %s == %s", rtr_state_to_str_x(socket->state), rtr_state_to_str_x(status), rtr_socket_states[socket->state], rtr_socket_states[status]);
-
- switch (status)
+ switch (socket->state)
{
case RTR_SHUTDOWN:
+ if (mgr_status == RTR_MGR_CLOSED)
+ {
+ RPKI_CACHE_TRACE(p, socket, "%s", mgr_str_status_descript[mgr_status]);
+ }
break;
-
case RTR_ERROR_FATAL:
case RTR_ERROR_TRANSPORT:
case RTR_ERROR_NO_DATA_AVAIL: /** No validation records are available on the RTR server. */
case RTR_ERROR_NO_INCR_UPDATE_AVAIL: /** Server was unable to answer the last serial or reset query. */
- RPKI_CACHE_ERROR(p, socket, "%s", rtr_socket_states[socket->state]);
+ RPKI_CACHE_ERROR(p, socket, "%s", rtr_socket_states_descript[socket->state]);
break;
-
case RTR_FAST_RECONNECT:
case RTR_SYNC:
case RTR_RESET:
case RTR_CONNECTING:
- proto_notify_state(&p->p, PS_START);
+ RPKI_CACHE_TRACE(p, socket, "[%s] %s", rtr_state_to_str_x(socket->state), rtr_socket_states_descript[socket->state]);
+ pipe_kick_data(p->status_update.write->fd, &((int){PS_START}), sizeof(int));
break;
-
case RTR_ESTABLISHED:
- proto_notify_state(&p->p, PS_UP);
+ if (mgr_status == RTR_MGR_ESTABLISHED)
+ {
+ RPKI_CACHE_TRACE(p, socket, "%s", mgr_str_status_descript[mgr_status]);
+ pipe_kick_data(p->status_update.write->fd, &((int){PS_UP}), sizeof(int));
+ }
break;
}
}
}
static void
-rtr_thread_update_hook(void *pfx_table, const struct pfx_record rec, const bool added)
+roa_update_rtrlib_thread_hook(void *pfx_table, const struct pfx_record rec, const bool added)
{
struct rpki_proto *p = get_rpki_proto_by_rtr_socket(rec.socket);
if (!p)
}
static int
-recv_data_in_main_thread(struct birdsock *sk, int size)
+status_update_bird_thread_hook(struct birdsock *sk, int size)
+{
+ struct rpki_proto *p = sk->data;
+
+ int proto_state = -1;
+ if (pipe_drain_data(sk->fd, &proto_state, sizeof(int)) > 0)
+ {
+ switch (proto_state)
+ {
+ case PS_DOWN:
+ case PS_START:
+ case PS_STOP:
+ case PS_UP:
+ if (proto_state != p->p.proto_state)
+ proto_notify_state(&p->p, proto_state);
+ break;
+ default:
+ RPKI_ERROR(p, "%s: we received some bullshit %d", __func__, proto_state);
+ }
+ }
+
+ return 0;
+}
+
+static int
+roa_update_bird_thread_hook(struct birdsock *sk, int size)
{
struct rpki_proto *p = sk->data;
struct rpki_entry *e;
}
static void
-recv_err_in_main_thread(struct birdsock *sk, int err)
+pipe_error_hook(struct birdsock *sk, int err)
{
struct rpki_proto *p = sk->data;
- RPKI_ERROR(p, "Notify socket error: %m", err);
+ RPKI_ERROR(p, "Notify socket error[%d] %m", err);
}
static sock *
create_read_pipe(struct rpki_proto *p, int fd)
{
sock *sk = create_pipe(p, fd);
- sk->rx_hook = recv_data_in_main_thread;
- sk->err_hook = recv_err_in_main_thread;
+ sk->err_hook = pipe_error_hook;
if (sk_open(sk) < 0)
return NULL;
return sk;
}
static void
-create_pipe_pair(struct rpki_proto *p, struct rpki_rw_sk_pair *sk_pair)
+create_pipe_pair(struct rpki_proto *p, struct rpki_rw_sk_pair *sk_pair, int (*recv_callback)(struct birdsock *, int))
{
int pipe_fildes[2];
|| ((sk_pair->write = create_write_pipe(p, pipe_fildes[1])) == NULL)
)
RPKI_DIE(p, "pipe: %m");
+
+ sk_pair->read->rx_hook = recv_callback;
}
static uint
else
s = create_rtrlib_tcp_socket(cache, pool);
- s->connection_state_fp = &rtr_thread_status_hook;
s->connection_state_fp_param = p;
return s;
}
{
struct rtr_mgr_group_crate grouped_list = group_cache_list_by_preferences(p, &cf->cache_list, p->p.pool);
- p->rtr_conf = rtr_mgr_init_x(grouped_list.groups, grouped_list.groups_len, 10, 20, &rtr_thread_update_hook, NULL, &rtr_mgr_thread_status_hook, p);
+ p->rtr_conf = rtr_mgr_init_x(
+ grouped_list.groups,
+ grouped_list.groups_len,
+ 10, 20,
+ &roa_update_rtrlib_thread_hook,
+ NULL,
+ &status_update_rtrlib_thread_hook, p);
return rtr_mgr_start_x(p->rtr_conf);
}
struct rpki_proto *p = (struct rpki_proto *) P;
struct rpki_config *cf = (struct rpki_config *) (P->cf);
- create_pipe_pair(p, &p->roa_update);
+ create_pipe_pair(p, &p->status_update, status_update_bird_thread_hook);
+
+ create_pipe_pair(p, &p->roa_update, roa_update_bird_thread_hook);
init_list(&p->roa_update_list);
pthread_mutex_init(&p->roa_update_lock, NULL);