]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
forgot to add ctdb_client.c
authorAndrew Tridgell <tridge@samba.org>
Wed, 11 Apr 2007 01:02:26 +0000 (11:02 +1000)
committerAndrew Tridgell <tridge@samba.org>
Wed, 11 Apr 2007 01:02:26 +0000 (11:02 +1000)
(This used to be ctdb commit 136f912562ef00ede5589a7aa080503538d14bc3)

ctdb/common/ctdb_client.c [new file with mode: 0644]

diff --git a/ctdb/common/ctdb_client.c b/ctdb/common/ctdb_client.c
new file mode 100644 (file)
index 0000000..cbf2136
--- /dev/null
@@ -0,0 +1,257 @@
+/* 
+   ctdb daemon code
+
+   Copyright (C) Andrew Tridgell  2007
+   Copyright (C) Ronnie Sahlberg  2007
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "includes.h"
+#include "db_wrap.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/events/events.h"
+#include "lib/util/dlinklist.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "../include/ctdb.h"
+#include "../include/ctdb_private.h"
+
+/*
+  queue a packet for sending from client to daemon
+*/
+static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
+}
+
+
+/*
+  this is called in the client, when data comes in from the daemon
+ */
+static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
+{
+       struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
+       struct ctdb_req_header *hdr;
+
+       if (cnt < sizeof(*hdr)) {
+               ctdb_set_error(ctdb, "Bad packet length %d\n", cnt);
+               return;
+       }
+       hdr = (struct ctdb_req_header *)data;
+       if (cnt != hdr->length) {
+               ctdb_set_error(ctdb, "Bad header length %d expected %d\n", 
+                              hdr->length, cnt);
+               return;
+       }
+
+       if (hdr->ctdb_magic != CTDB_MAGIC) {
+               ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
+               return;
+       }
+
+       if (hdr->ctdb_version != CTDB_VERSION) {
+               ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
+               return;
+       }
+
+       ctdb_reply_call(ctdb, hdr);
+}
+
+/*
+  connect to a unix domain socket
+*/
+static int ux_socket_connect(struct ctdb_context *ctdb)
+{
+       struct sockaddr_un addr;
+
+       memset(&addr, 0, sizeof(addr));
+       addr.sun_family = AF_UNIX;
+       strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
+
+       ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (ctdb->daemon.sd == -1) {
+               return -1;
+       }
+       
+       if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
+               close(ctdb->daemon.sd);
+               ctdb->daemon.sd = -1;
+               return -1;
+       }
+
+       ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
+                                             CTDB_DS_ALIGNMENT, 
+                                             ctdb_client_read_cb, ctdb);
+       return 0;
+}
+
+
+
+/*
+  make a recv call to the local ctdb daemon - called from client context
+
+  This is called when the program wants to wait for a ctdb_call to complete and get the 
+  results. This call will block unless the call has already completed.
+*/
+int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
+{
+       struct ctdb_record_handle *rec;
+
+       while (state->state < CTDB_CALL_DONE) {
+               event_loop_once(state->node->ctdb->ev);
+       }
+       if (state->state != CTDB_CALL_DONE) {
+               ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
+               talloc_free(state);
+               return -1;
+       }
+
+       rec = state->fetch_private;
+
+       /* ugly hack to manage forced migration */
+       if (rec != NULL) {
+               rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr);
+               rec->data->dsize = state->call.reply_data.dsize;
+               talloc_free(state);
+               return 0;
+       }
+
+       if (state->call.reply_data.dsize) {
+               call->reply_data.dptr = talloc_memdup(state->node->ctdb,
+                                                     state->call.reply_data.dptr,
+                                                     state->call.reply_data.dsize);
+               call->reply_data.dsize = state->call.reply_data.dsize;
+       } else {
+               call->reply_data.dptr = NULL;
+               call->reply_data.dsize = 0;
+       }
+       call->status = state->call.status;
+       talloc_free(state);
+
+       return 0;
+}
+
+
+
+
+/*
+  destroy a ctdb_call in client
+*/
+static int ctdb_client_call_destructor(struct ctdb_call_state *state)  
+{
+       idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
+       return 0;
+}
+
+
+
+/*
+  make a ctdb call to the local daemon - async send. Called from client context.
+
+  This constructs a ctdb_call request and queues it for processing. 
+  This call never blocks.
+*/
+struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, 
+                                             struct ctdb_call *call)
+{
+       struct ctdb_call_state *state;
+       struct ctdb_context *ctdb = ctdb_db->ctdb;
+       struct ctdb_ltdb_header header;
+       TDB_DATA data;
+       int ret;
+       size_t len;
+
+       /* if the domain socket is not yet open, open it */
+       if (ctdb->daemon.sd==-1) {
+               ux_socket_connect(ctdb);
+       }
+
+       ret = ctdb_ltdb_lock(ctdb_db, call->key);
+       if (ret != 0) {
+               printf("failed to lock ltdb record\n");
+               return NULL;
+       }
+
+       ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
+       if (ret != 0) {
+               ctdb_ltdb_unlock(ctdb_db, call->key);
+               return NULL;
+       }
+
+#if 0
+       if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+               state = ctdb_call_local_send(ctdb_db, call, &header, &data);
+               ctdb_ltdb_unlock(ctdb_db, call->key);
+               return state;
+       }
+#endif
+
+       state = talloc_zero(ctdb_db, struct ctdb_call_state);
+       if (state == NULL) {
+               printf("failed to allocate state\n");
+               ctdb_ltdb_unlock(ctdb_db, call->key);
+               return NULL;
+       }
+
+       talloc_steal(state, data.dptr);
+
+       len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
+       state->c = ctdbd_allocate_pkt(ctdb, len);
+       if (state->c == NULL) {
+               printf("failed to allocate packet\n");
+               ctdb_ltdb_unlock(ctdb_db, call->key);
+               return NULL;
+       }
+       talloc_set_name_const(state->c, "ctdbd req_call packet");
+       talloc_steal(state, state->c);
+
+       state->c->hdr.length    = len;
+       state->c->hdr.ctdb_magic = CTDB_MAGIC;
+       state->c->hdr.ctdb_version = CTDB_VERSION;
+       state->c->hdr.operation = CTDB_REQ_CALL;
+       state->c->hdr.destnode  = header.dmaster;
+       state->c->hdr.srcnode   = ctdb->vnn;
+       /* this limits us to 16k outstanding messages - not unreasonable */
+       state->c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
+       state->c->flags         = call->flags;
+       state->c->db_id         = ctdb_db->db_id;
+       state->c->callid        = call->call_id;
+       state->c->keylen        = call->key.dsize;
+       state->c->calldatalen   = call->call_data.dsize;
+       memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
+       memcpy(&state->c->data[call->key.dsize], 
+              call->call_data.dptr, call->call_data.dsize);
+       state->call                = *call;
+       state->call.call_data.dptr = &state->c->data[call->key.dsize];
+       state->call.key.dptr       = &state->c->data[0];
+
+       state->node   = ctdb->nodes[header.dmaster];
+       state->state  = CTDB_CALL_WAIT;
+       state->header = header;
+       state->ctdb_db = ctdb_db;
+
+       talloc_set_destructor(state, ctdb_client_call_destructor);
+
+       ctdb_client_queue_pkt(ctdb, &state->c->hdr);
+
+/*XXX set up timeout to cleanup if server doesnt respond
+       event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
+                       ctdb_call_timeout, state);
+*/
+
+       ctdb_ltdb_unlock(ctdb_db, call->key);
+       return state;
+}