From 313dbecb76b3f0149d99ec628d5eb5e25d0a9ee5 Mon Sep 17 00:00:00 2001 From: rbu9fe <106606487+rbu9fe@users.noreply.github.com> Date: Sat, 7 Dec 2024 11:36:09 +0100 Subject: [PATCH] feat: Add lldpctl_watch_sync_unblock (#696) * feat: Add lldpctl_watch_sync_unblock * update: Replace select by epoll * refactor: Replace epoll by poll to support non-Linux OS * update: Let lldpcli terminate gracefully --- src/client/lldpcli.c | 21 ++++++++++++++ src/client/show.c | 6 ++-- src/lib/atom.c | 17 ++++++++++++ src/lib/atom.h | 2 ++ src/lib/connection.c | 66 +++++++++++++++++++++++++++++++++----------- src/lib/errors.c | 2 ++ src/lib/lldpctl.h | 16 ++++++++++- src/lib/lldpctl.map | 1 + 8 files changed, 112 insertions(+), 19 deletions(-) diff --git a/src/client/lldpcli.c b/src/client/lldpcli.c index 8999ea78..4286e024 100644 --- a/src/client/lldpcli.c +++ b/src/client/lldpcli.c @@ -43,6 +43,11 @@ extern const char *__progname; static struct cmd_node *root = NULL; const char *ctlname = NULL; +/* Global context required for graceful termination */ +static struct { + lldpctl_conn_t **conn; +} shutdown_context; + static int is_lldpctl(const char *name) { @@ -422,6 +427,15 @@ input_append(const char *arg, struct inputs *inputs, int acceptdir, int warn) free(namelist); } +static void +term_handler(int signum) +{ + lldpctl_conn_t *conn = *shutdown_context.conn; + if (conn) { + lldpctl_watch_sync_unblock(conn); + } +} + int main(int argc, char *argv[]) { @@ -430,6 +444,8 @@ main(int argc, char *argv[]) lldpctl_conn_t *conn = NULL; const char *options = is_lldpctl(argv[0]) ? "hdvf:u:" : "hdsvf:c:C:u:"; lldpctl_atom_t *configuration; + struct sigaction sa = {}; + shutdown_context.conn = &conn; int gotinputs = 0, version = 0; struct inputs inputs; @@ -494,6 +510,11 @@ main(int argc, char *argv[]) /* Disable SIGPIPE */ signal(SIGPIPE, SIG_IGN); + /* Install termination handler */ + sa.sa_handler = term_handler; + if (sigaction(SIGINT, &sa, NULL) < 0) goto end; + if (sigaction(SIGTERM, &sa, NULL) < 0) goto end; + /* Register commands */ root = register_commands(); diff --git a/src/client/show.c b/src/client/show.c index f13ae573..fff6fc89 100644 --- a/src/client/show.c +++ b/src/client/show.c @@ -238,8 +238,10 @@ cmd_watch_neighbors(struct lldpctl_conn_t *conn, struct writer *w, struct cmd_en } while (1) { if (lldpctl_watch(conn) < 0) { - log_warnx("lldpctl", "unable to watch for neighbors. %s", - lldpctl_last_strerror(conn)); + if (lldpctl_last_error(conn) != LLDPCTL_ERR_CALLBACK_UNBLOCK) { + log_warnx("lldpctl", "unable to watch for neighbors. %s", + lldpctl_last_strerror(conn)); + } return 0; } if (limit > 0 && wa.nb >= limit) return 1; diff --git a/src/lib/atom.c b/src/lib/atom.c index 8842ee06..f5a851c7 100644 --- a/src/lib/atom.c +++ b/src/lib/atom.c @@ -401,6 +401,23 @@ lldpctl_watch(lldpctl_conn_t *conn) return 0; } +void +lldpctl_watch_sync_unblock(lldpctl_conn_t *conn) +{ + if (conn->state != CONN_STATE_WATCHING) + return; + + if (!conn->sync_clb) + return; + + struct lldpctl_conn_sync_t *data = conn->user_data; + + if (data->pipe_fd[1] != -1) { + /* Write to the pipe to unblock the read */ + write(data->pipe_fd[1], "x", 1); + } +} + lldpctl_atom_t * lldpctl_get_configuration(lldpctl_conn_t *conn) { diff --git a/src/lib/atom.h b/src/lib/atom.h index 0dfc85b5..c75615c6 100644 --- a/src/lib/atom.h +++ b/src/lib/atom.h @@ -30,6 +30,7 @@ struct lldpctl_conn_t { lldpctl_recv_callback recv; /* Receive callback */ lldpctl_send_callback send; /* Send callback */ void *user_data; /* Callback user data */ + uint8_t sync_clb; /* If set synchronous callbacks are used */ /* IO state handling. */ uint8_t *input_buffer; /* Current input/output buffer */ @@ -76,6 +77,7 @@ struct lldpctl_conn_t { /* User data for synchronous callbacks. */ struct lldpctl_conn_sync_t { int fd; /* File descriptor to the socket. */ + int pipe_fd[2]; /* Pipe file descriptors required for unblocking a read-blocked watcher. */ }; ssize_t _lldpctl_needs(lldpctl_conn_t *lldpctl, size_t length); diff --git a/src/lib/connection.c b/src/lib/connection.c index 43f46cb0..2c0c0cd9 100644 --- a/src/lib/connection.c +++ b/src/lib/connection.c @@ -20,6 +20,7 @@ #include #include #include +#include #include "lldpctl.h" #include "atom.h" @@ -35,7 +36,7 @@ lldpctl_get_default_transport(void) /* Connect to the remote end */ static int -sync_connect(lldpctl_conn_t *lldpctl) +sync_connect(lldpctl_conn_t *lldpctl, struct lldpctl_conn_sync_t *conn) { return ctl_connect(lldpctl->ctlname); } @@ -47,7 +48,7 @@ sync_send(lldpctl_conn_t *lldpctl, const uint8_t *data, size_t length, void *use struct lldpctl_conn_sync_t *conn = user_data; ssize_t nb; - if (conn->fd == -1 && ((conn->fd = sync_connect(lldpctl)) == -1)) { + if (conn->fd == -1 && ((conn->fd = sync_connect(lldpctl, conn)) == -1)) { return LLDPCTL_ERR_CANNOT_CONNECT; } @@ -66,20 +67,38 @@ sync_recv(lldpctl_conn_t *lldpctl, const uint8_t *data, size_t length, void *use ssize_t nb; size_t remain, offset = 0; - if (conn->fd == -1 && ((conn->fd = sync_connect(lldpctl)) == -1)) { + if (conn->fd == -1 && ((conn->fd = sync_connect(lldpctl, conn)) == -1)) { lldpctl->error = LLDPCTL_ERR_CANNOT_CONNECT; return LLDPCTL_ERR_CANNOT_CONNECT; } + struct pollfd fds[2]; + fds[0].fd = conn->pipe_fd[0]; + fds[0].events = POLLIN; + fds[1].fd = conn->fd; + fds[1].events = POLLIN; + remain = length; do { - if ((nb = read(conn->fd, (unsigned char *)data + offset, remain)) == - -1) { - if (errno == EAGAIN || errno == EINTR) continue; + if (poll(fds, sizeof(fds)/sizeof(fds[0]), -1) == -1) { + if (errno == EINTR) continue; return LLDPCTL_ERR_CALLBACK_FAILURE; } - remain -= nb; - offset += nb; + + if (fds[0].revents & POLLIN) { + /* Unblock request received. */ + return LLDPCTL_ERR_CALLBACK_UNBLOCK; + } + + if (fds[1].revents & POLLIN) { + /* Message from daemon. */ + if ((nb = read(conn->fd, (unsigned char *)data + offset, remain)) == -1) { + if (errno == EAGAIN || errno == EINTR) continue; + return LLDPCTL_ERR_CALLBACK_FAILURE; + } + remain -= nb; + offset += nb; + } } while (remain > 0 && nb != 0); return offset; } @@ -94,6 +113,7 @@ lldpctl_conn_t * lldpctl_new_name(const char *ctlname, lldpctl_send_callback send, lldpctl_recv_callback recv, void *user_data) { + int rc = LLDPCTL_ERR_FATAL; lldpctl_conn_t *conn = NULL; struct lldpctl_conn_sync_t *data = NULL; @@ -105,23 +125,35 @@ lldpctl_new_name(const char *ctlname, lldpctl_send_callback send, conn->ctlname = strdup(ctlname); if (conn->ctlname == NULL) { - free(conn); - return NULL; + goto end; } if (!send && !recv) { - if ((data = malloc(sizeof(struct lldpctl_conn_sync_t))) == NULL) { - free(conn->ctlname); - free(conn); - return NULL; - } + if ((data = malloc(sizeof(struct lldpctl_conn_sync_t))) == NULL) goto end; + if (pipe(data->pipe_fd) == -1) goto end; data->fd = -1; conn->send = sync_send; conn->recv = sync_recv; conn->user_data = data; + conn->sync_clb = 1; } else { conn->send = send; conn->recv = recv; conn->user_data = user_data; + conn->sync_clb = 0; + } + + rc = LLDPCTL_NO_ERROR; + +end: + + if (rc != LLDPCTL_NO_ERROR) { + + if (data) { + free(data); + } + if (conn->ctlname) free(conn->ctlname); + free(conn); + conn = NULL; } return conn; @@ -134,8 +166,10 @@ lldpctl_release(lldpctl_conn_t *conn) free(conn->ctlname); if (conn->send == sync_send) { struct lldpctl_conn_sync_t *data = conn->user_data; + close(data->pipe_fd[0]); + close(data->pipe_fd[1]); if (data->fd != -1) close(data->fd); - free(conn->user_data); + free(data); } free(conn->input_buffer); free(conn->output_buffer); diff --git a/src/lib/errors.c b/src/lib/errors.c index 899ca6fe..28886ed5 100644 --- a/src/lib/errors.c +++ b/src/lib/errors.c @@ -52,6 +52,8 @@ lldpctl_strerror(lldpctl_error_t error) return "Not enough memory available"; case LLDPCTL_ERR_CALLBACK_FAILURE: return "A failure occurred during callback processing"; + case LLDPCTL_ERR_CALLBACK_UNBLOCK: + return "Forced callback to unblock"; } return "Unknown error code"; } diff --git a/src/lib/lldpctl.h b/src/lib/lldpctl.h index 5d2668ff..364c760a 100644 --- a/src/lib/lldpctl.h +++ b/src/lib/lldpctl.h @@ -330,7 +330,11 @@ typedef enum { /** * An error occurred in a user provided callback. */ - LLDPCTL_ERR_CALLBACK_FAILURE = -902 + LLDPCTL_ERR_CALLBACK_FAILURE = -902, + /** + * The callback was forced to unblock. + */ + LLDPCTL_ERR_CALLBACK_UNBLOCK = -903 } lldpctl_error_t; /** @@ -546,6 +550,16 @@ int lldpctl_watch_callback2(lldpctl_conn_t *conn, lldpctl_change_callback2 cb, */ int lldpctl_watch(lldpctl_conn_t *conn); +/** + * Unblock another thread that's waiting for the next change on that synchronous callback connection. + * + * @param conn Synchronous callback connection with lldpd. + * + * If some thread is blocked at @ref lldpctl_watch() with the same synchronous callback @p conn, + * then this function will unblock it. Otherwise, this function will have no effect. + */ +void lldpctl_watch_sync_unblock(lldpctl_conn_t *conn); + /** * @defgroup liblldpctl_atom_get_special Retrieving atoms from lldpd * diff --git a/src/lib/lldpctl.map b/src/lib/lldpctl.map index c602b641..4c0f202d 100644 --- a/src/lib/lldpctl.map +++ b/src/lib/lldpctl.map @@ -47,6 +47,7 @@ LIBLLDPCTL_4.6 { lldpctl_strerror; lldpctl_watch; lldpctl_watch_callback; + lldpctl_watch_sync_unblock; local: *; -- 2.39.5