From 2daf1d300655e7489be2a82c5612ab26dc14fb4c Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Wed, 11 Apr 2007 11:02:26 +1000 Subject: [PATCH] forgot to add ctdb_client.c (This used to be ctdb commit 136f912562ef00ede5589a7aa080503538d14bc3) --- ctdb/common/ctdb_client.c | 257 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 257 insertions(+) create mode 100644 ctdb/common/ctdb_client.c diff --git a/ctdb/common/ctdb_client.c b/ctdb/common/ctdb_client.c new file mode 100644 index 00000000000..cbf21367291 --- /dev/null +++ b/ctdb/common/ctdb_client.c @@ -0,0 +1,257 @@ +/* + ctdb daemon code + + Copyright (C) Andrew Tridgell 2007 + Copyright (C) Ronnie Sahlberg 2007 + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ + +#include "includes.h" +#include "db_wrap.h" +#include "lib/tdb/include/tdb.h" +#include "lib/events/events.h" +#include "lib/util/dlinklist.h" +#include "system/network.h" +#include "system/filesys.h" +#include "../include/ctdb.h" +#include "../include/ctdb_private.h" + +/* + queue a packet for sending from client to daemon +*/ +static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length); +} + + +/* + this is called in the client, when data comes in from the daemon + */ +static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) +{ + struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context); + struct ctdb_req_header *hdr; + + if (cnt < sizeof(*hdr)) { + ctdb_set_error(ctdb, "Bad packet length %d\n", cnt); + return; + } + hdr = (struct ctdb_req_header *)data; + if (cnt != hdr->length) { + ctdb_set_error(ctdb, "Bad header length %d expected %d\n", + hdr->length, cnt); + return; + } + + if (hdr->ctdb_magic != CTDB_MAGIC) { + ctdb_set_error(ctdb, "Non CTDB packet rejected\n"); + return; + } + + if (hdr->ctdb_version != CTDB_VERSION) { + ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version); + return; + } + + ctdb_reply_call(ctdb, hdr); +} + +/* + connect to a unix domain socket +*/ +static int ux_socket_connect(struct ctdb_context *ctdb) +{ + struct sockaddr_un addr; + + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)); + + ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (ctdb->daemon.sd == -1) { + return -1; + } + + if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { + close(ctdb->daemon.sd); + ctdb->daemon.sd = -1; + return -1; + } + + ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, + CTDB_DS_ALIGNMENT, + ctdb_client_read_cb, ctdb); + return 0; +} + + + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_call to complete and get the + results. This call will block unless the call has already completed. +*/ +int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +{ + struct ctdb_record_handle *rec; + + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->node->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + talloc_free(state); + return -1; + } + + rec = state->fetch_private; + + /* ugly hack to manage forced migration */ + if (rec != NULL) { + rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr); + rec->data->dsize = state->call.reply_data.dsize; + talloc_free(state); + return 0; + } + + if (state->call.reply_data.dsize) { + call->reply_data.dptr = talloc_memdup(state->node->ctdb, + state->call.reply_data.dptr, + state->call.reply_data.dsize); + call->reply_data.dsize = state->call.reply_data.dsize; + } else { + call->reply_data.dptr = NULL; + call->reply_data.dsize = 0; + } + call->status = state->call.status; + talloc_free(state); + + return 0; +} + + + + +/* + destroy a ctdb_call in client +*/ +static int ctdb_client_call_destructor(struct ctdb_call_state *state) +{ + idr_remove(state->node->ctdb->idr, state->c->hdr.reqid); + return 0; +} + + + +/* + make a ctdb call to the local daemon - async send. Called from client context. + + This constructs a ctdb_call request and queues it for processing. + This call never blocks. +*/ +struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call) +{ + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_ltdb_header header; + TDB_DATA data; + int ret; + size_t len; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + ret = ctdb_ltdb_lock(ctdb_db, call->key); + if (ret != 0) { + printf("failed to lock ltdb record\n"); + return NULL; + } + + ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + +#if 0 + if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) { + state = ctdb_call_local_send(ctdb_db, call, &header, &data); + ctdb_ltdb_unlock(ctdb_db, call->key); + return state; + } +#endif + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + + talloc_steal(state, data.dptr); + + len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + ctdb_ltdb_unlock(ctdb_db, call->key); + return NULL; + } + talloc_set_name_const(state->c, "ctdbd req_call packet"); + talloc_steal(state, state->c); + + state->c->hdr.length = len; + state->c->hdr.ctdb_magic = CTDB_MAGIC; + state->c->hdr.ctdb_version = CTDB_VERSION; + state->c->hdr.operation = CTDB_REQ_CALL; + state->c->hdr.destnode = header.dmaster; + state->c->hdr.srcnode = ctdb->vnn; + /* this limits us to 16k outstanding messages - not unreasonable */ + state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + state->c->flags = call->flags; + state->c->db_id = ctdb_db->db_id; + state->c->callid = call->call_id; + state->c->keylen = call->key.dsize; + state->c->calldatalen = call->call_data.dsize; + memcpy(&state->c->data[0], call->key.dptr, call->key.dsize); + memcpy(&state->c->data[call->key.dsize], + call->call_data.dptr, call->call_data.dsize); + state->call = *call; + state->call.call_data.dptr = &state->c->data[call->key.dsize]; + state->call.key.dptr = &state->c->data[0]; + + state->node = ctdb->nodes[header.dmaster]; + state->state = CTDB_CALL_WAIT; + state->header = header; + state->ctdb_db = ctdb_db; + + talloc_set_destructor(state, ctdb_client_call_destructor); + + ctdb_client_queue_pkt(ctdb, &state->c->hdr); + +/*XXX set up timeout to cleanup if server doesnt respond + event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), + ctdb_call_timeout, state); +*/ + + ctdb_ltdb_unlock(ctdb_db, call->key); + return state; +} -- 2.47.3