}
}
+/*
+ called in the client when we receive a CTDB_REPLY_STORE_UNLOCK from the daemon
+
+ This packet comes in response to a CTDB_REQ_STORE_UNLOCK request packet. It
+ contains any reply data from the call
+*/
+void ctdb_reply_store_unlock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_reply_store_unlock *c = (struct ctdb_reply_store_unlock *)hdr;
+ struct ctdb_call_state *state;
+
+ state = idr_find(ctdb->idr, hdr->reqid);
+ if (state == NULL) return;
+
+ state->call.status = c->state;
+
+ talloc_steal(state, c);
+
+ /* get an extra reference here - this prevents the free in ctdb_recv_pkt()
+ from freeing the data */
+ (void)talloc_reference(state, c);
+
+ state->state = CTDB_CALL_DONE;
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
+}
/*
this is called in the client, when data comes in from the daemon
*/
ctdb_reply_fetch_lock(ctdb, hdr);
break;
+ case CTDB_REPLY_STORE_UNLOCK:
+ ctdb_reply_store_unlock(ctdb, hdr);
+ break;
+
default:
printf("bogus operation code:%d\n",hdr->operation);
}
}
+struct ctdb_call_state *ctdb_client_store_unlock_send(
+ struct ctdb_record_handle *rh,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA data)
+{
+ struct ctdb_call_state *state;
+ struct ctdb_db_context *ctdb_db = talloc_get_type(rh->ctdb_db, struct ctdb_db_context);
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ struct ctdb_req_store_unlock *req;
+ int len, res;
+
+ /* if the domain socket is not yet open, open it */
+ if (ctdb->daemon.sd==-1) {
+ ux_socket_connect(ctdb);
+ }
+
+ state = talloc_zero(ctdb_db, struct ctdb_call_state);
+ if (state == NULL) {
+ printf("failed to allocate state\n");
+ return NULL;
+ }
+ state->state = CTDB_CALL_WAIT;
+ state->ctdb_db = ctdb_db;
+ len = offsetof(struct ctdb_req_store_unlock, data) + rh->key.dsize + data.dsize;
+ state->c = ctdbd_allocate_pkt(ctdb, len);
+ if (state->c == NULL) {
+ printf("failed to allocate packet\n");
+ return NULL;
+ }
+ ZERO_STRUCT(*state->c);
+ talloc_set_name_const(state->c, "ctdbd req_store_unlock packet");
+ talloc_steal(state, state->c);
+
+ req = (struct ctdb_req_store_unlock *)state->c;
+ req->hdr.length = len;
+ req->hdr.ctdb_magic = CTDB_MAGIC;
+ req->hdr.ctdb_version = CTDB_VERSION;
+ req->hdr.operation = CTDB_REQ_STORE_UNLOCK;
+ req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
+ req->db_id = ctdb_db->db_id;
+ req->keylen = rh->key.dsize;
+ req->datalen = data.dsize;
+ memcpy(&req->data[0], rh->key.dptr, rh->key.dsize);
+ memcpy(&req->data[req->keylen], data.dptr, data.dsize);
+
+ res = ctdb_client_queue_pkt(ctdb, &req->hdr);
+ if (res != 0) {
+ return NULL;
+ }
+
+ talloc_free(req);
+
+ return state;
+}
+
/*
make a recv call to the local ctdb daemon - called from client context
return rec;
}
+/*
+ make a recv call to the local ctdb daemon - called from client context
+
+ This is called when the program wants to wait for a ctdb_store_unlock to complete and get the
+ results. This call will block unless the call has already completed.
+*/
+int ctdb_client_store_unlock_recv(struct ctdb_call_state *state, struct ctdb_record_handle *rec)
+{
+ while (state->state < CTDB_CALL_DONE) {
+ event_loop_once(state->ctdb_db->ctdb->ev);
+ }
+ if (state->state != CTDB_CALL_DONE) {
+ ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
+ }
+
+ talloc_free(state);
+ return state->state;
+}
+
struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db,
TALLOC_CTX *mem_ctx,
TDB_DATA key, TDB_DATA *data)
return rec;
}
+
+int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data)
+{
+ struct ctdb_call_state *state;
+ int res;
+
+ state = ctdb_client_store_unlock_send(rec, rec, data);
+ res = ctdb_client_store_unlock_recv(state, rec);
+
+ return res;
+
+}
state->async.private = fl_data;
}
+/*
+ called when the daemon gets a store unlock request from a client
+
+ this would never block?
+ */
+static void daemon_request_store_unlock(struct ctdb_client *client,
+ struct ctdb_req_store_unlock *f)
+{
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_reply_store_unlock r;
+ int res;
+
+ ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
+ /* write the data to ltdb */
+/*XXX*/
+
+ /* now send the reply */
+ ZERO_STRUCT(r);
+
+ r.hdr.length = sizeof(r);
+ r.hdr.ctdb_magic = CTDB_MAGIC;
+ r.hdr.ctdb_version = CTDB_VERSION;
+ r.hdr.operation = CTDB_REPLY_STORE_UNLOCK;
+ r.hdr.reqid = f->hdr.reqid;
+ r.state = CTDB_CALL_DONE;
+
+ res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
+ if (res != 0) {
+ printf("Failed to queue a store unlock response\n");
+ return;
+ }
+}
+
/*
called when the daemon gets a connect wait request from a client
*/
case CTDB_REQ_FETCH_LOCK:
daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
break;
+ case CTDB_REQ_STORE_UNLOCK:
+ daemon_request_store_unlock(client, (struct ctdb_req_store_unlock *)hdr);
+ break;
+ default:
+ printf("daemon: unrecognized operation:%d\n",hdr->operation);
}
done:
CTDB_REQ_CONNECT_WAIT = 1001,
CTDB_REPLY_CONNECT_WAIT = 1002,
CTDB_REQ_FETCH_LOCK = 1003,
- CTDB_REPLY_FETCH_LOCK = 1004
+ CTDB_REPLY_FETCH_LOCK = 1004,
+ CTDB_REQ_STORE_UNLOCK = 1005,
+ CTDB_REPLY_STORE_UNLOCK = 1006
};
#define CTDB_MAGIC 0x43544442 /* CTDB */
uint32_t datalen;
uint8_t data[1]; /* data[] */
};
+struct ctdb_req_store_unlock {
+ struct ctdb_req_header hdr;
+ uint32_t db_id;
+ uint32_t keylen;
+ uint32_t datalen;
+ uint8_t data[1]; /* key[] and data[] */
+};
+
+struct ctdb_reply_store_unlock {
+ struct ctdb_req_header hdr;
+ uint32_t state;
+};
/* internal prototypes */
void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) PRINTF_ATTRIBUTE(2,3);
TALLOC_CTX *mem_ctx,
TDB_DATA key, TDB_DATA *data);
+/*
+ do a store unlock from a client to the local daemon
+*/
+int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data);
+
#endif