case CTDB_CONTROL_SET_RECMODE:
CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
- return ctdb_control_set_recmode(ctdb, indata, errormsg);
+ return ctdb_control_set_recmode(ctdb, c, indata, async_reply, errormsg);
case CTDB_CONTROL_SET_MONMODE:
CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
case CTDB_CONTROL_SHUTDOWN:
ctdb_release_all_ips(ctdb);
+ ctdb->methods->shutdown(ctdb);
ctdb_event_script(ctdb, "shutdown");
DEBUG(0,("shutting down\n"));
exit(0);
return 0;
}
+struct ctdb_set_recmode_state {
+ struct ctdb_req_control *c;
+ uint32_t recmode;
+};
+
+/*
+ called when the 'recovered' event script has finished
+ */
+static void ctdb_recovered_callback(struct ctdb_context *ctdb, int status, void *p)
+{
+ struct ctdb_set_recmode_state *state = talloc_get_type(p, struct ctdb_set_recmode_state);
+
+ if (status == 0) {
+ ctdb->recovery_mode = state->recmode;
+ } else {
+ DEBUG(0,(__location__ " recovered event script failed (status %d)\n", status));
+ }
+
+ ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
+ talloc_free(state);
+}
+
/*
set the recovery mode
*/
-int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, TDB_DATA indata,
+int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
+ struct ctdb_req_control *c,
+ TDB_DATA indata, bool *async_reply,
const char **errormsg)
{
uint32_t recmode = *(uint32_t *)indata.dptr;
(*errormsg) = "Cannot change recovery mode while not frozen";
return -1;
}
- ctdb->recovery_mode = recmode;
- ctdb_event_script(ctdb, "recovered");
+ if (recmode == CTDB_RECOVERY_NORMAL &&
+ ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
+ int ret;
+ struct ctdb_set_recmode_state *state =
+ talloc(ctdb, struct ctdb_set_recmode_state);
+ CTDB_NO_MEMORY(ctdb, state);
+ state->c = talloc_steal(state, c);
+ state->recmode = recmode;
+ /* call the events script to tell all subsystems that we have recovered */
+ ret = ctdb_event_script_callback(ctdb, state,
+ ctdb_recovered_callback,
+ state, "recovered");
+ if (ret != 0) {
+ return ret;
+ }
+ *async_reply = true;
+ }
return 0;
}
close(fd[1]);
- /* we need a new event context */
+ /* shutdown the transport */
+ ctdb->methods->shutdown(ctdb);
+
+ /* get a new event context */
talloc_free(ctdb->ev);
ctdb->ev = event_context_init(ctdb);
int (*add_node)(struct ctdb_node *); /* setup a new node */
int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
void *(*allocate_pkt)(TALLOC_CTX *mem_ctx, size_t );
+ void (*shutdown)(struct ctdb_context *); /* shutdown transport */
};
/*
int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata);
int32_t ctdb_control_clear_db(struct ctdb_context *ctdb, TDB_DATA indata);
-int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, TDB_DATA data, const char **);
+int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
+ struct ctdb_req_control *c,
+ TDB_DATA indata, bool *async_reply,
+ const char **errormsg);
void ctdb_request_control_reply(struct ctdb_context *ctdb, struct ctdb_req_control *c,
TDB_DATA *outdata, int32_t status, const char *errormsg);
link */
ctdb_queue_set_fd(tnode->queue, -1);
tnode->fd = -1;
- event_add_timed(node->ctdb->ev, node, timeval_zero(),
+ event_add_timed(node->ctdb->ev, tnode, timeval_zero(),
ctdb_tcp_node_connect, node);
}
talloc_free(fde);
close(tnode->fd);
tnode->fd = -1;
- event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
+ event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0),
ctdb_tcp_node_connect, node);
return;
}
/* try again once a second */
close(tnode->fd);
tnode->fd = -1;
- event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
+ event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0),
ctdb_tcp_node_connect, node);
return;
}
/* non-blocking connect - wait for write event */
- tnode->connect_fde = event_add_fd(node->ctdb->ev, node, tnode->fd,
+ tnode->connect_fde = event_add_fd(node->ctdb->ev, tnode, tnode->fd,
EVENT_FD_WRITE|EVENT_FD_READ,
ctdb_node_connect_write, node);
/* don't give it long to connect - retry in one second. This ensures
that we find a node is up quickly (tcp normally backs off a syn reply
delay by quite a lot) */
- tnode->connect_te = event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
+ tnode->connect_te = event_add_timed(ctdb->ev, tnode, timeval_current_ofs(1, 0),
ctdb_tcp_node_connect, node);
}
static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private_data)
{
- struct ctdb_context *ctdb;
- struct ctdb_tcp *ctcp;
+ struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
+ struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp);
struct sockaddr_in addr;
socklen_t len;
int fd;
struct ctdb_incoming *in;
int one = 1;
- ctdb = talloc_get_type(private_data, struct ctdb_context);
- ctcp = talloc_get_type(ctdb->private_data, struct ctdb_tcp);
memset(&addr, 0, sizeof(addr));
len = sizeof(addr);
fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
if (fd == -1) return;
- in = talloc_zero(ctdb, struct ctdb_incoming);
+ in = talloc_zero(ctcp, struct ctdb_incoming);
in->fd = fd;
in->ctdb = ctdb;
goto failed;
}
- event_add_fd(ctdb->ev, ctdb, ctcp->listen_fd, EVENT_FD_READ,
+ event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
ctdb_listen_event, ctdb);
close(lock_fd);
goto failed;
}
- event_add_fd(ctdb->ev, ctdb, ctcp->listen_fd, EVENT_FD_READ,
+ event_add_fd(ctdb->ev, ctcp, ctcp->listen_fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
ctdb_listen_event, ctdb);
return 0;
*/
static int ctdb_tcp_add_node(struct ctdb_node *node)
{
+ struct ctdb_tcp *ctcp = talloc_get_type(node->ctdb->private_data,
+ struct ctdb_tcp);
struct ctdb_tcp_node *tnode;
- tnode = talloc_zero(node, struct ctdb_tcp_node);
+ tnode = talloc_zero(ctcp, struct ctdb_tcp_node);
CTDB_NO_MEMORY(node->ctdb, tnode);
tnode->fd = -1;
node->private_data = tnode;
- tnode->queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT,
+ tnode->queue = ctdb_queue_setup(node->ctdb, ctcp, tnode->fd, CTDB_TCP_ALIGNMENT,
ctdb_tcp_tnode_cb, node);
return 0;
next event loop */
for (i=0;i<ctdb->num_nodes;i++) {
struct ctdb_node *node = *(ctdb->nodes + i);
+ struct ctdb_tcp_node *tnode = talloc_get_type(
+ node->private_data, struct ctdb_tcp_node);
if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) &&
ctdb_same_address(&ctdb->address, &node->address)) continue;
- event_add_timed(ctdb->ev, node, timeval_zero(),
+ event_add_timed(ctdb->ev, tnode, timeval_zero(),
ctdb_tcp_node_connect, node);
}
}
+/*
+ shutdown the transport
+*/
+static void ctdb_tcp_shutdown(struct ctdb_context *ctdb)
+{
+ struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private_data,
+ struct ctdb_tcp);
+ talloc_free(ctcp);
+ ctdb->private_data = NULL;
+}
+
+
/*
transport packet allocator - allows transport to control memory for packets
*/
.start = ctdb_tcp_start,
.queue_pkt = ctdb_tcp_queue_pkt,
.add_node = ctdb_tcp_add_node,
- .allocate_pkt = ctdb_tcp_allocate_pkt
+ .allocate_pkt = ctdb_tcp_allocate_pkt,
+ .shutdown = ctdb_tcp_shutdown,
};
/*