]> git.ipfire.org Git - thirdparty/knot-dns.git/commitdiff
redis: preserve event ID during replication
authorDaniel Salzman <daniel.salzman@nic.cz>
Sat, 22 Nov 2025 20:19:44 +0000 (21:19 +0100)
committerDaniel Salzman <daniel.salzman@nic.cz>
Wed, 26 Nov 2025 14:49:47 +0000 (15:49 +0100)
src/redis/arg.h
src/redis/internal.h
src/redis/knot.c

index bc94fa4a83877cf14bb8808e9d3e9eb0a827dfe7..ced5479c2fab4527f9807d4f202d3a91dc076610 100644 (file)
@@ -129,7 +129,8 @@ typedef struct {
 
 #define ARG_NUM(arg, out, name) { \
        long long val; \
-       long long max = (1ULL << (sizeof(out) * 8)) - 1; \
+       uint8_t shift = sizeof(out) < 8 ? sizeof(out) * 8 : 63; \
+       long long max = (1ULL << shift) - 1; \
        if (RedisModule_StringToLongLong(arg, &val) != REDISMODULE_OK || val > max) { \
                return RedisModule_ReplyWithError(ctx, RDB_E("invalid " name)); \
        } \
@@ -142,6 +143,17 @@ typedef struct {
        } \
 }
 
+#define ARG_STREAM_ID(arg, out) { \
+       size_t len; \
+       const uint8_t *ptr = (const uint8_t *)RedisModule_StringPtrLen(arg, &len); \
+       wire_ctx_t w = wire_ctx_init_const(ptr, len); \
+       out.ms = wire_ctx_read_u64(&w); \
+       out.seq = wire_ctx_read_u64(&w); \
+       if (w.error != KNOT_EOK || wire_ctx_available(&w) != 0) { \
+               return RedisModule_ReplyWithError(ctx, RDB_E("invalid stream ID")); \
+       } \
+}
+
 #define ARG_DNAME(arg, out, name) { \
        size_t len; \
        const uint8_t *ptr = (const uint8_t *)RedisModule_StringPtrLen(arg, &len); \
@@ -172,9 +184,13 @@ typedef struct {
        } \
 }
 
+#define WIRE_STREAM_ID(id) \
+       (RedisModuleStreamID){ .ms = htobe64(id->ms), .seq = htobe64(id->seq) }; \
+       RedisModule_Assert(sizeof(RedisModuleStreamID) == 16);
+
 static knot_dname_t *dname_from_str(const char *ptr, size_t len, uint8_t *out, arg_dname_t *origin)
 {
-       assert(ptr != NULL && out != NULL);
+       RedisModule_Assert(ptr != NULL && out != NULL);
 
        if (knot_dname_from_str(out, ptr, KNOT_DNAME_MAXLEN) == NULL) {
                return NULL;
index 8a13e21b84af71c3a96cbaba6f7514858119538d..166b5f0b1e493cbac93b97b7e53ab6d3cc9ed9c7 100644 (file)
@@ -188,7 +188,7 @@ static bool zone_txn_is_open(RedisModuleCtx *ctx, const arg_dname_t *origin, con
 }
 
 static void commit_event(RedisModuleCtx *ctx, rdb_event_t type, const arg_dname_t *origin,
-                         uint8_t instance, uint32_t serial)
+                         uint8_t instance, uint32_t serial, RedisModuleStreamID *stream_id)
 {
        RedisModuleString *keyname = RedisModule_CreateString(ctx, RDB_EVENT_KEY, strlen(RDB_EVENT_KEY));
        RedisModuleKey *stream_key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ | REDISMODULE_WRITE);
@@ -213,8 +213,11 @@ static void commit_event(RedisModuleCtx *ctx, rdb_event_t type, const arg_dname_
                NULL,
        };
 
-       RedisModuleStreamID ts;
-       int ret = RedisModule_StreamAdd(stream_key, REDISMODULE_STREAM_ADD_AUTOID, &ts, events, 4);
+       int flags = 0;
+       if (stream_id->ms == 0) {
+               flags = REDISMODULE_STREAM_ADD_AUTOID;
+       }
+       int ret = RedisModule_StreamAdd(stream_key, flags, stream_id, events, 4);
 
        for (RedisModuleString **event = events; *event != NULL; event++) {
                RedisModule_FreeString(ctx, *event);
@@ -231,11 +234,12 @@ static void commit_event(RedisModuleCtx *ctx, rdb_event_t type, const arg_dname_
                return;
        }
 
-       ts.ms -= 1000LLU * rdb_event_age;
-       ts.seq = 0;
+       RedisModuleStreamID trim_id = {
+               .ms = stream_id->ms - 1000LLU * rdb_event_age
+       };
 
        // NOTE Trimming with REDISMODULE_STREAM_TRIM_APPROX improves preformance
-       long long removed_cnt = RedisModule_StreamTrimByID(stream_key, REDISMODULE_STREAM_TRIM_APPROX, &ts);
+       long long removed_cnt = RedisModule_StreamTrimByID(stream_key, REDISMODULE_STREAM_TRIM_APPROX, &trim_id);
        if (removed_cnt) {
                RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "stream cleanup %lld old events", removed_cnt);
        }
@@ -1561,7 +1565,8 @@ static exception_t zone_meta_active_exchange(RedisModuleCtx *ctx, zone_meta_k ke
        raise(e);
 }
 
-static void zone_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn)
+static void zone_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn,
+                        RedisModuleStreamID *stream_id)
 {
        zone_meta_k meta_key = zone_meta_get_when_open(ctx, origin, txn, REDISMODULE_READ | REDISMODULE_WRITE);
        if (meta_key == NULL) {
@@ -1628,8 +1633,13 @@ static void zone_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_
        RedisModule_FreeString(ctx, value);
        RedisModule_CloseKey(zones_index);
 
-       RedisModule_ReplicateVerbatim(ctx);
-       commit_event(ctx, RDB_EVENT_ZONE, origin, txn->instance, serial);
+       commit_event(ctx, RDB_EVENT_ZONE, origin, txn->instance, serial, stream_id);
+
+       // Replicate with explicit ID so replicas use the same ID.
+       RedisModuleStreamID wire_id = WIRE_STREAM_ID(stream_id);
+       RedisModule_Replicate(ctx, RDB_CMD_ZONE_COMMIT, "bbb", (char *)origin->data,
+                             origin->len, txn, sizeof(*txn), &wire_id, sizeof(wire_id));
+
        RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK);
 }
 
@@ -2031,7 +2041,8 @@ static exception_t upd_trim_history(RedisModuleCtx *ctx, const arg_dname_t *orig
        return_ok;
 }
 
-static void upd_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *upd_txn)
+static void upd_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *upd_txn,
+                       RedisModuleStreamID *stream_id)
 {
        upd_meta_k meta_key = upd_meta_get_when_open(ctx, origin, upd_txn, REDISMODULE_READ | REDISMODULE_WRITE);
        if (meta_key == NULL) {
@@ -2219,11 +2230,15 @@ static void upd_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t
        upd_meta_unlock(ctx, meta_key, upd_txn->id);
        RedisModule_CloseKey(meta_key);
 
-       commit_event(ctx, RDB_EVENT_UPD, origin, upd_txn->instance, serial_new);
+       commit_event(ctx, RDB_EVENT_UPD, origin, upd_txn->instance, serial_new, stream_id);
+
        if (e.ret != KNOT_EOK) {
                RedisModule_ReplyWithError(ctx, e.what);
        } else {
-               RedisModule_ReplicateVerbatim(ctx);
+               // Replicate with explicit ID so replicas use the same ID.
+               RedisModuleStreamID wire_id = WIRE_STREAM_ID(stream_id);
+               RedisModule_Replicate(ctx, RDB_CMD_UPD_COMMIT, "bbb", (char *)origin->data,
+                                     origin->len, upd_txn, sizeof(*upd_txn), &wire_id, sizeof(wire_id));
                RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK);
        }
 }
index f6235bbf987de46930116670c890ebbab3ebba09..534f12937ae6e8024495a107b52641e071585743 100644 (file)
@@ -295,13 +295,15 @@ static int zone_commit_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
        rdb_txn_t txn;
        ARG_TXN_TXT(argv[2], txn);
 
-       zone_commit(ctx, &origin, &txn);
+       RedisModuleStreamID stream_id = { 0 };
+
+       zone_commit(ctx, &origin, &txn, &stream_id);
        return REDISMODULE_OK;
 }
 
 static int zone_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-       if (argc != 3) {
+       if (argc < 3) {
                return RedisModule_WrongArity(ctx);
        }
 
@@ -311,7 +313,12 @@ static int zone_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int ar
        rdb_txn_t txn;
        ARG_TXN(argv[2], txn);
 
-       zone_commit(ctx, &origin, &txn);
+       RedisModuleStreamID stream_id = { 0 };
+       if (argc > 3) {
+               ARG_STREAM_ID(argv[3], stream_id);
+       }
+
+       zone_commit(ctx, &origin, &txn, &stream_id);
        return REDISMODULE_OK;
 }
 
@@ -641,13 +648,15 @@ static int upd_commit_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
        rdb_txn_t txn;
        ARG_TXN_TXT(argv[2], txn);
 
-       upd_commit(ctx, &origin, &txn);
+       RedisModuleStreamID stream_id = { 0 };
+
+       upd_commit(ctx, &origin, &txn, &stream_id);
        return REDISMODULE_OK;
 }
 
 static int upd_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
 {
-       if (argc != 3) {
+       if (argc < 3) {
                return RedisModule_WrongArity(ctx);
        }
 
@@ -657,7 +666,12 @@ static int upd_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
        rdb_txn_t txn;
        ARG_TXN(argv[2], txn)
 
-       upd_commit(ctx, &origin, &txn);
+       RedisModuleStreamID stream_id = { 0 };
+       if (argc > 3) {
+               ARG_STREAM_ID(argv[3], stream_id);
+       }
+
+       upd_commit(ctx, &origin, &txn, &stream_id);
        return REDISMODULE_OK;
 }