struct rspamd_lua_upstream {
struct upstream *up;
int upref;
+ /*
+ * Inflight bookkeeping for the C-side P2C load comparator. acquired is
+ * set when this wrapper holds the inflight reference produced by a
+ * get_* call (round-robin / hashed / master-slave). retired is set the
+ * first time :ok or :fail is called. If acquired && !retired at __gc
+ * time, the destructor calls rspamd_upstream_release so abandoned
+ * selections don't permanently skew P2C scoring. Wrappers handed out
+ * by all_upstreams() or watch callbacks set acquired = FALSE and the
+ * destructor leaves inflight alone.
+ */
+ gboolean acquired;
+ gboolean retired;
};
/* Common utility functions */
}
rspamd_upstream_fail(up->up, fail_addr, reason);
+ up->retired = TRUE;
}
return 0;
if (up) {
rspamd_upstream_ok(up->up);
+ up->retired = TRUE;
}
return 0;
struct rspamd_lua_upstream *up = lua_check_upstream(L, 1);
if (up) {
+ /*
+ * Lua callers can forget to pair :get_upstream_*() with :ok()/:fail().
+ * Retire the inflight reference here so abandoned selections don't
+ * permanently skew P2C scoring. Wrappers from all_upstreams() and
+ * watch callbacks set acquired = FALSE and are skipped.
+ */
+ if (up->acquired && !up->retired) {
+ rspamd_upstream_release(up->up);
+ }
/* Remove reference to the parent */
luaL_unref(L, LUA_REGISTRYINDEX, up->upref);
/* Upstream belongs to the upstream list, so no free here */
}
static struct rspamd_lua_upstream *
-lua_push_upstream(lua_State *L, int up_idx, struct upstream *up)
+lua_push_upstream(lua_State *L, int up_idx, struct upstream *up,
+ gboolean acquired)
{
struct rspamd_lua_upstream *lua_ups;
lua_ups = lua_newuserdata(L, sizeof(*lua_ups));
lua_ups->up = up;
+ lua_ups->acquired = acquired;
+ lua_ups->retired = FALSE;
rspamd_lua_setclass(L, rspamd_upstream_classname, -1);
/* Store parent in the upstream to prevent gc */
lua_pushvalue(L, up_idx);
(unsigned int) keyl);
if (selected) {
- lua_push_upstream(L, 1, selected);
+ lua_push_upstream(L, 1, selected, TRUE);
}
else {
lua_pushnil(L);
selected = rspamd_upstream_get(upl, RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
if (selected) {
- lua_push_upstream(L, 1, selected);
+ lua_push_upstream(L, 1, selected, TRUE);
}
else {
lua_pushnil(L);
NULL,
0);
if (selected) {
- lua_push_upstream(L, 1, selected);
+ lua_push_upstream(L, 1, selected, TRUE);
}
else {
lua_pushnil(L);
{
struct upstream_foreach_cbdata *cbd = (struct upstream_foreach_cbdata *) ud;
- lua_push_upstream(cbd->L, cbd->ups_pos, up);
+ /* all_upstreams() is a pure view; no inflight to retire on __gc. */
+ lua_push_upstream(cbd->L, cbd->ups_pos, up, FALSE);
lua_rawseti(cbd->L, -2, idx + 1);
}
/***
struct rspamd_lua_upstream *lua_ups = lua_newuserdata(L, sizeof(*lua_ups));
lua_ups->up = up;
+ /* Watcher event: no inflight reference, leave it that way on __gc. */
+ lua_ups->acquired = FALSE;
+ lua_ups->retired = FALSE;
rspamd_lua_setclass(L, rspamd_upstream_classname, -1);
/* Store parent in the upstream to prevent gc */
lua_rawgeti(L, LUA_REGISTRYINDEX, cdata->parent_cbref);
--- /dev/null
+-- Upstream list / upstream object tests
+
+context("Upstream lua API", function()
+ local upstream_list = require "rspamd_upstream_list"
+
+ test("create from comma-separated string", function()
+ local ups = upstream_list.create('127.0.0.1,127.0.0.2,127.0.0.3', 11333)
+ assert_not_nil(ups)
+ local all = ups:all_upstreams()
+ assert_equal(#all, 3)
+ end)
+
+ test("get_upstream_round_robin returns a usable upstream", function()
+ local ups = upstream_list.create('127.0.0.1,127.0.0.2', 11333)
+ assert_not_nil(ups)
+ local up = ups:get_upstream_round_robin()
+ assert_not_nil(up)
+ assert_not_nil(up:get_name())
+ assert_not_nil(up:get_port())
+ up:ok()
+ end)
+
+ test("get_upstream_by_hash with the same key is stable", function()
+ local ups = upstream_list.create('127.0.0.1,127.0.0.2,127.0.0.3', 11333)
+ local first = ups:get_upstream_by_hash('hello')
+ local second = ups:get_upstream_by_hash('hello')
+ assert_not_nil(first)
+ assert_not_nil(second)
+ assert_equal(first:get_name(), second:get_name())
+ first:ok()
+ second:ok()
+ end)
+
+ test("dropping a wrapper without ok/fail does not crash", function()
+ -- Smoke test for the __gc retire-on-drop fallback. Repeatedly acquire
+ -- and immediately abandon wrappers; the destructor must release the
+ -- inflight reference without blowing up. We then verify we can still
+ -- get usable wrappers from the same list.
+ local ups = upstream_list.create('127.0.0.1,127.0.0.2,127.0.0.3', 11333)
+ for _ = 1, 50 do
+ local up = ups:get_upstream_round_robin()
+ assert_not_nil(up)
+ up = nil
+ end
+ collectgarbage('collect')
+
+ local survivor = ups:get_upstream_round_robin()
+ assert_not_nil(survivor)
+ survivor:ok()
+ end)
+
+ test("all_upstreams() wrappers are safe to drop", function()
+ -- all_upstreams() is a view, not an acquisition: dropping the table
+ -- must not retire any inflight reference (there is none to retire).
+ local ups = upstream_list.create('127.0.0.1,127.0.0.2', 11333)
+ for _ = 1, 20 do
+ local all = ups:all_upstreams()
+ assert_equal(#all, 2)
+ all = nil
+ end
+ collectgarbage('collect')
+
+ -- Subsequent operations still work.
+ local up = ups:get_upstream_round_robin()
+ assert_not_nil(up)
+ up:ok()
+ end)
+
+ test("calling :ok then :fail on the same wrapper is safe", function()
+ -- The retired flag prevents the __gc from also retiring; explicit
+ -- pairs of ok/fail still drive the underlying upstream, but the
+ -- per-wrapper inflight is decremented only once.
+ local ups = upstream_list.create('127.0.0.1', 11333)
+ local up = ups:get_upstream_round_robin()
+ assert_not_nil(up)
+ up:ok()
+ up:fail('test')
+ up:ok()
+ up = nil
+ collectgarbage('collect')
+ end)
+end)