#define ARG_NUM(arg, out, name) { \
long long val; \
- long long max = (1ULL << (sizeof(out) * 8)) - 1; \
+ uint8_t shift = sizeof(out) < 8 ? sizeof(out) * 8 : 63; \
+ long long max = (1ULL << shift) - 1; \
if (RedisModule_StringToLongLong(arg, &val) != REDISMODULE_OK || val > max) { \
return RedisModule_ReplyWithError(ctx, RDB_E("invalid " name)); \
} \
} \
}
+#define ARG_STREAM_ID(arg, out) { \
+ size_t len; \
+ const uint8_t *ptr = (const uint8_t *)RedisModule_StringPtrLen(arg, &len); \
+ wire_ctx_t w = wire_ctx_init_const(ptr, len); \
+ out.ms = wire_ctx_read_u64(&w); \
+ out.seq = wire_ctx_read_u64(&w); \
+ if (w.error != KNOT_EOK || wire_ctx_available(&w) != 0) { \
+ return RedisModule_ReplyWithError(ctx, RDB_E("invalid stream ID")); \
+ } \
+}
+
#define ARG_DNAME(arg, out, name) { \
size_t len; \
const uint8_t *ptr = (const uint8_t *)RedisModule_StringPtrLen(arg, &len); \
} \
}
+#define WIRE_STREAM_ID(id) \
+ (RedisModuleStreamID){ .ms = htobe64(id->ms), .seq = htobe64(id->seq) }; \
+ RedisModule_Assert(sizeof(RedisModuleStreamID) == 16);
+
static knot_dname_t *dname_from_str(const char *ptr, size_t len, uint8_t *out, arg_dname_t *origin)
{
- assert(ptr != NULL && out != NULL);
+ RedisModule_Assert(ptr != NULL && out != NULL);
if (knot_dname_from_str(out, ptr, KNOT_DNAME_MAXLEN) == NULL) {
return NULL;
}
static void commit_event(RedisModuleCtx *ctx, rdb_event_t type, const arg_dname_t *origin,
- uint8_t instance, uint32_t serial)
+ uint8_t instance, uint32_t serial, RedisModuleStreamID *stream_id)
{
RedisModuleString *keyname = RedisModule_CreateString(ctx, RDB_EVENT_KEY, strlen(RDB_EVENT_KEY));
RedisModuleKey *stream_key = RedisModule_OpenKey(ctx, keyname, REDISMODULE_READ | REDISMODULE_WRITE);
NULL,
};
- RedisModuleStreamID ts;
- int ret = RedisModule_StreamAdd(stream_key, REDISMODULE_STREAM_ADD_AUTOID, &ts, events, 4);
+ int flags = 0;
+ if (stream_id->ms == 0) {
+ flags = REDISMODULE_STREAM_ADD_AUTOID;
+ }
+ int ret = RedisModule_StreamAdd(stream_key, flags, stream_id, events, 4);
for (RedisModuleString **event = events; *event != NULL; event++) {
RedisModule_FreeString(ctx, *event);
return;
}
- ts.ms -= 1000LLU * rdb_event_age;
- ts.seq = 0;
+ RedisModuleStreamID trim_id = {
+ .ms = stream_id->ms - 1000LLU * rdb_event_age
+ };
// NOTE Trimming with REDISMODULE_STREAM_TRIM_APPROX improves preformance
- long long removed_cnt = RedisModule_StreamTrimByID(stream_key, REDISMODULE_STREAM_TRIM_APPROX, &ts);
+ long long removed_cnt = RedisModule_StreamTrimByID(stream_key, REDISMODULE_STREAM_TRIM_APPROX, &trim_id);
if (removed_cnt) {
RedisModule_Log(ctx, REDISMODULE_LOGLEVEL_NOTICE, "stream cleanup %lld old events", removed_cnt);
}
raise(e);
}
-static void zone_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn)
+static void zone_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *txn,
+ RedisModuleStreamID *stream_id)
{
zone_meta_k meta_key = zone_meta_get_when_open(ctx, origin, txn, REDISMODULE_READ | REDISMODULE_WRITE);
if (meta_key == NULL) {
RedisModule_FreeString(ctx, value);
RedisModule_CloseKey(zones_index);
- RedisModule_ReplicateVerbatim(ctx);
- commit_event(ctx, RDB_EVENT_ZONE, origin, txn->instance, serial);
+ commit_event(ctx, RDB_EVENT_ZONE, origin, txn->instance, serial, stream_id);
+
+ // Replicate with explicit ID so replicas use the same ID.
+ RedisModuleStreamID wire_id = WIRE_STREAM_ID(stream_id);
+ RedisModule_Replicate(ctx, RDB_CMD_ZONE_COMMIT, "bbb", (char *)origin->data,
+ origin->len, txn, sizeof(*txn), &wire_id, sizeof(wire_id));
+
RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK);
}
return_ok;
}
-static void upd_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *upd_txn)
+static void upd_commit(RedisModuleCtx *ctx, const arg_dname_t *origin, rdb_txn_t *upd_txn,
+ RedisModuleStreamID *stream_id)
{
upd_meta_k meta_key = upd_meta_get_when_open(ctx, origin, upd_txn, REDISMODULE_READ | REDISMODULE_WRITE);
if (meta_key == NULL) {
upd_meta_unlock(ctx, meta_key, upd_txn->id);
RedisModule_CloseKey(meta_key);
- commit_event(ctx, RDB_EVENT_UPD, origin, upd_txn->instance, serial_new);
+ commit_event(ctx, RDB_EVENT_UPD, origin, upd_txn->instance, serial_new, stream_id);
+
if (e.ret != KNOT_EOK) {
RedisModule_ReplyWithError(ctx, e.what);
} else {
- RedisModule_ReplicateVerbatim(ctx);
+ // Replicate with explicit ID so replicas use the same ID.
+ RedisModuleStreamID wire_id = WIRE_STREAM_ID(stream_id);
+ RedisModule_Replicate(ctx, RDB_CMD_UPD_COMMIT, "bbb", (char *)origin->data,
+ origin->len, upd_txn, sizeof(*upd_txn), &wire_id, sizeof(wire_id));
RedisModule_ReplyWithSimpleString(ctx, RDB_RETURN_OK);
}
}
rdb_txn_t txn;
ARG_TXN_TXT(argv[2], txn);
- zone_commit(ctx, &origin, &txn);
+ RedisModuleStreamID stream_id = { 0 };
+
+ zone_commit(ctx, &origin, &txn, &stream_id);
return REDISMODULE_OK;
}
static int zone_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
- if (argc != 3) {
+ if (argc < 3) {
return RedisModule_WrongArity(ctx);
}
rdb_txn_t txn;
ARG_TXN(argv[2], txn);
- zone_commit(ctx, &origin, &txn);
+ RedisModuleStreamID stream_id = { 0 };
+ if (argc > 3) {
+ ARG_STREAM_ID(argv[3], stream_id);
+ }
+
+ zone_commit(ctx, &origin, &txn, &stream_id);
return REDISMODULE_OK;
}
rdb_txn_t txn;
ARG_TXN_TXT(argv[2], txn);
- upd_commit(ctx, &origin, &txn);
+ RedisModuleStreamID stream_id = { 0 };
+
+ upd_commit(ctx, &origin, &txn, &stream_id);
return REDISMODULE_OK;
}
static int upd_commit_bin(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
- if (argc != 3) {
+ if (argc < 3) {
return RedisModule_WrongArity(ctx);
}
rdb_txn_t txn;
ARG_TXN(argv[2], txn)
- upd_commit(ctx, &origin, &txn);
+ RedisModuleStreamID stream_id = { 0 };
+ if (argc > 3) {
+ ARG_STREAM_ID(argv[3], stream_id);
+ }
+
+ upd_commit(ctx, &origin, &txn, &stream_id);
return REDISMODULE_OK;
}