talloc_free(r);
}
+
+/*
+ send a dmaster reply
+
+ caller must have the chainlock before calling this routine. Caller must be
+ the lmaster
+*/
+static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
+ struct ctdb_ltdb_header *header,
+ TDB_DATA key, TDB_DATA data,
+ uint32_t new_dmaster,
+ uint32_t reqid)
+{
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ struct ctdb_reply_dmaster *r;
+ int ret, len;
+ TALLOC_CTX *tmp_ctx;
+
+ if (ctdb->vnn != ctdb_lmaster(ctdb, &key)) {
+ DEBUG(0,(__location__ " Caller is not lmaster!\n"));
+ return;
+ }
+
+ header->dmaster = new_dmaster;
+ ret = ctdb_ltdb_store(ctdb_db, key, header, data);
+ if (ret != 0) {
+ ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
+ return;
+ }
+
+ /* put the packet on a temporary context, allowing us to safely free
+ it below even if ctdb_reply_dmaster() has freed it already */
+ tmp_ctx = talloc_new(ctdb);
+
+ /* send the CTDB_REPLY_DMASTER */
+ len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize;
+ r = ctdb->methods->allocate_pkt(tmp_ctx, len);
+ CTDB_NO_MEMORY_FATAL(ctdb, r);
+
+ talloc_set_name_const(r, "reply_dmaster packet");
+ r->hdr.length = len;
+ r->hdr.ctdb_magic = CTDB_MAGIC;
+ r->hdr.ctdb_version = CTDB_VERSION;
+ r->hdr.operation = CTDB_REPLY_DMASTER;
+ r->hdr.destnode = new_dmaster;
+ r->hdr.srcnode = ctdb->vnn;
+ r->hdr.reqid = reqid;
+ r->datalen = data.dsize;
+ memcpy(&r->data[0], data.dptr, data.dsize);
+
+ ctdb_queue_packet(ctdb, &r->hdr);
+
+ talloc_free(tmp_ctx);
+}
+
/*
send a dmaster request (give another node the dmaster for a record)
struct ctdb_req_dmaster *r;
struct ctdb_context *ctdb = ctdb_db->ctdb;
int len;
+ uint32_t lmaster = ctdb_lmaster(ctdb, key);
+
+ if (lmaster == ctdb->vnn) {
+ ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
+ c->hdr.srcnode, c->hdr.reqid);
+ return;
+ }
len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
r = ctdb->methods->allocate_pkt(ctdb, len);
r->hdr.ctdb_magic = CTDB_MAGIC;
r->hdr.ctdb_version = CTDB_VERSION;
r->hdr.operation = CTDB_REQ_DMASTER;
- r->hdr.destnode = ctdb_lmaster(ctdb, key);
+ r->hdr.destnode = lmaster;
r->hdr.srcnode = ctdb->vnn;
r->hdr.reqid = c->hdr.reqid;
r->db_id = c->db_id;
memcpy(&r->data[0], key->dptr, key->dsize);
memcpy(&r->data[key->dsize], data->dptr, data->dsize);
- /* XXX - probably not necessary when lmaster==dmaster
- update the ltdb to record the new dmaster */
- header->dmaster = r->hdr.destnode;
+ header->dmaster = c->hdr.srcnode;
ctdb_ltdb_store(ctdb_db, *key, header, *data);
ctdb_queue_packet(ctdb, &r->hdr);
void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
- struct ctdb_reply_dmaster *r;
TDB_DATA key, data, data2;
struct ctdb_ltdb_header header;
struct ctdb_db_context *ctdb_db;
- int ret, len;
- TALLOC_CTX *tmp_ctx;
+ int ret;
key.dptr = c->data;
key.dsize = c->keylen;
}
/* its a protocol error if the sending node is not the current dmaster */
- if (header.dmaster != hdr->srcnode &&
- hdr->srcnode != ctdb_lmaster(ctdb_db->ctdb, &key)) {
- ctdb_fatal(ctdb, "dmaster request from non-master");
- return;
- }
-
- header.dmaster = c->dmaster;
- ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
- ctdb_ltdb_unlock(ctdb_db, key);
- if (ret != 0) {
- ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
+ if (header.dmaster != hdr->srcnode) {
+ DEBUG(0,("vnn=%u dmaster request non-master %u dmaster=%u\n",
+ ctdb->vnn, hdr->srcnode, header.dmaster));
+ ctdb_fatal(ctdb, "ctdb_req_dmaster from non-master");
return;
}
- /* put the packet on a temporary context, allowing us to safely free
- it below even if ctdb_reply_dmaster() has freed it already */
- tmp_ctx = talloc_new(ctdb);
-
- /* send the CTDB_REPLY_DMASTER */
- len = offsetof(struct ctdb_reply_dmaster, data) + data.dsize;
- r = ctdb->methods->allocate_pkt(tmp_ctx, len);
- CTDB_NO_MEMORY_FATAL(ctdb, r);
-
- talloc_set_name_const(r, "reply_dmaster packet");
- r->hdr.length = len;
- r->hdr.ctdb_magic = CTDB_MAGIC;
- r->hdr.ctdb_version = CTDB_VERSION;
- r->hdr.operation = CTDB_REPLY_DMASTER;
- r->hdr.destnode = c->dmaster;
- r->hdr.srcnode = ctdb->vnn;
- r->hdr.reqid = hdr->reqid;
- r->datalen = data.dsize;
- memcpy(&r->data[0], data.dptr, data.dsize);
-
- ctdb_queue_packet(ctdb, &r->hdr);
-
- talloc_free(tmp_ctx);
+ ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
+ ctdb_ltdb_unlock(ctdb_db, key);
}
state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
if (state == NULL) return;
- talloc_steal(state, c);
-
/* don't allow for too many redirects */
if (state->redirect_count++ == CTDB_MAX_REDIRECT) {
c->dmaster = ctdb_lmaster(ctdb, &state->call.key);
ctdb_queue_packet(ctdb, &state->c->hdr);
+#if CTDB_REQ_TIMEOUT
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
ctdb_call_timeout, state);
+#endif
return state;
}