From: Pavel TvrdĂ­k Date: Mon, 26 Oct 2015 15:54:46 +0000 (+0100) Subject: RPKI: refactore status update hook line X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=50791a41bf8d54027893a4a03a6666e5f302c4f4;p=thirdparty%2Fbird.git RPKI: refactore status update hook line --- diff --git a/proto/rpki/rpki.c b/proto/rpki/rpki.c index 3202e52ad..2243c7ea1 100644 --- a/proto/rpki/rpki.c +++ b/proto/rpki/rpki.c @@ -55,6 +55,44 @@ struct rpki_entry { void pipe_drain(int fd); /* implementation in io.c */ void pipe_kick(int fd); /* implementation in io.c */ +ssize_t +pipe_drain_data(int fd, void *data, size_t size) +{ + int ret_val; + + try: + ret_val = read(fd, data, size); + if (ret_val < 0) + { + if (errno == EINTR) + goto try; + if (errno == EAGAIN) + return 0; + die("wakeup read: %m"); + } +// if (ret_val == 64) +// goto try; + return ret_val; +} + +void +pipe_kick_data(int fd, void *data, size_t size) +{ + u64 v = 1; + int ret_val; + + try: + ret_val = write(fd, data, size); + if (ret_val < 0) + { + if (errno == EINTR) + goto try; + if (errno == EAGAIN) + return; + die("wakeup write: %m"); + } +} + static list rpki_proto_list; static pthread_mutex_t rpki_proto_list_lock; @@ -173,7 +211,14 @@ rpki_init_all(void) rtrlib = NULL; } -static const char *rtr_socket_states[] = { +static const char *mgr_str_status_descript[] = { + [RTR_MGR_CLOSED] = "RTR sockets are disconnected", + [RTR_MGR_CONNECTING] = "RTR sockets trying to establish a connection.", + [RTR_MGR_ESTABLISHED] = "All RTR sockets of the group are synchronized with the rtr servers", + [RTR_MGR_ERROR] = "Error occured on at least one RTR socket", +}; + +static const char *rtr_socket_states_descript[] = { [RTR_CONNECTING] = "Socket is establishing the transport connection", [RTR_ESTABLISHED] = "Connection is established, socket is waiting for a Serial Notify or expiration of the refresh_interval timer", [RTR_RESET] = "Resetting RTR connection", @@ -187,59 +232,37 @@ static const char *rtr_socket_states[] = { }; static void -rtr_mgr_thread_status_hook(const struct rtr_mgr_group *group, enum rtr_mgr_status status, const struct rtr_socket *socket, void *data) +status_update_rtrlib_thread_hook(const struct rtr_mgr_group *group, enum rtr_mgr_status mgr_status, const struct rtr_socket *socket, void *data) { struct rpki_proto *p = data; - switch (status) - { - case RTR_MGR_ERROR: - RPKI_CACHE_ERROR(p, socket, "%s", rtr_socket_states[socket->state]); - break; - default: - RPKI_CACHE_TRACE(p, socket, "[%s] %s", rtr_state_to_str_x(socket->state), rtr_socket_states[socket->state]); - } - - switch (status) - { - case RTR_MGR_CONNECTING: - proto_notify_state(&p->p, PS_START); // TODO: must be in main BIRD thread - break; - case RTR_MGR_ESTABLISHED: // BIRD is synchronized with all cache servers within the same preference cache group - proto_notify_state(&p->p, PS_UP); // TODO: must be in main BIRD thread - break; - } -} - -/* This seems useless, TODO: Remove it */ -static void -rtr_thread_status_hook(const struct rtr_socket *socket, const enum rtr_socket_state status, void *data) -{ - struct rpki_proto *p = data; - - RPKI_CACHE_TRACE(p, socket, "[%s == %s] %s == %s", rtr_state_to_str_x(socket->state), rtr_state_to_str_x(status), rtr_socket_states[socket->state], rtr_socket_states[status]); - - switch (status) + switch (socket->state) { case RTR_SHUTDOWN: + if (mgr_status == RTR_MGR_CLOSED) + { + RPKI_CACHE_TRACE(p, socket, "%s", mgr_str_status_descript[mgr_status]); + } break; - case RTR_ERROR_FATAL: case RTR_ERROR_TRANSPORT: case RTR_ERROR_NO_DATA_AVAIL: /** No validation records are available on the RTR server. */ case RTR_ERROR_NO_INCR_UPDATE_AVAIL: /** Server was unable to answer the last serial or reset query. */ - RPKI_CACHE_ERROR(p, socket, "%s", rtr_socket_states[socket->state]); + RPKI_CACHE_ERROR(p, socket, "%s", rtr_socket_states_descript[socket->state]); break; - case RTR_FAST_RECONNECT: case RTR_SYNC: case RTR_RESET: case RTR_CONNECTING: - proto_notify_state(&p->p, PS_START); + RPKI_CACHE_TRACE(p, socket, "[%s] %s", rtr_state_to_str_x(socket->state), rtr_socket_states_descript[socket->state]); + pipe_kick_data(p->status_update.write->fd, &((int){PS_START}), sizeof(int)); break; - case RTR_ESTABLISHED: - proto_notify_state(&p->p, PS_UP); + if (mgr_status == RTR_MGR_ESTABLISHED) + { + RPKI_CACHE_TRACE(p, socket, "%s", mgr_str_status_descript[mgr_status]); + pipe_kick_data(p->status_update.write->fd, &((int){PS_UP}), sizeof(int)); + } break; } } @@ -312,7 +335,7 @@ send_data_to_main_thread(struct rpki_proto *p, struct rpki_entry *e) } static void -rtr_thread_update_hook(void *pfx_table, const struct pfx_record rec, const bool added) +roa_update_rtrlib_thread_hook(void *pfx_table, const struct pfx_record rec, const bool added) { struct rpki_proto *p = get_rpki_proto_by_rtr_socket(rec.socket); if (!p) @@ -382,7 +405,32 @@ rpki_new_cache(void) } static int -recv_data_in_main_thread(struct birdsock *sk, int size) +status_update_bird_thread_hook(struct birdsock *sk, int size) +{ + struct rpki_proto *p = sk->data; + + int proto_state = -1; + if (pipe_drain_data(sk->fd, &proto_state, sizeof(int)) > 0) + { + switch (proto_state) + { + case PS_DOWN: + case PS_START: + case PS_STOP: + case PS_UP: + if (proto_state != p->p.proto_state) + proto_notify_state(&p->p, proto_state); + break; + default: + RPKI_ERROR(p, "%s: we received some bullshit %d", __func__, proto_state); + } + } + + return 0; +} + +static int +roa_update_bird_thread_hook(struct birdsock *sk, int size) { struct rpki_proto *p = sk->data; struct rpki_entry *e; @@ -413,10 +461,10 @@ recv_data_in_main_thread(struct birdsock *sk, int size) } static void -recv_err_in_main_thread(struct birdsock *sk, int err) +pipe_error_hook(struct birdsock *sk, int err) { struct rpki_proto *p = sk->data; - RPKI_ERROR(p, "Notify socket error: %m", err); + RPKI_ERROR(p, "Notify socket error[%d] %m", err); } static sock * @@ -433,8 +481,7 @@ static sock * create_read_pipe(struct rpki_proto *p, int fd) { sock *sk = create_pipe(p, fd); - sk->rx_hook = recv_data_in_main_thread; - sk->err_hook = recv_err_in_main_thread; + sk->err_hook = pipe_error_hook; if (sk_open(sk) < 0) return NULL; return sk; @@ -451,7 +498,7 @@ create_write_pipe(struct rpki_proto *p, int fd) } static void -create_pipe_pair(struct rpki_proto *p, struct rpki_rw_sk_pair *sk_pair) +create_pipe_pair(struct rpki_proto *p, struct rpki_rw_sk_pair *sk_pair, int (*recv_callback)(struct birdsock *, int)) { int pipe_fildes[2]; @@ -460,6 +507,8 @@ create_pipe_pair(struct rpki_proto *p, struct rpki_rw_sk_pair *sk_pair) || ((sk_pair->write = create_write_pipe(p, pipe_fildes[1])) == NULL) ) RPKI_DIE(p, "pipe: %m"); + + sk_pair->read->rx_hook = recv_callback; } static uint @@ -542,7 +591,6 @@ create_rtrlib_socket(struct rpki_proto *p, struct rpki_cache *cache, pool *pool) else s = create_rtrlib_tcp_socket(cache, pool); - s->connection_state_fp = &rtr_thread_status_hook; s->connection_state_fp_param = p; return s; } @@ -605,7 +653,13 @@ rpki_start_rtrlib_mgr(struct rpki_proto *p, struct rpki_config *cf) { struct rtr_mgr_group_crate grouped_list = group_cache_list_by_preferences(p, &cf->cache_list, p->p.pool); - p->rtr_conf = rtr_mgr_init_x(grouped_list.groups, grouped_list.groups_len, 10, 20, &rtr_thread_update_hook, NULL, &rtr_mgr_thread_status_hook, p); + p->rtr_conf = rtr_mgr_init_x( + grouped_list.groups, + grouped_list.groups_len, + 10, 20, + &roa_update_rtrlib_thread_hook, + NULL, + &status_update_rtrlib_thread_hook, p); return rtr_mgr_start_x(p->rtr_conf); } @@ -616,7 +670,9 @@ rpki_start(struct proto *P) struct rpki_proto *p = (struct rpki_proto *) P; struct rpki_config *cf = (struct rpki_config *) (P->cf); - create_pipe_pair(p, &p->roa_update); + create_pipe_pair(p, &p->status_update, status_update_bird_thread_hook); + + create_pipe_pair(p, &p->roa_update, roa_update_bird_thread_hook); init_list(&p->roa_update_list); pthread_mutex_init(&p->roa_update_lock, NULL); diff --git a/proto/rpki/rpki.h b/proto/rpki/rpki.h index 3f59f0e58..b517070b8 100644 --- a/proto/rpki/rpki.h +++ b/proto/rpki/rpki.h @@ -79,6 +79,7 @@ struct rpki_proto { struct rpki_rw_sk_pair roa_update; list roa_update_list; pthread_mutex_t roa_update_lock; + struct rpki_rw_sk_pair status_update; }; struct rpki_cache *rpki_new_cache(void);