#include "contrib/openbsd/strlcpy.h"
#include "knot/common/hiredis.h"
+#define UNREAD_MAX 20 // Redis write batch length.
+
struct redisContext *zone_redis_connect(conf_t *conf)
{
return rdb_connect(conf);
txn->origin_len = knot_dname_size(zone_name);
txn->incremental = incremental;
txn->removals = false;
+ txn->unread = 0;
txn->err[0] = '\0';
const char *cmd = txn->incremental ? RDB_CMD_UPD_BEGIN " %b %b" :
return KNOT_EOK;
}
+static int finish_unread(zone_redis_txn_t *txn, size_t max)
+{
+ if (txn->unread < max) {
+ return KNOT_EOK;
+ }
+
+ int ret = KNOT_EOK;
+
+ redisReply *reply;
+ for (size_t i = 0; i < max; i++) {
+ if (redisGetReply(txn->rdb, (void **)&reply) == REDIS_OK) {
+ if (ret == KNOT_EOK) {
+ ret = check_reply(txn->rdb, reply, REDIS_REPLY_STATUS, txn->err);
+ }
+ freeReplyObject(reply);
+ } else {
+ if (ret == KNOT_EOK) {
+ ret = check_reply(txn->rdb, NULL, REDIS_REPLY_ERROR, txn->err);
+ }
+ }
+ }
+
+ txn->unread -= max;
+
+ return ret;
+}
+
int zone_redis_write_rrset(zone_redis_txn_t *txn, const knot_rrset_t *rr)
{
if (txn == NULL || rr == NULL || (txn->removals && !txn->incremental)) {
txn->removals ? RDB_CMD_UPD_REMOVE " %b %b %b %d %d %d %b" :
RDB_CMD_UPD_ADD " %b %b %b %d %d %d %b";
- redisReply *reply = redisCommand(txn->rdb, cmd, txn->origin, txn->origin_len,
- &txn->rdb_txn, sizeof(txn->rdb_txn),
- rr->owner, knot_dname_size(rr->owner), rr->type, rr->ttl,
- rr->rrs.count, rr->rrs.rdata, rr->rrs.size);
- int ret = check_reply(txn->rdb, reply, REDIS_REPLY_STATUS, txn->err);
- if (ret != KNOT_EOK) {
- freeReplyObject(reply);
- return ret;
+ int ret = redisAppendCommand(txn->rdb, cmd, txn->origin, txn->origin_len,
+ &txn->rdb_txn, sizeof(txn->rdb_txn),
+ rr->owner, knot_dname_size(rr->owner), rr->type, rr->ttl,
+ rr->rrs.count, rr->rrs.rdata, (size_t)rr->rrs.size);
+ if (ret != REDIS_OK) {
+ return check_reply(txn->rdb, NULL, REDIS_REPLY_ERROR, txn->err);
}
- freeReplyObject(reply);
+ txn->unread++;
- return KNOT_EOK;
+ return finish_unread(txn, UNREAD_MAX);
}
int zone_redis_write_node(zone_redis_txn_t *txn, const zone_node_t *node)
return KNOT_EINVAL;
}
+ int ret = finish_unread(txn, txn->unread);
+ if (ret != KNOT_EOK) {
+ return ret;
+ }
+
const char *cmd = txn->incremental ? RDB_CMD_UPD_COMMIT " %b %b" :
RDB_CMD_ZONE_COMMIT " %b %b";
redisReply *reply = redisCommand(txn->rdb, cmd, txn->origin, txn->origin_len,
&txn->rdb_txn, sizeof(txn->rdb_txn));
- int ret = check_reply(txn->rdb, reply, REDIS_REPLY_STATUS, txn->err);
+ ret = check_reply(txn->rdb, reply, REDIS_REPLY_STATUS, txn->err);
if (ret != KNOT_EOK) {
freeReplyObject(reply);
return ret;
return KNOT_EINVAL;
}
+ (void)finish_unread(txn, txn->unread);
+
const char *cmd = txn->incremental ? RDB_CMD_UPD_ABORT " %b %b" :
RDB_CMD_ZONE_ABORT " %b %b";