EVENTS_OBJ = lib/events/events.o lib/events/events_standard.o
-CTDB_COMMON_OBJ = common/ctdb.o common/util.o
+CTDB_COMMON_OBJ = common/ctdb.o common/util.o common/ctdb_util.o \
+ common/ctdb_call.o common/ctdb_ltdb.o
CTDB_TCP_OBJ = tcp/tcp_connect.o tcp/tcp_io.o tcp/tcp_init.o
/*
- ctdb over TCP
+ ctdb main protocol code
Copyright (C) Andrew Tridgell 2006
#include "system/filesys.h"
#include "ctdb_private.h"
-const char *ctdb_errstr(struct ctdb_context *ctdb)
-{
- return ctdb->err_msg;
-}
-
-
-/*
- remember an error message
-*/
-void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
-{
- va_list ap;
- talloc_free(ctdb->err_msg);
- va_start(ap, fmt);
- ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap);
- va_end(ap);
-}
-
/*
choose the transport we will use
*/
}
-/*
- parse a IP:port pair
-*/
-static int ctdb_parse_address(struct ctdb_context *ctdb,
- TALLOC_CTX *mem_ctx, const char *str,
- struct ctdb_address *address)
-{
- char *p;
- p = strchr(str, ':');
- if (p == NULL) {
- ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
- return -1;
- }
-
- address->address = talloc_strndup(mem_ctx, str, p-str);
- address->port = strtoul(p+1, NULL, 0);
- return 0;
-}
-
-
/*
add a node to the list of active nodes
*/
static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
{
- struct ctdb_node *node;
+ struct ctdb_node *node, **nodep;
+
+ nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
+ CTDB_NO_MEMORY(ctdb, nodep);
+
+ ctdb->nodes = nodep;
+ nodep = &ctdb->nodes[ctdb->num_nodes];
+ (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
+ CTDB_NO_MEMORY(ctdb, *nodep);
+ node = *nodep;
- node = talloc(ctdb, struct ctdb_node);
if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
return -1;
}
node->name = talloc_asprintf(node, "%s:%u",
node->address.address,
node->address.port);
+ /* for now we just set the vnn to the line in the file - this
+ will change! */
+ node->vnn = ctdb->num_nodes;
if (ctdb->methods->add_node(node) != 0) {
talloc_free(node);
return -1;
}
- DLIST_ADD(ctdb->nodes, node);
+ if (ctdb_same_address(&ctdb->address, &node->address)) {
+ ctdb->vnn = node->vnn;
+ }
+
+ ctdb->num_nodes++;
+
return 0;
}
*/
int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
{
- return ctdb_parse_address(ctdb, ctdb, address, &ctdb->address);
+ if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
+ return -1;
+ }
+
+ ctdb->name = talloc_asprintf(ctdb, "%s:%u",
+ ctdb->address.address,
+ ctdb->address.port);
+ return 0;
}
/*
return 0;
}
-/*
- attach to a specific database
-*/
-int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
- int open_flags, mode_t mode)
-{
- /* when we have a separate daemon this will need to be a real
- file, not a TDB_INTERNAL, so the parent can access it to
- for ltdb bypass */
- ctdb->ltdb = tdb_open(name, 0, TDB_INTERNAL, 0, 0);
- if (ctdb->ltdb == NULL) {
- ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
- return -1;
- }
- return 0;
-}
-
/*
start the protocol going
*/
}
/*
- make a remote ctdb call
-*/
-int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
- TDB_DATA *call_data, TDB_DATA *reply_data)
-{
- printf("ctdb_call not implemented\n");
- return -1;
-}
-
-/*
- check if two addresses are the same
+ called by the transport layer when a packet comes in
*/
-bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2)
+static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
{
- return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port;
+ printf("received pkt of length %d\n", length);
}
/*
- called by the transport layer when a packet comes in
+ called by the transport layer when a node is dead
*/
-static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
+static void ctdb_node_dead(struct ctdb_node *node)
{
- printf("received pkt of length %d\n", length);
+ printf("%s: node %s is dead\n", node->ctdb->name, node->name);
}
/*
called by the transport layer when a node is dead
*/
-static void ctdb_node_dead(struct ctdb_node *node)
+static void ctdb_node_connected(struct ctdb_node *node)
{
- printf("node %s is dead\n", node->name);
+ printf("%s: connected to %s\n", node->ctdb->name, node->name);
}
static const struct ctdb_upcalls ctdb_upcalls = {
- .recv_pkt = ctdb_recv_pkt,
- .node_dead = ctdb_node_dead
+ .recv_pkt = ctdb_recv_pkt,
+ .node_dead = ctdb_node_dead,
+ .node_connected = ctdb_node_connected
};
/*
--- /dev/null
+/*
+ ctdb_call protocol code
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "ctdb_private.h"
+
+
+/*
+ local version of ctdb_call
+*/
+static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
+ TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+ return -1;
+}
+
+/*
+ make a remote ctdb call
+*/
+int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
+ TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+ uint32_t dest;
+
+ dest = ctdb_hash(&key) % ctdb->num_nodes;
+ if (dest == ctdb->vnn) {
+ return ctdb_call_local(ctdb, key, call_id, call_data, reply_data);
+ }
+
+
+ return -1;
+}
+
--- /dev/null
+/*
+ ctdb ltdb code
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "ctdb_private.h"
+
+/*
+ attach to a specific database
+*/
+int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
+ int open_flags, mode_t mode)
+{
+ /* when we have a separate daemon this will need to be a real
+ file, not a TDB_INTERNAL, so the parent can access it to
+ for ltdb bypass */
+ ctdb->ltdb = tdb_open(name, 0, TDB_INTERNAL, 0, 0);
+ if (ctdb->ltdb == NULL) {
+ ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
+ return -1;
+ }
+ return 0;
+}
--- /dev/null
+/*
+ ctdb utility code
+
+ Copyright (C) Andrew Tridgell 2006
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "ctdb_private.h"
+
+/*
+ return error string for last error
+*/
+const char *ctdb_errstr(struct ctdb_context *ctdb)
+{
+ return ctdb->err_msg;
+}
+
+
+/*
+ remember an error message
+*/
+void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
+{
+ va_list ap;
+ talloc_free(ctdb->err_msg);
+ va_start(ap, fmt);
+ ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap);
+ va_end(ap);
+}
+
+
+/*
+ parse a IP:port pair
+*/
+int ctdb_parse_address(struct ctdb_context *ctdb,
+ TALLOC_CTX *mem_ctx, const char *str,
+ struct ctdb_address *address)
+{
+ char *p;
+ p = strchr(str, ':');
+ if (p == NULL) {
+ ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
+ return -1;
+ }
+
+ address->address = talloc_strndup(mem_ctx, str, p-str);
+ address->port = strtoul(p+1, NULL, 0);
+ return 0;
+}
+
+
+/*
+ check if two addresses are the same
+*/
+bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2)
+{
+ return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port;
+}
+
+
+/*
+ hash function for mapping data to a VNN - taken from tdb
+*/
+uint32_t ctdb_hash(TDB_DATA *key)
+{
+ uint32_t value; /* Used to compute the hash value. */
+ uint32_t i; /* Used to cycle through random values. */
+
+ /* Set the initial value from the key size. */
+ for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
+ value = (value + (key->dptr[i] << (i*5 % 24)));
+
+ return (1103515243 * value + 12345);
+}
*/
struct ctdb_node {
struct ctdb_context *ctdb;
- struct ctdb_node *next, *prev;
struct ctdb_address address;
const char *name; /* for debug messages */
void *private; /* private to transport */
+ uint32_t vnn;
};
/*
transport calls up to the ctdb layer
*/
struct ctdb_upcalls {
+ /* recv_pkt is called when a packet comes in */
void (*recv_pkt)(struct ctdb_context *, uint8_t *data, uint32_t length);
+
+ /* node_dead is called when an attempt to send to a node fails */
void (*node_dead)(struct ctdb_node *);
+
+ /* node_connected is called when a connection to a node is established */
+ void (*node_connected)(struct ctdb_node *);
};
/* main state of the ctdb daemon */
struct ctdb_context {
struct event_context *ev;
struct ctdb_address address;
- struct ctdb_node *nodes; /* list of nodes in the cluster */
+ const char *name;
+ uint32_t vnn; /* our own vnn */
+ uint32_t num_nodes;
+ struct ctdb_node **nodes; /* array of nodes in the cluster - indexed by vnn */
struct ctdb_registered_call *calls; /* list of registered calls */
char *err_msg;
struct tdb_context *ltdb;
ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \
return -1; }} while (0)
+/*
+ operation IDs
+*/
+enum ctdb_operation {
+ CTDB_OP_CALL = 0
+};
+
+/*
+ packet structures
+*/
+struct ctdb_req_header {
+ uint32_t _length; /* ignored by datagram transports */
+ uint32_t operation;
+ uint32_t destnode;
+ uint32_t srcnode;
+ uint32_t reqid;
+ uint32_t reqtimeout;
+};
+
+struct ctdb_reply_header {
+ uint32_t _length; /* ignored by datagram transports */
+ uint32_t operation;
+ uint32_t destnode;
+ uint32_t srcnode;
+ uint32_t reqid;
+};
+
+struct ctdb_req_call {
+ struct ctdb_req_header hdr;
+ uint32_t callid;
+ uint32_t keylen;
+ uint32_t calldatalen;
+ uint8_t data[0]; /* key[] followed by calldata[] */
+};
+
+struct ctdb_reply_call {
+ struct ctdb_reply_header hdr;
+ uint32_t datalen;
+ uint8_t data[0];
+};
/* internal prototypes */
void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...);
bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2);
-
+int ctdb_parse_address(struct ctdb_context *ctdb,
+ TALLOC_CTX *mem_ctx, const char *str,
+ struct ctdb_address *address);
+uint32_t ctdb_hash(TDB_DATA *key);
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
struct ctdb_context *ctdb = node->ctdb;
- int error;
+ int error = 0;
socklen_t len;
if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
error != 0) {
- if (error == EINPROGRESS) {
- printf("connect in progress\n");
- return;
- }
- printf("getsockopt errno=%s\n", strerror(errno));
+ printf("getsockopt error=%s\n", strerror(error));
talloc_free(fde);
close(tnode->fd);
tnode->fd = -1;
return;
}
- printf("Established connection to %s\n", node->name);
talloc_free(fde);
tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ,
ctdb_tcp_node_write, node);
+
+ /* tell the ctdb layer we are connected */
+ node->ctdb->upcalls->node_connected(node);
}
/*
}
/* non-blocking connect - wait for write event */
- event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_WRITE,
+ event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_WRITE|EVENT_FD_READ,
ctdb_node_connect_write, node);
}
ctdb_tcp_incoming_read, in);
talloc_set_destructor(in, ctdb_incoming_destructor);
-
- printf("New incoming socket %d\n", in->fd);
}
*/
int ctdb_tcp_start(struct ctdb_context *ctdb)
{
- struct ctdb_node *node;
+ int i;
/* listen on our own address */
if (ctdb_tcp_listen(ctdb) != 0) return -1;
/* startup connections to the other servers - will happen on
next event loop */
- for (node=ctdb->nodes;node;node=node->next) {
+ for (i=0;i<ctdb->num_nodes;i++) {
+ struct ctdb_node *node = *(ctdb->nodes + i);
if (ctdb_same_address(&ctdb->address, &node->address)) continue;
event_add_timed(ctdb->ev, node, timeval_zero(),
ctdb_tcp_node_connect, node);
--- /dev/null
+#!/bin/sh
+
+killall -q ctdb_test
+
+bin/ctdb_test --nlist nodes.txt --listen 127.0.0.1:9001 &
+bin/ctdb_test --nlist nodes.txt --listen 127.0.0.2:9001 &
+
+sleep 3
+killall ctdb_test