]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
fixed the reverse of the last bug - handle the case when the new dmaster is the lmaster
authorAndrew Tridgell <tridge@samba.org>
Sun, 22 Apr 2007 16:19:49 +0000 (18:19 +0200)
committerAndrew Tridgell <tridge@samba.org>
Sun, 22 Apr 2007 16:19:49 +0000 (18:19 +0200)
(This used to be ctdb commit b2599834d2ace7369a1b36f85fdf6eb62f047e30)

ctdb/common/ctdb_call.c
ctdb/common/ctdb_ltdb.c

index 0145dc698556d3774725fd468be36f6fa07974c3..d81cca9e6f31bb77cf957239550efe827a44b78c 100644 (file)
@@ -95,6 +95,7 @@ int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
        }
 
        if (c->new_data) {
+               /* XXX check that we always have the lock here? */
                if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
                        ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
                        talloc_free(c);
@@ -292,6 +293,50 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
        talloc_free(r);
 }
 
+/*
+  called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
+  gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
+
+  must be called with the chainlock held. This function releases the chainlock
+*/
+static void ctdb_become_dmaster(struct ctdb_context *ctdb, 
+                               uint32_t reqid, TDB_DATA data)
+{
+       struct ctdb_call_state *state;
+       struct ctdb_db_context *ctdb_db;
+
+       state = idr_find_type(ctdb->idr, reqid, struct ctdb_call_state);
+       if (state == NULL) {
+               return;
+       }
+
+       ctdb_db = state->ctdb_db;
+
+       DEBUG(2,("vnn %u dmaster response %08x\n", 
+                ctdb->vnn, ctdb_hash(&state->call.key)));
+
+       /* we're now the dmaster - update our local ltdb with new header
+          and data */
+       state->header.dmaster = ctdb->vnn;
+
+       if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) {
+               ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
+               return;
+       }
+
+       ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
+
+       ctdb_ltdb_unlock(ctdb_db, state->call.key);
+
+       talloc_steal(state, state->call.reply_data.dptr);
+
+       state->state = CTDB_CALL_DONE;
+       if (state->async.fn) {
+               state->async.fn(state);
+       }
+}
+
+
 
 /*
   called when a CTDB_REQ_DMASTER packet comes in
@@ -331,17 +376,32 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
                DEBUG(2,(__location__ " deferring ctdb_request_dmaster\n"));
                return;
        }
-       
+
+       if (ctdb_lmaster(ctdb, &key) != ctdb->vnn) {
+               DEBUG(0,("vnn %u dmaster request to non-lmaster lmaster=%u\n",
+                        ctdb->vnn, ctdb_lmaster(ctdb, &key)));
+               ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
+       }
+
+       DEBUG(2,("vnn %u dmaster request on %08x for %u from %u\n", 
+                ctdb->vnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
+
        /* its a protocol error if the sending node is not the current dmaster */
        if (header.dmaster != hdr->srcnode) {
-               DEBUG(0,("vnn=%u dmaster request non-master %u dmaster=%u\n",
-                        ctdb->vnn, hdr->srcnode, header.dmaster));
+               DEBUG(0,("vnn %u dmaster request non-master %u dmaster=%u key %08x\n",
+                        ctdb->vnn, hdr->srcnode, header.dmaster, ctdb_hash(&key)));
                ctdb_fatal(ctdb, "ctdb_req_dmaster from non-master");
                return;
        }
 
-       ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
-       ctdb_ltdb_unlock(ctdb_db, key);
+       /* check if the new dmaster is the lmaster, in which case we
+          skip the dmaster reply */
+       if (c->dmaster == ctdb->vnn) {
+               ctdb_become_dmaster(ctdb, hdr->reqid, data);
+       } else {
+               ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
+               ctdb_ltdb_unlock(ctdb_db, key);
+       }
 }
 
 
@@ -404,6 +464,8 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
             ((header.laccessor == c->hdr.srcnode
               && header.lacount >= ctdb->max_lacount)
              || (c->flags&CTDB_IMMEDIATE_MIGRATION)) ) {
+               DEBUG(2,("vnn %u starting migration of %08x to %u\n", 
+                        ctdb->vnn, ctdb_hash(&call.key), c->hdr.srcnode));
                ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data);
                talloc_free(data.dptr);
                ctdb_ltdb_unlock(ctdb_db, call.key);
@@ -466,6 +528,7 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        }
 }
 
+
 /*
   called when a CTDB_REPLY_DMASTER packet comes in
 
@@ -501,28 +564,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        data.dptr = c->data;
        data.dsize = c->datalen;
 
-       talloc_steal(state, c);
-
-       /* we're now the dmaster - update our local ltdb with new header
-          and data */
-       state->header.dmaster = ctdb->vnn;
-
-       if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) {
-               ctdb_ltdb_unlock(ctdb_db, state->call.key);
-               ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
-               return;
-       }
-
-       ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
-
-       ctdb_ltdb_unlock(ctdb_db, state->call.key);
-
-       talloc_steal(state, state->call.reply_data.dptr);
-
-       state->state = CTDB_CALL_DONE;
-       if (state->async.fn) {
-               state->async.fn(state);
-       }
+       ctdb_become_dmaster(ctdb, hdr->reqid, data);
 }
 
 
index cb07a72375b51f1ff2d1da83d903057bbe51af78..d49b9b9eb94ef77099675394f54de773de6f246b 100644 (file)
@@ -225,7 +225,11 @@ int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
  */
 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
 {
-       return tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+       int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+       if (ret != 0) {
+               DEBUG(0,("tdb_chainunlock failed\n"));
+       }
+       return ret;
 }
 
 struct lock_fetch_state {