From: Daniel Salzman Date: Sat, 22 Nov 2025 20:19:44 +0000 (+0100) Subject: redis: preserve event ID during replication X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3e21446c680440436d9a8a8173cb1fbf263d226e;p=thirdparty%2Fknot-dns.git redis: preserve event ID during replication --- diff --git a/src/redis/arg.h b/src/redis/arg.h index bc94fa4a83..ced5479c2f 100644 --- a/src/redis/arg.h +++ b/src/redis/arg.h @@ -129,7 +129,8 @@ typedef struct { #define ARG_NUM(arg, out, name) { \ long long val; \ - long long max = (1ULL << (sizeof(out) * 8)) - 1; \ + uint8_t shift = sizeof(out) < 8 ? sizeof(out) * 8 : 63; \ + long long max = (1ULL << shift) - 1; \ if (RedisModule_StringToLongLong(arg, &val) != REDISMODULE_OK || val > max) { \ return RedisModule_ReplyWithError(ctx, RDB_E("invalid " name)); \ } \ @@ -142,6 +143,17 @@ typedef struct { } \ } +#define ARG_STREAM_ID(arg, out) { \ + size_t len; \ + const uint8_t *ptr = (const uint8_t *)RedisModule_StringPtrLen(arg, &len); \ + wire_ctx_t w = wire_ctx_init_const(ptr, len); \ + out.ms = wire_ctx_read_u64(&w); \ + out.seq = wire_ctx_read_u64(&w); \ + if (w.error != KNOT_EOK || wire_ctx_available(&w) != 0) { \ + return RedisModule_ReplyWithError(ctx, RDB_E("invalid stream ID")); \ + } \ +} + #define ARG_DNAME(arg, out, name) { \ size_t len; \ const uint8_t *ptr = (const uint8_t *)RedisModule_StringPtrLen(arg, &len); \ @@ -172,9 +184,13 @@ typedef struct { } \ } +#define WIRE_STREAM_ID(id) \ + (RedisModuleStreamID){ .ms = htobe64(id->ms), .seq = htobe64(id->seq) }; \ + RedisModule_Assert(sizeof(RedisModuleStreamID) == 16); + static knot_dname_t *dname_from_str(const char *ptr, size_t len, uint8_t *out, arg_dname_t *origin) { - assert(ptr != NULL && out != NULL); + RedisModule_Assert(ptr != NULL && out != NULL); if (knot_dname_from_str(out, ptr, KNOT_DNAME_MAXLEN) == NULL) { return NULL; diff --git a/src/redis/internal.h b/src/redis/internal.h index 8a13e21b84..166b5f0b1e 100644 --- a/src/redis/internal.h +++ b/src/redis/internal.h @@ -188,7 +188,7 @@ static bool zone_txn_is_open(RedisModuleCtx *ctx, const arg_dname_t *origin, con } static void commit_event(RedisModuleCtx *ctx, rdb_event_t type, const arg_dname_t *origin, - uint8_t instance, uint32_t serial) + uint8_t instance, uint32_t serial, RedisModuleStreamID *stream_id) { RedisModuleString *keyname = RedisModule_CreateString(ctx, RDB_EVENT_KEY, strlen(RDB_EVENT_KEY)); RedisModuleKey *stream_key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ | REDISMODULE_WRITE); @@ -213,8 +213,11 @@ static void commit_event(RedisModuleCtx *ctx, rdb_event_t type, const arg_dname_ NULL, }; - RedisModuleStreamID ts; - int ret = RedisModule_StreamAdd(stream_key, REDISMODULE_STREAM_ADD_AUTOID, &ts, events, 4); + int flags = 0; + if (stream_id->ms == 0) { + flags = REDISMODULE_STREAM_ADD_AUTOID; + } + int ret = RedisModule_StreamAdd(stream_key, flags, stream_id, events, 4); for (RedisModuleString **event = events; *event != NULL; event++) { RedisModule_FreeString(ctx, *event); @@ -231,11 +234,12 @@ static void commit_event(RedisModuleCtx *ctx, rdb_event_t type, const arg_dname_ return; } - ts.ms -= 1000LLU * rdb_event_age; - ts.seq = 0; + RedisModuleStreamID trim_id = { + .ms = stream_id->ms - 1000LLU * rdb_event_age + }; // NOTE Trimming with REDISMODULE_STREAM_TRIM_APPROX improves preformance - long long removed_cnt = RedisModule_StreamTrimByID(stream_key, REDISMODULE_STREAM_TRIM_APPROX, &ts); + long long removed_cnt = RedisModule_StreamTrimByID(stream_key, REDISMODULE_STREAM_TRIM_APPROX, &trim_id); if (removed_cnt) { RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "stream cleanup %lld old events", removed_cnt); } @@ -1561,7 +1565,8 @@ static exception_t zone_meta_active_exchange(RedisModuleCtx *ctx, zone_meta_k ke raise(e); } -static void zone_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn) +static void zone_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn, + RedisModuleStreamID *stream_id) { zone_meta_k meta_key = zone_meta_get_when_open(ctx, origin, txn, REDISMODULE_READ | REDISMODULE_WRITE); if (meta_key == NULL) { @@ -1628,8 +1633,13 @@ static void zone_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_ RedisModule_FreeString(ctx, value); RedisModule_CloseKey(zones_index); - RedisModule_ReplicateVerbatim(ctx); - commit_event(ctx, RDB_EVENT_ZONE, origin, txn->instance, serial); + commit_event(ctx, RDB_EVENT_ZONE, origin, txn->instance, serial, stream_id); + + // Replicate with explicit ID so replicas use the same ID. + RedisModuleStreamID wire_id = WIRE_STREAM_ID(stream_id); + RedisModule_Replicate(ctx, RDB_CMD_ZONE_COMMIT, "bbb", (char *)origin->data, + origin->len, txn, sizeof(*txn), &wire_id, sizeof(wire_id)); + RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK); } @@ -2031,7 +2041,8 @@ static exception_t upd_trim_history(RedisModuleCtx *ctx, const arg_dname_t *orig return_ok; } -static void upd_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *upd_txn) +static void upd_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *upd_txn, + RedisModuleStreamID *stream_id) { upd_meta_k meta_key = upd_meta_get_when_open(ctx, origin, upd_txn, REDISMODULE_READ | REDISMODULE_WRITE); if (meta_key == NULL) { @@ -2219,11 +2230,15 @@ static void upd_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t upd_meta_unlock(ctx, meta_key, upd_txn->id); RedisModule_CloseKey(meta_key); - commit_event(ctx, RDB_EVENT_UPD, origin, upd_txn->instance, serial_new); + commit_event(ctx, RDB_EVENT_UPD, origin, upd_txn->instance, serial_new, stream_id); + if (e.ret != KNOT_EOK) { RedisModule_ReplyWithError(ctx, e.what); } else { - RedisModule_ReplicateVerbatim(ctx); + // Replicate with explicit ID so replicas use the same ID. + RedisModuleStreamID wire_id = WIRE_STREAM_ID(stream_id); + RedisModule_Replicate(ctx, RDB_CMD_UPD_COMMIT, "bbb", (char *)origin->data, + origin->len, upd_txn, sizeof(*upd_txn), &wire_id, sizeof(wire_id)); RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK); } } diff --git a/src/redis/knot.c b/src/redis/knot.c index f6235bbf98..534f12937a 100644 --- a/src/redis/knot.c +++ b/src/redis/knot.c @@ -295,13 +295,15 @@ static int zone_commit_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int ar rdb_txn_t txn; ARG_TXN_TXT(argv[2], txn); - zone_commit(ctx, &origin, &txn); + RedisModuleStreamID stream_id = { 0 }; + + zone_commit(ctx, &origin, &txn, &stream_id); return REDISMODULE_OK; } static int zone_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc != 3) { + if (argc < 3) { return RedisModule_WrongArity(ctx); } @@ -311,7 +313,12 @@ static int zone_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int ar rdb_txn_t txn; ARG_TXN(argv[2], txn); - zone_commit(ctx, &origin, &txn); + RedisModuleStreamID stream_id = { 0 }; + if (argc > 3) { + ARG_STREAM_ID(argv[3], stream_id); + } + + zone_commit(ctx, &origin, &txn, &stream_id); return REDISMODULE_OK; } @@ -641,13 +648,15 @@ static int upd_commit_txt(RedisModuleCtx *ctx, RedisModuleString **argv, int arg rdb_txn_t txn; ARG_TXN_TXT(argv[2], txn); - upd_commit(ctx, &origin, &txn); + RedisModuleStreamID stream_id = { 0 }; + + upd_commit(ctx, &origin, &txn, &stream_id); return REDISMODULE_OK; } static int upd_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (argc != 3) { + if (argc < 3) { return RedisModule_WrongArity(ctx); } @@ -657,7 +666,12 @@ static int upd_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int arg rdb_txn_t txn; ARG_TXN(argv[2], txn) - upd_commit(ctx, &origin, &txn); + RedisModuleStreamID stream_id = { 0 }; + if (argc > 3) { + ARG_STREAM_ID(argv[3], stream_id); + } + + upd_commit(ctx, &origin, &txn, &stream_id); return REDISMODULE_OK; }