talloc_free(r);
}
+
/*
send a redirect reply
*/
struct ctdb_req_call *c,
struct ctdb_ltdb_header *header)
{
- printf("ctdb_call_send_redirect not implemented\n");
+ struct ctdb_reply_redirect *r;
+ struct ctdb_node *node;
+
+ r = talloc_size(ctdb, sizeof(*r));
+ r->hdr.length = sizeof(*r);
+ r->hdr.operation = CTDB_REPLY_REDIRECT;
+ r->hdr.destnode = c->hdr.srcnode;
+ r->hdr.srcnode = ctdb->vnn;
+ r->hdr.reqid = c->hdr.reqid;
+ r->dmaster = header->dmaster;
+
+ node = ctdb->nodes[r->hdr.destnode];
+
+ ctdb->methods->queue_pkt(node, (uint8_t *)r, r->hdr.length);
+
+ talloc_free(r);
}
/*
struct ctdb_node *node;
const char *errmsg;
TDB_DATA reply_data;
+ TDB_DATA key;
+ int redirect_count;
};
state->errmsg = (char *)c->msg;
}
+
+/*
+ called when a CTDB_REPLY_REDIRECT packet comes in
+*/
+void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
+ struct ctdb_call_state *state;
+
+ state = idr_find(ctdb->idr, hdr->reqid);
+
+ talloc_steal(state, c);
+
+ /* don't allow for too many redirects */
+ if (state->redirect_count++ == CTDB_MAX_REDIRECT) {
+ c->dmaster = ctdb_lmaster(ctdb, state->key);
+ }
+
+ /* send it off again */
+ state->node = ctdb->nodes[c->dmaster];
+
+ if (ctdb->methods->queue_pkt(state->node, (uint8_t *)state->c,
+ state->c->hdr.length) != 0) {
+ state->state = CTDB_CALL_ERROR;
+ state->errmsg = "unable to queue in ctdb_reply_redirect";
+ }
+}
+
/*
destroy a ctdb_call
*/
if (call_data) {
memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize);
}
+ state->key.dptr = &state->c->data[0];
+ state->key.dsize = key.dsize;
state->node = ctdb->nodes[header.dmaster];
state->state = CTDB_CALL_WAIT;
return 0;
}
+/*
+ return the lmaster given a key
+*/
+uint32_t ctdb_lmaster(struct ctdb_context *ctdb, TDB_DATA key)
+{
+ return ctdb_hash(&key) % ctdb->num_nodes;
+}
+
/*
construct an initial header for a record with no ltdb header yet
{
header->rsn = 0;
/* initial dmaster is the lmaster */
- header->dmaster = ctdb_hash(&key) % ctdb->num_nodes;
+ header->dmaster = ctdb_lmaster(ctdb, key);
header->laccessor = header->dmaster;
header->lacount = 0;
}
/* arbitrary maximum timeout for ctdb operations */
#define CTDB_REQ_TIMEOUT 10
+/* max number of redirects before we ask the lmaster */
+#define CTDB_MAX_REDIRECT 2
+
/*
the extended header for records in the ltdb
*/
uint8_t msg[0];
};
+struct ctdb_reply_redirect {
+ struct ctdb_req_header hdr;
+ uint32_t dmaster;
+};
+
/* internal prototypes */
void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...);
bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2);
void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+uint32_t ctdb_lmaster(struct ctdb_context *ctdb, TDB_DATA key);
int ctdb_ltdb_fetch(struct ctdb_context *ctdb,
TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data);
int ctdb_ltdb_store(struct ctdb_context *ctdb, TDB_DATA key,