#include "lib/util/sys_rw.h"
#include "lib/util/blocking.h"
#include "ctdb/include/ctdb_protocol.h"
+#include "lib/async_req/async_sock.h"
/* paths to these include files come from --with-ctdb= in configure */
* Outgoing queue for writev_send of asynchronous ctdb requests
*/
struct tevent_queue *outgoing;
+ struct tevent_req **pending;
+ struct tevent_req *read_req;
};
static void ctdbd_async_socket_handler(struct tevent_context *ev,
static bool ctdbd_conn_has_async_reqs(struct ctdbd_connection *conn)
{
- return (conn->fde != NULL);
+ size_t len = talloc_array_length(conn->pending);
+ return ((len != 0) || (conn->fde != NULL));
}
static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
};
}
+struct ctdbd_pkt_read_state {
+ uint8_t *pkt;
+};
+
+static ssize_t ctdbd_pkt_read_more(
+ uint8_t *buf, size_t buflen, void *private_data);
+static void ctdbd_pkt_read_done(struct tevent_req *subreq);
+
+static struct tevent_req *ctdbd_pkt_read_send(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd)
+{
+ struct tevent_req *req = NULL, *subreq = NULL;
+ struct ctdbd_pkt_read_state *state = NULL;
+
+ req = tevent_req_create(mem_ctx, &state, struct ctdbd_pkt_read_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ subreq = read_packet_send(state, ev, fd, 4, ctdbd_pkt_read_more, NULL);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, ctdbd_pkt_read_done, req);
+ return req;
+}
+
+static ssize_t ctdbd_pkt_read_more(
+ uint8_t *buf, size_t buflen, void *private_data)
+{
+ uint32_t msglen;
+ if (buflen < 4) {
+ return -1;
+ }
+ if (buflen > 4) {
+ return 0; /* Been here, done */
+ }
+ memcpy(&msglen, buf, 4);
+
+ if (msglen < sizeof(struct ctdb_req_header)) {
+ return -1;
+ }
+ return msglen - sizeof(msglen);
+}
+
+static void ctdbd_pkt_read_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct ctdbd_pkt_read_state *state = tevent_req_data(
+ req, struct ctdbd_pkt_read_state);
+ ssize_t nread;
+ int err;
+
+ nread = read_packet_recv(subreq, state, &state->pkt, &err);
+ TALLOC_FREE(subreq);
+ if (nread == -1) {
+ tevent_req_error(req, err);
+ return;
+ }
+ tevent_req_done(req);
+}
+
+static int ctdbd_pkt_read_recv(
+ struct tevent_req *req, TALLOC_CTX *mem_ctx, uint8_t **pkt)
+{
+ struct ctdbd_pkt_read_state *state = tevent_req_data(
+ req, struct ctdbd_pkt_read_state);
+ int err;
+
+ if (tevent_req_is_unix_error(req, &err)) {
+ return err;
+ }
+ *pkt = talloc_move(mem_ctx, &state->pkt);
+ tevent_req_received(req);
+ return 0;
+}
+
+static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn);
+static void ctdbd_conn_received(struct tevent_req *subreq);
+
+struct ctdbd_req_state {
+ struct ctdbd_connection *conn;
+ struct tevent_context *ev;
+ uint32_t reqid;
+ struct ctdb_req_header *reply;
+};
+
+static void ctdbd_req_unset_pending(struct tevent_req *req)
+{
+ struct ctdbd_req_state *state = tevent_req_data(
+ req, struct ctdbd_req_state);
+ struct ctdbd_connection *conn = state->conn;
+ size_t num_pending = talloc_array_length(conn->pending);
+ size_t i, num_after;
+
+ tevent_req_set_cleanup_fn(req, NULL);
+
+ if (num_pending == 1) {
+ /*
+ * conn->read_req is a child of conn->pending
+ */
+ TALLOC_FREE(conn->pending);
+ conn->read_req = NULL;
+ return;
+ }
+
+ for (i=0; i<num_pending; i++) {
+ if (req == conn->pending[i]) {
+ break;
+ }
+ }
+ if (i == num_pending) {
+ /*
+ * Something's seriously broken. Just returning here is the
+ * right thing nevertheless, the point of this routine is to
+ * remove ourselves from conn->pending.
+ */
+ return;
+ }
+
+ num_after = num_pending - i - 1;
+ if (num_after > 0) {
+ memmove(&conn->pending[i],
+ &conn->pending[i] + 1,
+ sizeof(*conn->pending) * num_after);
+ }
+ conn->pending = talloc_realloc(
+ NULL, conn->pending, struct tevent_req *, num_pending - 1);
+}
+
+static void ctdbd_req_cleanup(
+ struct tevent_req *req, enum tevent_req_state req_state)
+{
+ ctdbd_req_unset_pending(req);
+}
+
+static bool ctdbd_req_set_pending(struct tevent_req *req)
+{
+ struct ctdbd_req_state *state = tevent_req_data(
+ req, struct ctdbd_req_state);
+ struct ctdbd_connection *conn = state->conn;
+ struct tevent_req **pending = NULL;
+ size_t num_pending = talloc_array_length(conn->pending);
+ bool ok;
+
+ pending = talloc_realloc(
+ conn, conn->pending, struct tevent_req *, num_pending + 1);
+ if (pending == NULL) {
+ return false;
+ }
+ pending[num_pending] = req;
+ conn->pending = pending;
+
+ tevent_req_set_cleanup_fn(req, ctdbd_req_cleanup);
+
+ ok = ctdbd_conn_receive_next(conn);
+ if (!ok) {
+ ctdbd_req_unset_pending(req);
+ return false;
+ }
+
+ return true;
+}
+
+static bool ctdbd_conn_receive_next(struct ctdbd_connection *conn)
+{
+ size_t num_pending = talloc_array_length(conn->pending);
+ struct tevent_req *req = NULL;
+ struct ctdbd_req_state *state = NULL;
+
+ if (conn->read_req != NULL) {
+ return true;
+ }
+ if (num_pending == 0) {
+ /*
+ * done for now
+ */
+ return true;
+ }
+
+ req = conn->pending[0];
+ state = tevent_req_data(req, struct ctdbd_req_state);
+
+ conn->read_req = ctdbd_pkt_read_send(
+ conn->pending, state->ev, conn->fd);
+ if (conn->read_req == NULL) {
+ return false;
+ }
+ tevent_req_set_callback(conn->read_req, ctdbd_conn_received, conn);
+ return true;
+}
+
+static void ctdbd_conn_received(struct tevent_req *subreq)
+{
+ struct ctdbd_connection *conn = tevent_req_callback_data(
+ subreq, struct ctdbd_connection);
+ TALLOC_CTX *frame = talloc_stackframe();
+ uint8_t *pkt = NULL;
+ int ret;
+ struct ctdb_req_header *hdr = NULL;
+ uint32_t reqid;
+ struct tevent_req *req = NULL;
+ struct ctdbd_req_state *state = NULL;
+ size_t i, num_pending;
+ bool ok;
+
+ SMB_ASSERT(subreq == conn->read_req);
+ conn->read_req = NULL;
+
+ ret = ctdbd_pkt_read_recv(subreq, frame, &pkt);
+ TALLOC_FREE(subreq);
+ if (ret != 0) {
+ cluster_fatal("ctdbd_pkt_read failed\n");
+ }
+
+ hdr = (struct ctdb_req_header *)pkt;
+ reqid = hdr->reqid;
+ num_pending = talloc_array_length(conn->pending);
+
+ for (i=0; i<num_pending; i++) {
+ req = conn->pending[i];
+ state = tevent_req_data(req, struct ctdbd_req_state);
+ if (state->reqid == reqid) {
+ break;
+ }
+ }
+
+ if (i == num_pending) {
+ /* not found */
+ TALLOC_FREE(frame);
+ return;
+ }
+
+ state->reply = talloc_move(state, &hdr);
+ tevent_req_defer_callback(req, state->ev);
+ tevent_req_done(req);
+
+ TALLOC_FREE(frame);
+
+ ok = ctdbd_conn_receive_next(conn);
+ if (!ok) {
+ cluster_fatal("ctdbd_conn_receive_next failed\n");
+ }
+}
+
+static void ctdbd_req_written(struct tevent_req *subreq);
+
+struct tevent_req *ctdbd_req_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdbd_connection *conn,
+ struct iovec *iov,
+ size_t num_iov)
+{
+ struct tevent_req *req = NULL, *subreq = NULL;
+ struct ctdbd_req_state *state = NULL;
+ struct ctdb_req_header *hdr = NULL;
+ bool ok;
+
+ req = tevent_req_create(mem_ctx, &state, struct ctdbd_req_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->conn = conn;
+ state->ev = ev;
+
+ if ((num_iov == 0) ||
+ (iov[0].iov_len < sizeof(struct ctdb_req_header))) {
+ tevent_req_error(req, EINVAL);
+ return tevent_req_post(req, ev);
+ }
+ hdr = iov[0].iov_base;
+ state->reqid = hdr->reqid;
+
+ ok = ctdbd_req_set_pending(req);
+ if (!ok) {
+ tevent_req_oom(req);
+ return tevent_req_post(req, ev);
+ }
+
+ subreq = writev_send(
+ state, ev, conn->outgoing, conn->fd, false, iov, num_iov);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, ctdbd_req_written, req);
+
+ return req;
+}
+
+static void ctdbd_req_written(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ ssize_t nwritten;
+ int err;
+
+ nwritten = writev_recv(subreq, &err);
+ TALLOC_FREE(subreq);
+ if (nwritten == -1) {
+ tevent_req_error(req, err);
+ return;
+ }
+}
+
+int ctdbd_req_recv(
+ struct tevent_req *req,
+ TALLOC_CTX *mem_ctx,
+ struct ctdb_req_header **reply)
+{
+ struct ctdbd_req_state *state = tevent_req_data(
+ req, struct ctdbd_req_state);
+ int err;
+
+ if (tevent_req_is_unix_error(req, &err)) {
+ return err;
+ }
+ *reply = talloc_move(mem_ctx, &state->reply);
+ tevent_req_received(req);
+ return 0;
+}
+
struct ctdbd_parse_state {
struct tevent_context *ev;
struct ctdbd_connection *conn;