]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
RPKI: refactore status update hook line
authorPavel Tvrdík <pawel.tvrdik@gmail.com>
Mon, 26 Oct 2015 15:54:46 +0000 (16:54 +0100)
committerPavel Tvrdík <pawel.tvrdik@gmail.com>
Tue, 27 Oct 2015 07:56:33 +0000 (08:56 +0100)
proto/rpki/rpki.c
proto/rpki/rpki.h

index 3202e52ad41f3b75dfbc0afc33eb273b32264278..2243c7ea146b626e6606747e2c388ca3b0783a35 100644 (file)
@@ -55,6 +55,44 @@ struct rpki_entry {
 void pipe_drain(int fd);       /* implementation in io.c */
 void pipe_kick(int fd);        /* implementation in io.c */
 
+ssize_t
+pipe_drain_data(int fd, void *data, size_t size)
+{
+  int ret_val;
+
+ try:
+  ret_val = read(fd, data, size);
+  if (ret_val < 0)
+  {
+    if (errno == EINTR)
+      goto try;
+    if (errno == EAGAIN)
+      return 0;
+    die("wakeup read: %m");
+  }
+//  if (ret_val == 64)
+//    goto try;
+  return ret_val;
+}
+
+void
+pipe_kick_data(int fd, void *data, size_t size)
+{
+  u64 v = 1;
+  int ret_val;
+
+ try:
+  ret_val = write(fd, data, size);
+  if (ret_val < 0)
+  {
+    if (errno == EINTR)
+      goto try;
+    if (errno == EAGAIN)
+      return;
+    die("wakeup write: %m");
+  }
+}
+
 static list rpki_proto_list;
 static pthread_mutex_t rpki_proto_list_lock;
 
@@ -173,7 +211,14 @@ rpki_init_all(void)
   rtrlib = NULL;
 }
 
-static const char *rtr_socket_states[] = {
+static const char *mgr_str_status_descript[] = {
+    [RTR_MGR_CLOSED] = "RTR sockets are disconnected",
+    [RTR_MGR_CONNECTING] = "RTR sockets trying to establish a connection.",
+    [RTR_MGR_ESTABLISHED] = "All RTR sockets of the group are synchronized with the rtr servers",
+    [RTR_MGR_ERROR] = "Error occured on at least one RTR socket",
+};
+
+static const char *rtr_socket_states_descript[] = {
     [RTR_CONNECTING]  = "Socket is establishing the transport connection",
     [RTR_ESTABLISHED] = "Connection is established, socket is waiting for a Serial Notify or expiration of the refresh_interval timer",
     [RTR_RESET] = "Resetting RTR connection",
@@ -187,59 +232,37 @@ static const char *rtr_socket_states[] = {
 };
 
 static void
-rtr_mgr_thread_status_hook(const struct rtr_mgr_group *group, enum rtr_mgr_status status, const struct rtr_socket *socket, void *data)
+status_update_rtrlib_thread_hook(const struct rtr_mgr_group *group, enum rtr_mgr_status mgr_status, const struct rtr_socket *socket, void *data)
 {
   struct rpki_proto *p = data;
 
-  switch (status)
-  {
-    case RTR_MGR_ERROR:
-      RPKI_CACHE_ERROR(p, socket, "%s", rtr_socket_states[socket->state]);
-      break;
-    default:
-      RPKI_CACHE_TRACE(p, socket, "[%s] %s", rtr_state_to_str_x(socket->state), rtr_socket_states[socket->state]);
-  }
-
-  switch (status)
-  {
-    case RTR_MGR_CONNECTING:
-      proto_notify_state(&p->p, PS_START);     // TODO: must be in main BIRD thread
-      break;
-    case RTR_MGR_ESTABLISHED:                  // BIRD is synchronized with all cache servers within the same preference cache group
-      proto_notify_state(&p->p, PS_UP);                // TODO: must be in main BIRD thread
-      break;
-  }
-}
-
-/* This seems useless, TODO: Remove it */
-static void
-rtr_thread_status_hook(const struct rtr_socket *socket, const enum rtr_socket_state status, void *data)
-{
-  struct rpki_proto *p = data;
-
-  RPKI_CACHE_TRACE(p, socket, "[%s == %s] %s == %s", rtr_state_to_str_x(socket->state), rtr_state_to_str_x(status), rtr_socket_states[socket->state], rtr_socket_states[status]);
-
-  switch (status)
+  switch (socket->state)
   {
     case RTR_SHUTDOWN:
+      if (mgr_status == RTR_MGR_CLOSED)
+      {
+       RPKI_CACHE_TRACE(p, socket, "%s", mgr_str_status_descript[mgr_status]);
+      }
       break;
-
     case RTR_ERROR_FATAL:
     case RTR_ERROR_TRANSPORT:
     case RTR_ERROR_NO_DATA_AVAIL: /** No validation records are available on the RTR server. */
     case RTR_ERROR_NO_INCR_UPDATE_AVAIL: /** Server was unable to answer the last serial or reset query. */
-      RPKI_CACHE_ERROR(p, socket, "%s", rtr_socket_states[socket->state]);
+      RPKI_CACHE_ERROR(p, socket, "%s", rtr_socket_states_descript[socket->state]);
       break;
-
     case RTR_FAST_RECONNECT:
     case RTR_SYNC:
     case RTR_RESET:
     case RTR_CONNECTING:
-      proto_notify_state(&p->p, PS_START);
+      RPKI_CACHE_TRACE(p, socket, "[%s] %s", rtr_state_to_str_x(socket->state), rtr_socket_states_descript[socket->state]);
+      pipe_kick_data(p->status_update.write->fd, &((int){PS_START}), sizeof(int));
       break;
-
     case RTR_ESTABLISHED:
-      proto_notify_state(&p->p, PS_UP);
+      if (mgr_status == RTR_MGR_ESTABLISHED)
+      {
+       RPKI_CACHE_TRACE(p, socket, "%s", mgr_str_status_descript[mgr_status]);
+       pipe_kick_data(p->status_update.write->fd, &((int){PS_UP}), sizeof(int));
+      }
       break;
   }
 }
@@ -312,7 +335,7 @@ send_data_to_main_thread(struct rpki_proto *p, struct rpki_entry *e)
 }
 
 static void
-rtr_thread_update_hook(void *pfx_table, const struct pfx_record rec, const bool added)
+roa_update_rtrlib_thread_hook(void *pfx_table, const struct pfx_record rec, const bool added)
 {
   struct rpki_proto *p = get_rpki_proto_by_rtr_socket(rec.socket);
   if (!p)
@@ -382,7 +405,32 @@ rpki_new_cache(void)
 }
 
 static int
-recv_data_in_main_thread(struct birdsock *sk, int size)
+status_update_bird_thread_hook(struct birdsock *sk, int size)
+{
+  struct rpki_proto *p = sk->data;
+
+  int proto_state = -1;
+  if (pipe_drain_data(sk->fd, &proto_state, sizeof(int)) > 0)
+  {
+    switch (proto_state)
+    {
+      case PS_DOWN:
+      case PS_START:
+      case PS_STOP:
+      case PS_UP:
+       if (proto_state != p->p.proto_state)
+         proto_notify_state(&p->p, proto_state);
+       break;
+      default:
+       RPKI_ERROR(p, "%s: we received some bullshit %d", __func__, proto_state);
+    }
+  }
+
+  return 0;
+}
+
+static int
+roa_update_bird_thread_hook(struct birdsock *sk, int size)
 {
   struct rpki_proto *p = sk->data;
   struct rpki_entry *e;
@@ -413,10 +461,10 @@ recv_data_in_main_thread(struct birdsock *sk, int size)
 }
 
 static void
-recv_err_in_main_thread(struct birdsock *sk, int err)
+pipe_error_hook(struct birdsock *sk, int err)
 {
   struct rpki_proto *p = sk->data;
-  RPKI_ERROR(p, "Notify socket error: %m", err);
+  RPKI_ERROR(p, "Notify socket error[%d] %m", err);
 }
 
 static sock *
@@ -433,8 +481,7 @@ static sock *
 create_read_pipe(struct rpki_proto *p, int fd)
 {
   sock *sk = create_pipe(p, fd);
-  sk->rx_hook = recv_data_in_main_thread;
-  sk->err_hook = recv_err_in_main_thread;
+  sk->err_hook = pipe_error_hook;
   if (sk_open(sk) < 0)
     return NULL;
   return sk;
@@ -451,7 +498,7 @@ create_write_pipe(struct rpki_proto *p, int fd)
 }
 
 static void
-create_pipe_pair(struct rpki_proto *p, struct rpki_rw_sk_pair *sk_pair)
+create_pipe_pair(struct rpki_proto *p, struct rpki_rw_sk_pair *sk_pair, int (*recv_callback)(struct birdsock *, int))
 {
   int pipe_fildes[2];
 
@@ -460,6 +507,8 @@ create_pipe_pair(struct rpki_proto *p, struct rpki_rw_sk_pair *sk_pair)
       || ((sk_pair->write = create_write_pipe(p, pipe_fildes[1])) == NULL)
   )
     RPKI_DIE(p, "pipe: %m");
+
+  sk_pair->read->rx_hook = recv_callback;
 }
 
 static uint
@@ -542,7 +591,6 @@ create_rtrlib_socket(struct rpki_proto *p, struct rpki_cache *cache, pool *pool)
   else
     s = create_rtrlib_tcp_socket(cache, pool);
 
-  s->connection_state_fp = &rtr_thread_status_hook;
   s->connection_state_fp_param = p;
   return s;
 }
@@ -605,7 +653,13 @@ rpki_start_rtrlib_mgr(struct rpki_proto *p, struct rpki_config *cf)
 {
   struct rtr_mgr_group_crate grouped_list = group_cache_list_by_preferences(p, &cf->cache_list, p->p.pool);
 
-  p->rtr_conf = rtr_mgr_init_x(grouped_list.groups, grouped_list.groups_len, 10, 20, &rtr_thread_update_hook, NULL, &rtr_mgr_thread_status_hook, p);
+  p->rtr_conf = rtr_mgr_init_x(
+      grouped_list.groups,
+      grouped_list.groups_len,
+      10, 20,
+      &roa_update_rtrlib_thread_hook,
+      NULL,
+      &status_update_rtrlib_thread_hook, p);
 
   return rtr_mgr_start_x(p->rtr_conf);
 }
@@ -616,7 +670,9 @@ rpki_start(struct proto *P)
   struct rpki_proto *p = (struct rpki_proto *) P;
   struct rpki_config *cf = (struct rpki_config *) (P->cf);
 
-  create_pipe_pair(p, &p->roa_update);
+  create_pipe_pair(p, &p->status_update, status_update_bird_thread_hook);
+
+  create_pipe_pair(p, &p->roa_update, roa_update_bird_thread_hook);
   init_list(&p->roa_update_list);
   pthread_mutex_init(&p->roa_update_lock, NULL);
 
index 3f59f0e58a13f073de25a7c9d09f43699677ebff..b517070b8cc268c9366bb7e38dd844b6bad815ac 100644 (file)
@@ -79,6 +79,7 @@ struct rpki_proto {
   struct rpki_rw_sk_pair roa_update;
   list roa_update_list;
   pthread_mutex_t roa_update_lock;
+  struct rpki_rw_sk_pair status_update;
 };
 
 struct rpki_cache *rpki_new_cache(void);