From 06905d7e331ddc4623b690566b52409def4bf3fc Mon Sep 17 00:00:00 2001 From: Arran Cudbard-Bell Date: Thu, 13 Apr 2023 15:55:59 +1000 Subject: [PATCH] redis: Add the ability to add xlat wrappers which allow lua functions to be called on the redis cluster --- raddb/mods-available/redis | 37 ++ src/modules/rlm_redis/rlm_redis.c | 422 +++++++++++++++++++++-- src/tests/modules/redis/functions.attrs | 11 + src/tests/modules/redis/functions.unlang | 32 ++ src/tests/modules/redis/module.conf | 18 + 5 files changed, 487 insertions(+), 33 deletions(-) create mode 100644 src/tests/modules/redis/functions.attrs create mode 100644 src/tests/modules/redis/functions.unlang diff --git a/raddb/mods-available/redis b/raddb/mods-available/redis index fe8d38a939..04eb161330 100644 --- a/raddb/mods-available/redis +++ b/raddb/mods-available/redis @@ -57,6 +57,43 @@ redis { # # password = thisisreallysecretandhardtoguess + # + # lua { ... }:: + # + # Configuration options which control the execution of lua scripts + # on redis nodes. + # + lua { + # + # function { ... }:: + # + # Every function section listed here will be registered as an expansion with a name + # in the format `.`. + # + # For example the function below would be callable as `%(redis.hello_world:...)`. + # + # expansion functions take the same arguments as the redis `EVALSHA` command, + # i.e. ` [ [ ...]] [ [ ...]]`. + # + # `numkeys` specifies how many of the proceeding arguments should be treated as keys. + # + # The redis module will use the first key to determine which cluster node the function + # should called on. + # + # The redis module pre-calcualtes the SHA1 hash of all lua functions on startup. + # When an expansion function is called, it uses the `EVALSHA` command to attempt to + # call lua function on a remote redis node. If `EVALSHA` fails with an error indicating + # no script could be found with the calculated SHA1 hash, the lua function will be + # loaded transparently using `SCRIPT LOAD`. + # + function hello_world { + # + # body:: Lua code to send to redis nodes with SCRIPT LOAD + # + body = 'return "hello world"' + } + } + # # pool { ... }:: # diff --git a/src/modules/rlm_redis/rlm_redis.c b/src/modules/rlm_redis/rlm_redis.c index 043cb86632..f35799d244 100644 --- a/src/modules/rlm_redis/rlm_redis.c +++ b/src/modules/rlm_redis/rlm_redis.c @@ -28,45 +28,135 @@ RCSID("$Id$") +#include +#include + +#include +#include + #include -#include +#include #include -#include +#include +#include + +#include #include -#include -#include +#include +#include +#include +#include +#include +#include + +/** A lua function or stored procedure we make available as an xlat + * + */ +typedef struct { + char const *name; //!< Friendly name for the function. Used to register the equivalent xlat. + char digest[(SHA1_DIGEST_LENGTH * 2) + 1]; //!< pre-computed hash of lua code. + char const *body; //!< the actual lua code. + bool read_only; //!< Function has no side effects +} redis_lua_func_t; + +/** Instance of a redis lua func xlat + * + */ +typedef struct { + redis_lua_func_t *func; //!< Function configuration. +} redis_lua_func_inst_t; + + +typedef struct { + redis_lua_func_t **funcs; //!< Array of functions to register. + +} rlm_redis_lua_t; + +/** rlm_redis module instance + * + */ +typedef struct { + fr_redis_conf_t conf; //!< Connection parameters for the Redis server. + //!< Must be first field in this struct. + + rlm_redis_lua_t lua; //!< Array of functions to register. + + fr_redis_cluster_t *cluster; //!< Redis cluster. +} rlm_redis_t; + +static int lua_func_body_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, CONF_PARSER const *rule); + +static CONF_PARSER module_lua_func[] = { + { FR_CONF_OFFSET("body", FR_TYPE_STRING, redis_lua_func_t, body), .func = lua_func_body_parse }, + CONF_PARSER_TERMINATOR +}; + +static CONF_PARSER module_lua[] = { + { FR_CONF_SUBSECTION_ALLOC("function", FR_TYPE_SUBSECTION | FR_TYPE_MULTI, rlm_redis_lua_t, funcs, module_lua_func), + .subcs_type = "redis_lua_func_t", .ident2 = CF_IDENT_ANY }, + CONF_PARSER_TERMINATOR +}; static CONF_PARSER module_config[] = { + { FR_CONF_OFFSET_SUBSECTION("lua", 0, rlm_redis_t, lua, module_lua) }, REDIS_COMMON_CONFIG, CONF_PARSER_TERMINATOR }; -/** rlm_redis module instance +/** Do basic processing for a lua function body and compute its sha1 hash * */ -typedef struct { - fr_redis_conf_t conf; //!< Connection parameters for the Redis server. - //!< Must be first field in this struct. +static int lua_func_body_parse(TALLOC_CTX *ctx, void *out, void *parent, CONF_ITEM *ci, CONF_PARSER const *rule) +{ + int ret; + redis_lua_func_t *func = talloc_get_type_abort(parent, redis_lua_func_t); + char const *body; + fr_sha1_ctx sha1_ctx; + uint8_t digest[SHA1_DIGEST_LENGTH]; - fr_redis_cluster_t *cluster; //!< Redis cluster. -} rlm_redis_t; + /* + * Get the function name from name2 + * of the enclosing function section. + */ + func->name = cf_section_name2(cf_item_to_section(cf_parent(ci))); + if (unlikely(!func->name)) { + cf_log_err(cf_parent(ci), "functions must be declared as \"function {\""); + return -1; + } + + /* + * Perform normal string parsing first + */ + if ((ret = cf_pair_parse_value(ctx, out, parent, ci, rule)) < 0) return ret; + body = *((char **)out); + + fr_sha1_init(&sha1_ctx); + fr_sha1_update(&sha1_ctx, (uint8_t const *)body, talloc_array_length(body) - 1); + fr_sha1_final(digest, &sha1_ctx); + fr_base16_encode(&FR_SBUFF_OUT(func->digest, sizeof(func->digest)), &FR_DBUFF_TMP(digest, sizeof(digest))); + + if (DEBUG_ENABLED3) cf_log_debug(ci, "sha1 hash of function is %pV", fr_box_strvalue_len(func->digest, sizeof(func->digest) - 1)); + + return 0; +} /** Change the state of a connection to READONLY execute a command and switch to READWRITE * - * @param[out] status_out Where to write the status from the command. - * @param[out] reply_out Where to write the reply associated with the highest priority status. - * @param[in] request The current request. - * @param[in] conn to issue commands with. - * @param[in] argc Redis command argument count. - * @param[in] argv Redis command arguments. + * @param[out] status_out Where to write the status from the command. + * @param[out] reply_out Where to write the reply associated with the highest priority status. + * @param[in] request The current request. + * @param[in] conn to issue commands with. + * @param[in] argc Redis command argument count. + * @param[in] argv Redis command arguments. + * @param[in] arg_len Optional array of redis command argument length. * @return * - 0 success. * - -1 normal failure. * - -2 failure that may leave the connection in a READONLY state. */ static int redis_command_read_only(fr_redis_rcode_t *status_out, redisReply **reply_out, - request_t *request, fr_redis_conn_t *conn, int argc, char const **argv) + request_t *request, fr_redis_conn_t *conn, int argc, char const **argv, size_t arg_len[]) { bool maybe_more = false; redisReply *reply; @@ -75,7 +165,7 @@ static int redis_command_read_only(fr_redis_rcode_t *status_out, redisReply **re *reply_out = NULL; redisAppendCommand(conn->handle, "READONLY"); - redisAppendCommandArgv(conn->handle, argc, argv, NULL); + redisAppendCommandArgv(conn->handle, argc, argv, arg_len); redisAppendCommand(conn->handle, "READWRITE"); /* @@ -85,7 +175,7 @@ static int redis_command_read_only(fr_redis_rcode_t *status_out, redisReply **re if (redisGetReply(conn->handle, (void **)&reply) == REDIS_OK) maybe_more = true; status = fr_redis_command_status(conn, reply); if (status != REDIS_RCODE_SUCCESS) { - REDEBUG("Setting READONLY failed"); + ROPTIONAL(REDEBUG, ERROR, "Setting READONLY failed"); *reply_out = reply; *status_out = status; @@ -126,7 +216,7 @@ static int redis_command_read_only(fr_redis_rcode_t *status_out, redisReply **re */ if ((redisGetReply(conn->handle, (void **)&reply) != REDIS_OK) || (fr_redis_command_status(conn, reply) != REDIS_RCODE_SUCCESS)) { - REDEBUG("Setting READWRITE failed"); + ROPTIONAL(REDEBUG, ERROR, "Setting READWRITE failed"); fr_redis_reply_free(&reply); /* There could be a response we need to free */ fr_redis_reply_free(reply_out); @@ -250,6 +340,187 @@ static xlat_action_t redis_node_xlat(TALLOC_CTX *ctx, fr_dcursor_t *out, return XLAT_ACTION_DONE; } +static xlat_arg_parser_t const redis_lua_func_args[] = { + { .required = true, .variadic = true, .concat = true, .type = FR_TYPE_STRING }, + XLAT_ARG_PARSER_TERMINATOR +}; + +/** Call a lua function on the redis server + * + * Lua functions either get uploaded when the module is instantiated or the first + * time they get executed. + */ +static xlat_action_t redis_lua_func_xlat(TALLOC_CTX *ctx, fr_dcursor_t *out, + xlat_ctx_t const *xctx, + request_t *request, fr_value_box_list_t *in) +{ + rlm_redis_t *inst = talloc_get_type_abort(xctx->mctx->inst->data, rlm_redis_t); + redis_lua_func_inst_t *xlat_inst = talloc_get_type_abort(xctx->inst, redis_lua_func_inst_t); + redis_lua_func_t *func = xlat_inst->func; + + fr_redis_conn_t *conn; + fr_redis_cluster_state_t state; + fr_redis_rcode_t status; + + redisReply *reply = NULL; + int s_ret; + + char const *argv[MAX_REDIS_ARGS]; + size_t arg_len[MAX_REDIS_ARGS]; + int argc; + uint8_t const *key = NULL; + size_t key_len = 0; + + xlat_action_t action = XLAT_ACTION_DONE; + fr_value_box_t *vb_out; + + /* + * Try EVALSHA first, and if that fails fall back to SCRIPT LOAD + */ + argv[0] = "EVALSHA"; + arg_len[0] = sizeof("EVALSHA") - 1; + argv[1] = func->digest; + arg_len[1] = sizeof(func->digest) - 1; + argc = 2; + + fr_value_box_list_foreach(in, vb) { + if (argc == NUM_ELEMENTS(argv)) { + REDEBUG("Too many arguments (%i)", argc); + REXDENT(); + return XLAT_ACTION_FAIL; + } + argv[argc] = vb->vb_strvalue; + arg_len[argc] = vb->vb_length; + argc++; + } + + /* + * For eval commands all keys should hash to the same redis instance + * so we just use the first key (the arg after the key count). + */ + if (argc > 3) { + key = (uint8_t const *)argv[3]; + key_len = arg_len[3]; + } + + for (s_ret = fr_redis_cluster_state_init(&state, &conn, inst->cluster, request, key, key_len, func->read_only); + s_ret == REDIS_RCODE_TRY_AGAIN; /* Continue */ + s_ret = fr_redis_cluster_state_next(&state, &conn, inst->cluster, request, status, &reply)) { + bool script_load_done = false; + + again: + RDEBUG3("Calling script 0x%s", func->digest); + if (argc > 2) { + RDEBUG3("With arguments"); + RINDENT(); + for (int i = 2; i < argc; i++) RDEBUG3("[%i] %s", i, argv[i]); + REXDENT(); + } + if (!func->read_only) { + reply = redisCommandArgv(conn->handle, argc, argv, arg_len); + status = fr_redis_command_status(conn, reply); + } else if (redis_command_read_only(&status, &reply, request, conn, argc, argv, arg_len) == -2) { + state.close_conn = true; + } + + if (status != REDIS_RCODE_NO_SCRIPT) continue; + + REDEBUG3("Loading lua function \"%s\" (0x%s)", func->name, func->digest); + { + char const *script_load_argv[] = { + "SCRIPT", + "LOAD", + func->body + }; + + size_t script_load_arg_len[] = { + (sizeof("SCRIPT") - 1), + (sizeof("LOAD") - 1), + (talloc_array_length(func->body) - 1) + }; + + /* + * Loading the script failed... fail the call. + */ + if (script_load_done) { + script_load_failed: + status = REDIS_RCODE_ERROR; + fr_redis_reply_free(&reply); + continue; + } + + /* + * Fixme: Really the script load and the eval call should be + * handled in a single MULTI/EXEC block, but the complexity + * in handling this properly is great, and most of this + * synchronous code will need to be rewritten, so for now + * we just load the script and try again. + */ + if (!func->read_only) { + reply = redisCommandArgv(conn->handle, NUM_ELEMENTS(script_load_argv), + script_load_argv, script_load_arg_len); + status = fr_redis_command_status(conn, reply); + } else if (redis_command_read_only(&status, &reply, request, conn, NUM_ELEMENTS(script_load_argv), + script_load_argv, script_load_arg_len) == -2) { + state.close_conn = true; + } + + if (status == REDIS_RCODE_SUCCESS) { + script_load_done = true; + + /* + * Verify we got a sane response + */ + if (reply->type != REDIS_REPLY_STRING) { + REDEBUG("Unexpected reply type after loading function"); + fr_redis_reply_print(L_DBG_LVL_OFF, reply, request, 0); + goto script_load_failed; + } + + if (strcmp(reply->str, func->digest) != 0) { + REDEBUG("Function digest %s, does not match calculated digest %s", reply->str, func->digest); + goto script_load_failed; + } + goto again; + } + } + } + + if (s_ret != REDIS_RCODE_SUCCESS) { + action = XLAT_ACTION_FAIL; + goto finish; + } + + if (!fr_cond_assert(reply)) { + action = XLAT_ACTION_FAIL; + goto finish; + } + + MEM(vb_out = fr_value_box_alloc_null(ctx)); + if (fr_redis_reply_to_value_box(ctx, vb_out, reply, FR_TYPE_VOID, NULL, false, false) < 0) { + RPERROR("Failed processing reply"); + return XLAT_ACTION_FAIL; + } + fr_dcursor_append(out, vb_out); + +finish: + fr_redis_reply_free(&reply); + + return action; +} + +/** Copies the function configuration into xlat function instance data + * + */ +static int redis_lua_func_instantiate(xlat_inst_ctx_t const *xctx) +{ + redis_lua_func_inst_t *inst = talloc_get_type_abort(xctx->inst, redis_lua_func_inst_t); + + inst->func = talloc_get_type_abort(xctx->uctx, redis_lua_func_t); + + return 0; +} + static xlat_arg_parser_t const redis_args[] = { { .required = true, .variadic = true, .concat = true, .type = FR_TYPE_STRING }, XLAT_ARG_PARSER_TERMINATOR @@ -344,7 +615,7 @@ static xlat_action_t redis_xlat(TALLOC_CTX *ctx, fr_dcursor_t *out, if (!read_only) { reply = redisCommandArgv(conn->handle, argc, argv, arg_len); status = fr_redis_command_status(conn, reply); - } else if (redis_command_read_only(&status, &reply, request, conn, argc, argv) == -2) { + } else if (redis_command_read_only(&status, &reply, request, conn, argc, argv, arg_len) == -2) { goto close_conn; } @@ -419,7 +690,7 @@ static xlat_action_t redis_xlat(TALLOC_CTX *ctx, fr_dcursor_t *out, if (!read_only) { reply = redisCommandArgv(conn->handle, argc, argv, arg_len); status = fr_redis_command_status(conn, reply); - } else if (redis_command_read_only(&status, &reply, request, conn, argc, argv) == -2) { + } else if (redis_command_read_only(&status, &reply, request, conn, argc, argv, arg_len) == -2) { state.close_conn = true; } } @@ -447,6 +718,89 @@ finish: return action; } +static int mod_instantiate(module_inst_ctx_t const *mctx) +{ + rlm_redis_t *inst = talloc_get_type_abort(mctx->inst->data, rlm_redis_t); + fr_socket_t *nodes; + int ret, i; + + inst->cluster = fr_redis_cluster_alloc(inst, mctx->inst->conf, &inst->conf, true, NULL, NULL, NULL); + if (!inst->cluster) return -1; + + /* + * Best effort - Try and load in scripts on startup + */ + if (talloc_array_length(inst->lua.funcs) == 0) return 0; + + ret = fr_redis_cluster_node_addr_by_role(NULL, &nodes, inst->cluster, true, true); + if (ret <= 0) return 0; + + for (i = 0; i < ret; i++) { + fr_pool_t *pool; + fr_redis_conn_t *conn; + + if (fr_redis_cluster_pool_by_node_addr(&pool, inst->cluster, &nodes[i], true) < 0) { + talloc_free(nodes); + return 0; + } + + conn = fr_pool_connection_get(pool, 0); + if (!conn) continue; + + talloc_foreach(inst->lua.funcs, func) { + char const *script_load_argv[] = { + "SCRIPT", + "LOAD", + func->body + }; + + size_t script_load_arg_len[] = { + (sizeof("SCRIPT") - 1), + (sizeof("LOAD") - 1), + (talloc_array_length(func->body) - 1) + }; + + fr_redis_rcode_t status; + redisReply *reply; + + /* + * preload onto every node, even replicas. + */ + if (redis_command_read_only(&status, &reply, NULL, conn, + NUM_ELEMENTS(script_load_argv), script_load_argv, script_load_arg_len) == -2) { + error: + fr_pool_connection_release(pool, NULL, conn); + talloc_free(nodes); + return -1; + } + + fr_redis_reply_free(&reply); + + /* + * Only error on explicit errors, not on connectivity issues + */ + switch (status) { + case REDIS_RCODE_ERROR: + PERROR("Loading lua function \"%s\" onto node failed", func->name); + goto error; + + case REDIS_RCODE_SUCCESS: + DEBUG2("Loaded lua function \"%s\" onto node", func->name); + break; + + default: + PWARN("Loading lua function \"%s\" onto node failed", func->name); + continue; + } + } + + fr_pool_connection_release(pool, NULL, conn); + } + talloc_free(nodes); + + return 0; +} + static int mod_bootstrap(module_inst_ctx_t const *mctx) { rlm_redis_t *inst = talloc_get_type_abort(mctx->inst->data, rlm_redis_t); @@ -460,24 +814,26 @@ static int mod_bootstrap(module_inst_ctx_t const *mctx) * %(redis_node:[ idx]) */ name = talloc_asprintf(NULL, "%s_node", mctx->inst->name); - xlat = xlat_func_register_module(inst, mctx, name, redis_node_xlat, FR_TYPE_STRING); + if (unlikely((xlat = xlat_func_register_module(inst, mctx, name, redis_node_xlat, FR_TYPE_STRING)) == NULL)) return -1; xlat_func_args_set(xlat, redis_node_xlat_args); talloc_free(name); name = talloc_asprintf(NULL, "%s_remap", mctx->inst->name); - xlat = xlat_func_register_module(inst, mctx, name, redis_remap_xlat, FR_TYPE_STRING); + if (unlikely((xlat = xlat_func_register_module(inst, mctx, name, redis_remap_xlat, FR_TYPE_STRING)) == NULL)) return -1; xlat_func_args_set(xlat, redis_remap_xlat_args); talloc_free(name); - return 0; -} - -static int mod_instantiate(module_inst_ctx_t const *mctx) -{ - rlm_redis_t *inst = talloc_get_type_abort(mctx->inst->data, rlm_redis_t); - - inst->cluster = fr_redis_cluster_alloc(inst, mctx->inst->conf, &inst->conf, true, NULL, NULL, NULL); - if (!inst->cluster) return -1; + /* + * Loop over the lua functions, registering an xlat + * that'll call that function specifically. + */ + talloc_foreach(inst->lua.funcs, func) { + name = talloc_asprintf(NULL, "%s.%s", mctx->inst->name, func->name); + if (unlikely((xlat = xlat_func_register_module(inst, mctx, name, redis_lua_func_xlat, FR_TYPE_VOID)) == NULL)) return -1; + xlat_func_args_set(xlat, redis_lua_func_args); + xlat_func_async_instantiate_set(xlat, redis_lua_func_instantiate, redis_lua_func_inst_t, NULL, func); + talloc_free(name); + } return 0; } diff --git a/src/tests/modules/redis/functions.attrs b/src/tests/modules/redis/functions.attrs new file mode 100644 index 0000000000..2376770b0b --- /dev/null +++ b/src/tests/modules/redis/functions.attrs @@ -0,0 +1,11 @@ +# +# Input packet +# +Packet-Type = Access-Request +User-Name = 'john' +User-Password = 'testing123' + +# +# Expected answer +# +Packet-Type == Access-Accept diff --git a/src/tests/modules/redis/functions.unlang b/src/tests/modules/redis/functions.unlang new file mode 100644 index 0000000000..7190dd5752 --- /dev/null +++ b/src/tests/modules/redis/functions.unlang @@ -0,0 +1,32 @@ +# +# Run the "redis" xlat +# +$INCLUDE cluster_reset.inc + +if (!("%(redis.hello_world:0)" == 'hello world')) { + test_fail +} + +# ...and again, now hopefully using the cached function +if (!(%(redis.hello_world:0) == 'hello world')) { + test_fail +} + +if (!(%(redis.concat_args_keys:1 foo bar baz) == 'foo,bar,baz')) { + test_fail +} + +if (!(%(redis.multiline:0 0) == 0)) { + test_fail +} + +if (!(%(redis.multiline:0 1) == 1)) { + test_fail +} + +# Bad call +if (%(redis.multiline:10) != '') { + test_fail +} + +test_pass diff --git a/src/tests/modules/redis/module.conf b/src/tests/modules/redis/module.conf index 8d5173c8b6..fe644b5ded 100644 --- a/src/tests/modules/redis/module.conf +++ b/src/tests/modules/redis/module.conf @@ -21,6 +21,24 @@ redis { # We recommend using a strong password. # password = thisisreallysecretandhardtoguess + lua { + function hello_world { + body = 'return "hello world"' + } + + function concat_args_keys { + body = "return table.concat(KEYS, ',') .. ',' .. table.concat(ARGV, ',')" + } + + function multiline { + body = "if ARGV[1] == '0' then\ + return 0\ + else\ + return 1\ + end" + } + } + # # Information for the connection pool. The configuration items # below are the same for all modules which use the new -- 2.47.2