]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
first stage of efficient non-blocking ctdb traverse
authorAndrew Tridgell <tridge@samba.org>
Thu, 3 May 2007 02:16:03 +0000 (12:16 +1000)
committerAndrew Tridgell <tridge@samba.org>
Thu, 3 May 2007 02:16:03 +0000 (12:16 +1000)
(This used to be ctdb commit 4c23e6f26bde421bb56b55de9d6cd3e319b2be40)

ctdb/Makefile.in
ctdb/common/ctdb_io.c
ctdb/common/ctdb_traverse.c [new file with mode: 0644]
ctdb/include/ctdb_private.h
ctdb/tools/ctdb_control.c

index 1fc3ea9a4055c8ad5386d2db9435461e66088ccb..154dc9abe8e2934c34ed923543e7d23897e3dec9 100644 (file)
@@ -32,7 +32,7 @@ CTDB_COMMON_OBJ = common/ctdb.o common/ctdb_daemon.o common/ctdb_client.o \
        common/ctdb_io.o common/util.o common/ctdb_util.o \
        common/ctdb_call.o common/ctdb_ltdb.o common/ctdb_lockwait.o \
        common/ctdb_message.o common/cmdline.o common/ctdb_control.o \
-       lib/util/debug.o common/ctdb_recover.o
+       lib/util/debug.o common/ctdb_recover.o common/ctdb_traverse.o
 
 CTDB_TCP_OBJ = tcp/tcp_connect.o tcp/tcp_io.o tcp/tcp_init.o
 
index 6a5aa928b04f98dda8fce57c9f658bbcfda79ebf..d09339561ae7a1b3bbf91f47ea62415e1a01bc92 100644 (file)
@@ -214,9 +214,13 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
        struct ctdb_queue_pkt *pkt;
        uint32_t length2;
 
-       /* enforce the length and alignment rules from the tcp packet allocator */
-       length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
-       *(uint32_t *)data = length2;
+       if (queue->alignment) {
+               /* enforce the length and alignment rules from the tcp packet allocator */
+               length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
+               *(uint32_t *)data = length2;
+       } else {
+               length2 = length;
+       }
 
        if (length2 != length) {
                memset(data+length, 0, length2-length);
diff --git a/ctdb/common/ctdb_traverse.c b/ctdb/common/ctdb_traverse.c
new file mode 100644 (file)
index 0000000..7ecedc8
--- /dev/null
@@ -0,0 +1,178 @@
+/* 
+   efficient async ctdb traverse
+
+   Copyright (C) Andrew Tridgell  2007
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "db_wrap.h"
+#include "lib/tdb/include/tdb.h"
+#include "../include/ctdb_private.h"
+
+typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
+
+/*
+  structure used to pass the data between the child and parent
+ */
+struct ctdb_traverse_data {
+       uint32_t length;
+       uint32_t keylen;
+       uint32_t datalen;
+       uint8_t  data[1];
+};
+                                  
+/*
+  handle returned to caller - freeing this handle will kill the child and 
+  terminate the traverse
+ */
+struct ctdb_traverse_handle {
+       struct ctdb_db_context *ctdb_db;
+       int fd[2];
+       pid_t child;
+       void *private_data;
+       ctdb_traverse_fn_t callback;
+       struct timeval start_time;
+       struct ctdb_queue *queue;
+};
+
+/*
+  called when data is available from the child
+ */
+static void ctdb_traverse_handler(uint8_t *rawdata, size_t length, void *private_data)
+{
+       struct ctdb_traverse_handle *h = talloc_get_type(private_data, 
+                                                   struct ctdb_traverse_handle);
+       TDB_DATA key, data;
+       ctdb_traverse_fn_t callback = h->callback;
+       void *p = h->private_data;
+       struct ctdb_traverse_data *tdata = (struct ctdb_traverse_data *)rawdata;
+
+       if (rawdata == NULL || length < 4 || length != tdata->length) {
+               /* end of traverse */
+               talloc_free(h);
+               callback(p, tdb_null, tdb_null);
+               return;
+       }
+
+       key.dsize = tdata->keylen;
+       key.dptr  = &tdata->data[0];
+       data.dsize = tdata->datalen;
+       data.dptr = &tdata->data[tdata->keylen];
+
+       callback(p, key, data); 
+}
+
+/*
+  destroy an in-flight traverse operation
+ */
+static int traverse_destructor(struct ctdb_traverse_handle *h)
+{
+       close(h->fd[0]);
+       kill(h->child, SIGKILL);
+       waitpid(h->child, NULL, 0);
+       return 0;
+}
+
+/*
+  callback from tdb_traverse_read()
+ */
+static int ctdb_traverse_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+       struct ctdb_traverse_handle *h = talloc_get_type(p, struct ctdb_traverse_handle);
+       struct ctdb_traverse_data *d;
+       size_t length = offsetof(struct ctdb_traverse_data, data) + key.dsize + data.dsize;
+       d = (struct ctdb_traverse_data *)talloc_size(h, length);
+       if (d == NULL) {
+               /* error handling is tricky in this child code .... */
+               return -1;
+       }
+       d->length = length;
+       d->keylen = key.dsize;
+       d->datalen = data.dsize;
+       memcpy(&d->data[0], key.dptr, key.dsize);
+       memcpy(&d->data[key.dsize], data.dptr, data.dsize);
+       if (ctdb_queue_send(h->queue, (uint8_t *)d, d->length) != 0) {
+               return -1;
+       }
+       return 0;
+}
+
+/*
+  setup a non-blocking traverse of a tdb. The callback function will
+  be called on every record in the local ltdb. To stop the traverse,
+  talloc_free() the traverse handle.
+ */
+struct ctdb_traverse_handle *ctdb_traverse(struct ctdb_db_context *ctdb_db,
+                                           ctdb_traverse_fn_t callback,
+                                           void *private_data)
+{
+       struct ctdb_traverse_handle *h;
+       int ret;
+
+       ctdb_db->ctdb->status.traverse_calls++;
+
+       if (!(h = talloc_zero(ctdb_db, struct ctdb_traverse_handle))) {
+               return NULL;
+       }
+
+       ret = pipe(h->fd);
+
+       if (ret != 0) {
+               talloc_free(h);
+               return NULL;
+       }
+
+       h->child = fork();
+
+       if (h->child == (pid_t)-1) {
+               close(h->fd[0]);
+               close(h->fd[1]);
+               talloc_free(h);
+               return NULL;
+       }
+
+       h->callback = callback;
+       h->private_data = private_data;
+       h->ctdb_db = ctdb_db;
+
+       if (h->child == 0) {
+               /* start the traverse in the child */
+               close(h->fd[0]);
+               tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_fn, h);
+               _exit(0);
+       }
+
+       close(h->fd[1]);
+       talloc_set_destructor(h, traverse_destructor);
+
+       /*
+         setup a packet queue between the child and the parent. This
+         copes with all the async and packet boundary issues
+        */
+       h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_handler, h);
+       if (h->queue == NULL) {
+               talloc_free(h);
+               return NULL;
+       }
+
+       h->start_time = timeval_current();
+
+       return h;
+}
index d14ce336861ffbf0deb38b123ed903ef9b1dc278..3b836e6a1a96dcedfad2aad7851bb8bb45382b61 100644 (file)
@@ -157,6 +157,7 @@ struct ctdb_status {
        uint32_t total_calls;
        uint32_t pending_calls;
        uint32_t lockwait_calls;
+       uint32_t traverse_calls;
        uint32_t pending_lockwait_calls;
        uint32_t __last_counter; /* hack for control_status_all */
        uint32_t max_hop_count;
index dee21528a59cfb3ac2f4cb9e488a2cd5642cbab7..0efe24a8f73a236f31714a65757fb9e74efb6699 100644 (file)
@@ -110,6 +110,7 @@ static void show_status(struct ctdb_status *s)
        printf(" total_calls             %u\n", s->total_calls);
        printf(" pending_calls           %u\n", s->pending_calls);
        printf(" lockwait_calls          %u\n", s->lockwait_calls);
+       printf(" traverse_calls          %u\n", s->traverse_calls);
        printf(" pending_lockwait_calls  %u\n", s->pending_lockwait_calls);
        printf(" max_hop_count           %u\n", s->max_hop_count);
        printf(" max_call_latency        %.6f sec\n", s->max_call_latency);