send_data_to_main_thread(struct rpki_proto *p, struct rpki_entry *e)
{
rpki_lock_notify(p);
- add_tail(&p->notify_list, &e->n);
+ add_tail(&p->roa_update_list, &e->n);
rpki_unlock_notify(p);
- pipe_kick(p->notify_write_sk->fd);
+ pipe_kick(p->roa_update.write->fd);
}
static void
rpki_lock_notify(p);
init_list(&tmp_list);
- add_tail_list(&tmp_list, &p->notify_list);
- init_list(&p->notify_list);
+ add_tail_list(&tmp_list, &p->roa_update_list);
+ init_list(&p->roa_update_list);
rpki_unlock_notify(p);
WALK_LIST_FIRST(e, tmp_list)
}
static sock *
-create_socket(struct rpki_proto *p, int fd)
+create_pipe(struct rpki_proto *p, int fd)
{
sock *sk = sk_new(p->p.pool);
sk->type = SK_MAGIC;
return sk;
}
-static void
-create_read_socket(struct rpki_proto *p, int fd)
+static sock *
+create_read_pipe(struct rpki_proto *p, int fd)
{
- sock *sk = create_socket(p, fd);
+ sock *sk = create_pipe(p, fd);
sk->rx_hook = recv_data_in_main_thread;
sk->err_hook = recv_err_in_main_thread;
if (sk_open(sk) < 0)
- RPKI_DIE(p, "read socket sk_open() failed");
- p->notify_read_sk = sk;
+ return NULL;
+ return sk;
}
-static void
-create_write_socket(struct rpki_proto *p, int fd)
+static sock *
+create_write_pipe(struct rpki_proto *p, int fd)
{
- sock *sk = create_socket(p, fd);
+ sock *sk = create_pipe(p, fd);
sk->flags = SKF_THREAD;
if (sk_open(sk) < 0)
- RPKI_DIE(p, "write socket sk_open() failed");
- p->notify_write_sk = sk;
+ return NULL;
+ return sk;
}
static void
-create_rw_sockets(struct rpki_proto *p)
+create_pipe_pair(struct rpki_proto *p, struct rpki_rw_sk_pair *sk_pair)
{
int pipe_fildes[2];
- int rv = pipe(pipe_fildes);
- if (rv < 0)
+ if ((pipe(pipe_fildes) < 0)
+ || ((sk_pair->read = create_read_pipe(p, pipe_fildes[0])) == NULL)
+ || ((sk_pair->write = create_write_pipe(p, pipe_fildes[1])) == NULL)
+ )
RPKI_DIE(p, "pipe: %m");
-
- create_read_socket (p, pipe_fildes[0]);
- create_write_socket(p, pipe_fildes[1]);
}
static uint
struct rpki_proto *p = (struct rpki_proto *) P;
struct rpki_config *cf = (struct rpki_config *) (P->cf);
- create_rw_sockets(p);
- init_list(&p->notify_list);
- pthread_mutex_init(&p->notify_lock, NULL);
+ create_pipe_pair(p, &p->roa_update);
+ init_list(&p->roa_update_list);
+ pthread_mutex_init(&p->roa_update_lock, NULL);
lock_rpki_proto_list();
add_tail(&rpki_proto_list, &p->rpki_node);
rem2_node(&p->rpki_node);
unlock_rpki_proto_list();
- pthread_mutex_destroy(&p->notify_lock);
+ pthread_mutex_destroy(&p->roa_update_lock);
return PS_DOWN;
}
struct roa_table_config *roa_table_cf;
};
+struct rpki_rw_sk_pair {
+ sock *read;
+ sock *write;
+};
+
struct rpki_proto {
struct proto p;
struct rpki_config *cf;
struct rtr_mgr_config *rtr_conf;
- sock *notify_read_sk;
- sock *notify_write_sk;
- list notify_list;
- pthread_mutex_t notify_lock;
+ struct rpki_rw_sk_pair roa_update;
+ list roa_update_list;
+ pthread_mutex_t roa_update_lock;
};
struct rpki_cache *rpki_new_cache(void);
-static inline void rpki_lock_notify(struct rpki_proto *p) { pthread_mutex_lock(&p->notify_lock); }
-static inline void rpki_unlock_notify(struct rpki_proto *p) { pthread_mutex_unlock(&p->notify_lock); }
+static inline void rpki_lock_notify(struct rpki_proto *p) { pthread_mutex_lock(&p->roa_update_lock); }
+static inline void rpki_unlock_notify(struct rpki_proto *p) { pthread_mutex_unlock(&p->roa_update_lock); }
void rpki_init_all(void);
char *rpki_load_rtrlib(void);