]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
ctdbd_conn: Move message handling out of ctdbd_conn.c
authorVolker Lendecke <vl@samba.org>
Tue, 19 May 2015 14:55:32 +0000 (16:55 +0200)
committerVolker Lendecke <vl@samba.org>
Thu, 28 May 2015 09:13:09 +0000 (11:13 +0200)
This also removes the deferred message handling. It's no longer required,
because the messaging_send_iov_from always goes through the kernel which
takes at least one round through tevent.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
source3/lib/ctdbd_conn.c
source3/lib/messages_ctdbd.c

index a0dbd32434f6d66f4ef1706f19f23d950dc8606c..4e030184b22c7e1b0f2a44bf3571c3e20efd3590 100644 (file)
@@ -130,20 +130,6 @@ NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
        return NT_STATUS_OK;
 }
 
-static bool ctdb_is_our_srvid(struct ctdbd_connection *conn, uint64_t srvid)
-{
-       size_t i, num_callbacks;
-
-       num_callbacks = talloc_array_length(conn->callbacks);
-
-       for (i=0; i<num_callbacks; i++) {
-               if (srvid == conn->callbacks[i].srvid) {
-                       return true;
-               }
-       }
-       return false;
-}
-
 static void ctdbd_msg_call_back(struct ctdbd_connection *conn,
                                struct ctdb_req_message *msg)
 {
@@ -292,78 +278,6 @@ static int ctdbd_connect(int *pfd)
        return 0;
 }
 
-/*
- * State necessary to defer an incoming message while we are waiting for a
- * ctdb reply.
- */
-
-struct deferred_msg_state {
-       struct messaging_context *msg_ctx;
-       struct messaging_rec *rec;
-};
-
-/*
- * Timed event handler for the deferred message
- */
-
-static void deferred_message_dispatch(struct tevent_context *event_ctx,
-                                     struct tevent_timer *te,
-                                     struct timeval now,
-                                     void *private_data)
-{
-       struct deferred_msg_state *state = talloc_get_type_abort(
-               private_data, struct deferred_msg_state);
-
-       messaging_dispatch_rec(state->msg_ctx, state->rec);
-       TALLOC_FREE(state);
-       TALLOC_FREE(te);
-}
-
-/*
- * Fetch a messaging_rec from an incoming ctdb style message
- */
-
-static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx,
-                                                    size_t overall_length,
-                                                    struct ctdb_req_message *msg)
-{
-       struct messaging_rec *result;
-       DATA_BLOB blob;
-       enum ndr_err_code ndr_err;
-
-       if ((overall_length < offsetof(struct ctdb_req_message, data))
-           || (overall_length
-               < offsetof(struct ctdb_req_message, data) + msg->datalen)) {
-
-               cluster_fatal("got invalid msg length");
-       }
-
-       if (!(result = talloc(mem_ctx, struct messaging_rec))) {
-               DEBUG(0, ("talloc failed\n"));
-               return NULL;
-       }
-
-       blob = data_blob_const(msg->data, msg->datalen);
-
-       ndr_err = ndr_pull_struct_blob(
-               &blob, result, result,
-               (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
-
-       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               DEBUG(0, ("ndr_pull_struct_blob failed: %s\n",
-                         ndr_errstr(ndr_err)));
-               TALLOC_FREE(result);
-               return NULL;
-       }
-
-       if (DEBUGLEVEL >= 11) {
-               DEBUG(11, ("ctdb_pull_messaging_rec:\n"));
-               NDR_PRINT_DEBUG(messaging_rec, result);
-       }
-
-       return result;
-}
-
 static NTSTATUS ctdb_read_packet(int fd, TALLOC_CTX *mem_ctx,
                                 struct ctdb_req_header **result)
 {
@@ -447,8 +361,6 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
        ctdb_packet_dump(hdr);
 
        if (hdr->operation == CTDB_REQ_MESSAGE) {
-               struct tevent_timer *evt;
-               struct deferred_msg_state *msg_state;
                struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
 
                if (conn->msg_ctx == NULL) {
@@ -497,42 +409,7 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
                }
 
                ctdbd_msg_call_back(conn, msg);
-
-               msg_state = talloc(NULL, struct deferred_msg_state);
-               if (msg_state == NULL) {
-                       DEBUG(0, ("talloc failed\n"));
-                       TALLOC_FREE(hdr);
-                       goto next_pkt;
-               }
-
-               if (!(msg_state->rec = ctdb_pull_messaging_rec(
-                             msg_state, msg->hdr.length, msg))) {
-                       DEBUG(0, ("ctdbd_pull_messaging_rec failed\n"));
-                       TALLOC_FREE(msg_state);
-                       TALLOC_FREE(hdr);
-                       goto next_pkt;
-               }
-
                TALLOC_FREE(hdr);
-
-               msg_state->msg_ctx = conn->msg_ctx;
-
-               /*
-                * We're waiting for a call reply, but an async message has
-                * crossed. Defer dispatching to the toplevel event loop.
-                */
-               evt = tevent_add_timer(messaging_tevent_context(conn->msg_ctx),
-                                     messaging_tevent_context(conn->msg_ctx),
-                                     timeval_zero(),
-                                     deferred_message_dispatch,
-                                     msg_state);
-               if (evt == NULL) {
-                       DEBUG(0, ("event_add_timed failed\n"));
-                       TALLOC_FREE(msg_state);
-                       TALLOC_FREE(hdr);
-                       goto next_pkt;
-               }
-
                goto next_pkt;
        }
 
@@ -626,11 +503,6 @@ NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx,
                return status;
        }
 
-       status = register_with_ctdbd(conn, (uint64_t)getpid(), NULL, NULL);
-       if (!NT_STATUS_IS_OK(status)) {
-               goto fail;
-       }
-
        status = register_with_ctdbd(conn, MSG_SRVID_SAMBA, NULL, NULL);
        if (!NT_STATUS_IS_OK(status)) {
                goto fail;
@@ -668,7 +540,6 @@ static NTSTATUS ctdb_handle_message(struct messaging_context *msg_ctx,
                                    struct ctdb_req_header *hdr)
 {
        struct ctdb_req_message *msg;
-       struct messaging_rec *msg_rec;
 
        if (hdr->operation != CTDB_REQ_MESSAGE) {
                DEBUG(0, ("Received async msg of type %u, discarding\n",
@@ -717,18 +588,6 @@ static NTSTATUS ctdb_handle_message(struct messaging_context *msg_ctx,
 
        ctdbd_msg_call_back(conn, msg);
 
-       if (!ctdb_is_our_srvid(conn, msg->srvid)) {
-               DEBUG(0,("Got unexpected message with srvid=%llu\n",
-                        (unsigned long long)msg->srvid));
-               return NT_STATUS_OK;
-       }
-
-       msg_rec = ctdb_pull_messaging_rec(talloc_tos(), msg->hdr.length, msg);
-       if (msg_rec == NULL) {
-               DEBUG(10, ("ctdb_pull_messaging_rec failed\n"));
-               return NT_STATUS_NO_MEMORY;
-       }
-       messaging_dispatch_rec(conn->msg_ctx, msg_rec);
        return NT_STATUS_OK;
 }
 
index 799780e6a141096937af3f58adfbe604e95787db..430dc51dbffb606da3234a57f9803ec5acb337a4 100644 (file)
@@ -128,6 +128,88 @@ static int messaging_ctdbd_destructor(struct messaging_ctdbd_context *ctx)
        return 0;
 }
 
+static struct messaging_rec *ctdb_pull_messaging_rec(
+       TALLOC_CTX *mem_ctx, const struct ctdb_req_message *msg)
+{
+       struct messaging_rec *result;
+       DATA_BLOB blob;
+       enum ndr_err_code ndr_err;
+       size_t len = msg->hdr.length;
+
+       if (len < offsetof(struct ctdb_req_message, data)) {
+               return NULL;
+       }
+       len -= offsetof(struct ctdb_req_message, data);
+
+       if (len < msg->datalen) {
+               return NULL;
+       }
+
+       result = talloc(mem_ctx, struct messaging_rec);
+       if (result == NULL) {
+               return NULL;
+       }
+
+       blob = data_blob_const(msg->data, msg->datalen);
+
+       ndr_err = ndr_pull_struct_blob_all(
+               &blob, result, result,
+               (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
+
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               DEBUG(0, ("ndr_pull_struct_blob failed: %s\n",
+                         ndr_errstr(ndr_err)));
+               TALLOC_FREE(result);
+               return NULL;
+       }
+
+       if (DEBUGLEVEL >= 11) {
+               DEBUG(11, ("ctdb_pull_messaging_rec:\n"));
+               NDR_PRINT_DEBUG(messaging_rec, result);
+       }
+
+       return result;
+}
+
+static void messaging_ctdb_recv(struct ctdb_req_message *msg,
+                               void *private_data)
+{
+       struct messaging_context *msg_ctx = talloc_get_type_abort(
+               private_data, struct messaging_context);
+       struct server_id me = messaging_server_id(msg_ctx);
+       struct messaging_rec *rec;
+       NTSTATUS status;
+       struct iovec iov;
+
+       rec = ctdb_pull_messaging_rec(msg_ctx, msg);
+       if (rec == NULL) {
+               DEBUG(10, ("%s: ctdb_pull_messaging_rec failed\n", __func__));
+               return;
+       }
+
+       if (!server_id_same_process(&me, &rec->dest)) {
+               struct server_id_buf id1, id2;
+
+               DEBUG(10, ("%s: I'm %s, ignoring msg to %s\n", __func__,
+                          server_id_str_buf(me, &id1),
+                          server_id_str_buf(rec->dest, &id2)));
+               TALLOC_FREE(rec);
+               return;
+       }
+
+       iov = (struct iovec) { .iov_base = rec->buf.data,
+                              .iov_len = rec->buf.length };
+
+       status = messaging_send_iov_from(msg_ctx, rec->src, rec->dest,
+                                        rec->msg_type, &iov, 1, NULL, 0);
+       TALLOC_FREE(rec);
+
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(10, ("%s: messaging_send_iov_from failed: %s\n",
+                          __func__, nt_errstr(status)));
+       }
+}
+
 NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
                              TALLOC_CTX *mem_ctx,
                              struct messaging_backend **presult)
@@ -165,6 +247,9 @@ NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
                return status;
        }
 
+       status = register_with_ctdbd(ctx->conn, getpid(),
+                                    messaging_ctdb_recv, msg_ctx);
+
        global_ctdb_connection_pid = getpid();
        global_ctdbd_connection = ctx->conn;
        talloc_set_destructor(ctx, messaging_ctdbd_destructor);