]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
lib: Add ctdbd_req_send/recv
authorVolker Lendecke <vl@samba.org>
Fri, 20 Mar 2020 12:58:21 +0000 (13:58 +0100)
committerRalph Boehme <slow@samba.org>
Tue, 28 Apr 2020 09:08:40 +0000 (09:08 +0000)
Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
source3/include/ctdbd_conn.h
source3/lib/ctdbd_conn.c

index 7ae2ec40139409f11937d051b4ee49d28cf30a59..161d8608b1c11818a99f9e934b08967cbbb2ef51 100644 (file)
@@ -108,6 +108,21 @@ struct ctdb_req_header;
 void ctdbd_prep_hdr_next_reqid(
        struct ctdbd_connection *conn, struct ctdb_req_header *hdr);
 
+/*
+ * Async ctdb_request. iov[0] must start with an initialized
+ * struct ctdb_req_header
+ */
+struct tevent_req *ctdbd_req_send(
+       TALLOC_CTX *mem_ctx,
+       struct tevent_context *ev,
+       struct ctdbd_connection *conn,
+       struct iovec *iov,
+       size_t num_iov);
+int ctdbd_req_recv(
+       struct tevent_req *req,
+       TALLOC_CTX *mem_ctx,
+       struct ctdb_req_header **reply);
+
 struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
                                    struct tevent_context *ev,
                                    struct ctdbd_connection *conn,
index 20f47aae17364eb315a4755e80f346a2d223abeb..c6555914ff6e47ff80c1d7aca9942b8da567f64c 100644 (file)
@@ -36,6 +36,7 @@
 #include "lib/util/sys_rw.h"
 #include "lib/util/blocking.h"
 #include "ctdb/include/ctdb_protocol.h"
+#include "lib/async_req/async_sock.h"
 
 /* paths to these include files come from --with-ctdb= in configure */
 
@@ -85,6 +86,8 @@ struct ctdbd_connection {
         * Outgoing queue for writev_send of asynchronous ctdb requests
         */
        struct tevent_queue *outgoing;
+       struct tevent_req **pending;
+       struct tevent_req *read_req;
 };
 
 static void ctdbd_async_socket_handler(struct tevent_context *ev,
@@ -99,7 +102,8 @@ static bool ctdbd_conn_has_async_sends(struct ctdbd_connection *conn)
 
 static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
 {
-       return (conn->fde != NULL);
+       size_t len = talloc_array_length(conn->pending);
+       return ((len != 0) || (conn->fde != NULL));
 }
 
 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
@@ -1850,6 +1854,328 @@ void ctdbd_prep_hdr_next_reqid(
        };
 }
 
+struct ctdbd_pkt_read_state {
+       uint8_t *pkt;
+};
+
+static ssize_t ctdbd_pkt_read_more(
+       uint8_t *buf, size_t buflen, void *private_data);
+static void ctdbd_pkt_read_done(struct tevent_req *subreq);
+
+static struct tevent_req *ctdbd_pkt_read_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd)
+{
+       struct tevent_req *req = NULL, *subreq = NULL;
+       struct ctdbd_pkt_read_state *state = NULL;
+
+       req = tevent_req_create(mem_ctx, &state, struct ctdbd_pkt_read_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       subreq = read_packet_send(state, ev, fd, 4, ctdbd_pkt_read_more, NULL);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdbd_pkt_read_done, req);
+       return req;
+}
+
+static ssize_t ctdbd_pkt_read_more(
+       uint8_t *buf, size_t buflen, void *private_data)
+{
+       uint32_t msglen;
+       if (buflen < 4) {
+               return -1;
+       }
+       if (buflen > 4) {
+               return 0;       /* Been here, done */
+       }
+       memcpy(&msglen, buf, 4);
+
+       if (msglen < sizeof(struct ctdb_req_header)) {
+               return -1;
+       }
+       return msglen - sizeof(msglen);
+}
+
+static void ctdbd_pkt_read_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdbd_pkt_read_state *state = tevent_req_data(
+               req, struct ctdbd_pkt_read_state);
+       ssize_t nread;
+       int err;
+
+       nread = read_packet_recv(subreq, state, &state->pkt, &err);
+       TALLOC_FREE(subreq);
+       if (nread == -1) {
+               tevent_req_error(req, err);
+               return;
+       }
+       tevent_req_done(req);
+}
+
+static int ctdbd_pkt_read_recv(
+       struct tevent_req *req, TALLOC_CTX *mem_ctx, uint8_t **pkt)
+{
+       struct ctdbd_pkt_read_state *state = tevent_req_data(
+               req, struct ctdbd_pkt_read_state);
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               return err;
+       }
+       *pkt = talloc_move(mem_ctx, &state->pkt);
+       tevent_req_received(req);
+       return 0;
+}
+
+static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn);
+static void ctdbd_conn_received(struct tevent_req *subreq);
+
+struct ctdbd_req_state {
+       struct ctdbd_connection *conn;
+       struct tevent_context *ev;
+       uint32_t reqid;
+       struct ctdb_req_header *reply;
+};
+
+static void ctdbd_req_unset_pending(struct tevent_req *req)
+{
+       struct ctdbd_req_state *state = tevent_req_data(
+               req, struct ctdbd_req_state);
+       struct ctdbd_connection *conn = state->conn;
+       size_t num_pending = talloc_array_length(conn->pending);
+       size_t i, num_after;
+
+       tevent_req_set_cleanup_fn(req, NULL);
+
+       if (num_pending == 1) {
+               /*
+                * conn->read_req is a child of conn->pending
+                */
+               TALLOC_FREE(conn->pending);
+               conn->read_req = NULL;
+               return;
+       }
+
+       for (i=0; i<num_pending; i++) {
+               if (req == conn->pending[i]) {
+                       break;
+               }
+       }
+       if (i == num_pending) {
+               /*
+                * Something's seriously broken. Just returning here is the
+                * right thing nevertheless, the point of this routine is to
+                * remove ourselves from conn->pending.
+                */
+               return;
+       }
+
+       num_after = num_pending - i - 1;
+       if (num_after > 0) {
+               memmove(&conn->pending[i],
+                       &conn->pending[i] + 1,
+                       sizeof(*conn->pending) * num_after);
+       }
+       conn->pending = talloc_realloc(
+               NULL, conn->pending, struct tevent_req *, num_pending - 1);
+}
+
+static void ctdbd_req_cleanup(
+       struct tevent_req *req, enum tevent_req_state req_state)
+{
+       ctdbd_req_unset_pending(req);
+}
+
+static bool ctdbd_req_set_pending(struct tevent_req *req)
+{
+       struct ctdbd_req_state *state = tevent_req_data(
+               req, struct ctdbd_req_state);
+       struct ctdbd_connection *conn = state->conn;
+       struct tevent_req **pending = NULL;
+       size_t num_pending = talloc_array_length(conn->pending);
+       bool ok;
+
+       pending = talloc_realloc(
+               conn, conn->pending, struct tevent_req *, num_pending + 1);
+       if (pending == NULL) {
+               return false;
+       }
+       pending[num_pending] = req;
+       conn->pending = pending;
+
+       tevent_req_set_cleanup_fn(req, ctdbd_req_cleanup);
+
+       ok = ctdbd_conn_receive_next(conn);
+       if (!ok) {
+               ctdbd_req_unset_pending(req);
+               return false;
+       }
+
+       return true;
+}
+
+static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn)
+{
+       size_t num_pending = talloc_array_length(conn->pending);
+       struct tevent_req *req = NULL;
+       struct ctdbd_req_state *state = NULL;
+
+       if (conn->read_req != NULL) {
+               return true;
+       }
+       if (num_pending == 0) {
+               /*
+                * done for now
+                */
+               return true;
+       }
+
+       req = conn->pending[0];
+       state = tevent_req_data(req, struct ctdbd_req_state);
+
+       conn->read_req = ctdbd_pkt_read_send(
+               conn->pending, state->ev, conn->fd);
+       if (conn->read_req == NULL) {
+               return false;
+       }
+       tevent_req_set_callback(conn->read_req, ctdbd_conn_received, conn);
+       return true;
+}
+
+static void ctdbd_conn_received(struct tevent_req *subreq)
+{
+       struct ctdbd_connection *conn = tevent_req_callback_data(
+               subreq, struct ctdbd_connection);
+       TALLOC_CTX *frame = talloc_stackframe();
+       uint8_t *pkt = NULL;
+       int ret;
+       struct ctdb_req_header *hdr = NULL;
+       uint32_t reqid;
+       struct tevent_req *req = NULL;
+       struct ctdbd_req_state *state = NULL;
+       size_t i, num_pending;
+       bool ok;
+
+       SMB_ASSERT(subreq == conn->read_req);
+       conn->read_req = NULL;
+
+       ret = ctdbd_pkt_read_recv(subreq, frame, &pkt);
+       TALLOC_FREE(subreq);
+       if (ret != 0) {
+               cluster_fatal("ctdbd_pkt_read failed\n");
+       }
+
+       hdr = (struct ctdb_req_header *)pkt;
+       reqid = hdr->reqid;
+       num_pending = talloc_array_length(conn->pending);
+
+       for (i=0; i<num_pending; i++) {
+               req = conn->pending[i];
+               state = tevent_req_data(req, struct ctdbd_req_state);
+               if (state->reqid == reqid) {
+                       break;
+               }
+       }
+
+       if (i == num_pending) {
+               /* not found */
+               TALLOC_FREE(frame);
+               return;
+       }
+
+       state->reply = talloc_move(state, &hdr);
+       tevent_req_defer_callback(req, state->ev);
+       tevent_req_done(req);
+
+       TALLOC_FREE(frame);
+
+       ok = ctdbd_conn_receive_next(conn);
+       if (!ok) {
+               cluster_fatal("ctdbd_conn_receive_next failed\n");
+       }
+}
+
+static void ctdbd_req_written(struct tevent_req *subreq);
+
+struct tevent_req *ctdbd_req_send(
+       TALLOC_CTX *mem_ctx,
+       struct tevent_context *ev,
+       struct ctdbd_connection *conn,
+       struct iovec *iov,
+       size_t num_iov)
+{
+       struct tevent_req *req = NULL, *subreq = NULL;
+       struct ctdbd_req_state *state = NULL;
+       struct ctdb_req_header *hdr = NULL;
+       bool ok;
+
+       req = tevent_req_create(mem_ctx, &state, struct ctdbd_req_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->conn = conn;
+       state->ev = ev;
+
+       if ((num_iov == 0) ||
+           (iov[0].iov_len < sizeof(struct ctdb_req_header))) {
+               tevent_req_error(req, EINVAL);
+               return tevent_req_post(req, ev);
+       }
+       hdr = iov[0].iov_base;
+       state->reqid = hdr->reqid;
+
+       ok = ctdbd_req_set_pending(req);
+       if (!ok) {
+               tevent_req_oom(req);
+               return tevent_req_post(req, ev);
+       }
+
+       subreq = writev_send(
+               state, ev, conn->outgoing, conn->fd, false, iov, num_iov);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdbd_req_written, req);
+
+       return req;
+}
+
+static void ctdbd_req_written(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       ssize_t nwritten;
+       int err;
+
+       nwritten = writev_recv(subreq, &err);
+       TALLOC_FREE(subreq);
+       if (nwritten == -1) {
+               tevent_req_error(req, err);
+               return;
+       }
+}
+
+int ctdbd_req_recv(
+       struct tevent_req *req,
+       TALLOC_CTX *mem_ctx,
+       struct ctdb_req_header **reply)
+{
+       struct ctdbd_req_state *state = tevent_req_data(
+               req, struct ctdbd_req_state);
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               return err;
+       }
+       *reply = talloc_move(mem_ctx, &state->reply);
+       tevent_req_received(req);
+       return 0;
+}
+
 struct ctdbd_parse_state {
        struct tevent_context *ev;
        struct ctdbd_connection *conn;