+++ /dev/null
-DESTDIR =
-PREFIX = /usr/local
-BINDIR = $(PREFIX)/bin
-
-CC = gcc
-LD = $(CC)
-
-CFLAGS = -g -O2 -Wall -Werror -pthread
-LDFLAGS = -lpthread
-
-OBJS = spoa.o
-
-ifneq ($(USE_LUA),)
-OBJS += ps_lua.o
-ifneq ($(LUA_INC),)
-CFLAGS += -I$(LUA_INC)
-endif
-ifneq ($(LUA_LIB),)
-LDLIBS += -L$(LUA_LIB)
-endif
-LDLIBS += -ldl -Wl,--export-dynamic -llua -lm -Wl,--no-export-dynamic
-endif
-
-ifneq ($(USE_PYTHON),)
-OBJS += ps_python.o
-
-# "--embed" flag is supported (and required) only from python 3.8+
-check_python_config := $(shell if python3-config --embed > /dev/null 2>&1 ; then echo "python3.8+"; \
-elif hash python3-config > /dev/null 2>&1 ; then echo "python3"; \
-elif hash python-config > /dev/null 2>&1 ; then echo "python2"; fi)
-
-ifeq ($(check_python_config), python3.8+)
-PYTHON_DEFAULT_INC := $(shell python3-config --includes)
-PYTHON_DEFAULT_LIB := $(shell python3-config --libs --embed)
-else ifeq ($(check_python_config), python3)
-PYTHON_DEFAULT_INC := $(shell python3-config --includes)
-PYTHON_DEFAULT_LIB := $(shell python3-config --libs)
-else ifeq ($(check_python_config), python2)
-PYTHON_DEFAULT_INC := $(shell python-config --includes)
-PYTHON_DEFAULT_LIB := $(shell python-config --libs)
-endif
-
-
-# Add default path
-ifneq ($(PYTHON_DEFAULT_INC),)
-CFLAGS += $(PYTHON_DEFAULT_INC)
-else
-CFLAGS += -I/usr/include/python2.7
-endif
-ifneq ($(PYTHON_DEFAULT_LIB),)
-LDLIBS += $(PYTHON_DEFAULT_LIB)
-else
-LDLIBS += -lpython2.7
-endif
-
-# Add user additional paths if any
-ifneq ($(PYTHON_INC),)
-CFLAGS += -I$(PYTHON_INC)
-endif
-ifneq ($(PYTHON_LIB),)
-LDLIBS += -L$(PYTHON_LIB)
-endif
-
-LDLIBS +=-Wl,--export-dynamic
-endif
-
-spoa: $(OBJS)
- $(LD) $(LDFLAGS) -o $@ $^ $(LDLIBS)
-
-install: spoa
- install spoa $(DESTDIR)$(BINDIR)
-
-clean:
- rm -f spoa $(OBJS)
-
-%.o: %.c
- $(CC) $(CFLAGS) -c -o $@ $<
+++ /dev/null
-Multi script langyage Stream Processing Offload Agent
------------------------------------------------------
-
-This agent receive SPOP message and process it with script languages. The
-language register callback with a message. Each callback receive the list
-of arguments with types according with the language capabilities. The
-callback write variables which are sent as response when the processing
-is done.
-
-
- Prerequirement
-----------------
-
-You have to install the development packages, either from the
-distribution repositories or from the source.
-
-CentOS/RHEL: sudo yum install python3-devel
-
-The current minimal python version compatible with this library is 2.7.
-It's recommended to use python version 3 where possible due to python 2 deprecation.
-
-
- Compilation
----------------
-
-The server currently supports Lua and Python. Type "make" with the options:
-USE_LUA=1 and/or USE_PYTHON=1.
-
-You can add LUA_INC=.. LUA_LIB=.. to the make command to set the paths to
-the lua header files and lua libraries.
-
-Similarly, you can add PYTHON_INC=.. PYTHON_LIB=.. to the make command to set the paths to
-the python header files and python libraries.
-By default, it will try to compile by detecting the default python 3 parameters.
-It will fall back to python 2 if python 3 is not available.
-
- Start the service
----------------------
-
-After you have compiled it, to start the service, you just need to use "spoa"
-binary:
-
- $> ./spoa -h
- Usage: ./spoa [-h] [-d] [-p <port>] [-n <num-workers>]
- -h Print this message
- -d Enable the debug mode
- -p <port> Specify the port to listen on (default: 12345)
- -n <num-workers> Specify the number of workers (default: 5)
- -f <file> Load script according with the supported languages
-
-The file processor is recognized using the extension. .lua or .luac for lua and
-.py for python. Start example:
-
- $> ./spoa -d -f ps_lua.lua
-
- $> ./spoa -d -f ps_python.py
-
-
- Configure
--------------
-
-Sample configuration are join to this server:
-
- spoa-server.conf : The HAProxy configuration file using SPOE server
- spoa-server.spoe.conf : The SPOP description file used by HAProxy
- ps_lua.lua : Processing Lua example
- ps_python.py : Processing Python example
-
-
- Considerations
-------------------
-
-This server is a beta version. It works fine, but some improvement will be
-welcome:
-
-Main process:
-
- * Improve log management: Today the log are sent on stdout.
- * Improve process management: The dead process are ignored.
- * Implement systemd integration.
- * Implement threads: It would be fine to implement thread working. Shared
- memory is welcome for managing database connection pool and something like
- that.
- * Add PHP support and some other languages.
-
-Python:
-
- * Improve reporting: Catch python error message and report it in the right
- place. Today the error are dumped on stdout. How using syslog for logging
- stack traces ?
-
-Maybe some other things...
+++ /dev/null
-function color(index, str)
- return "\x1b[" .. index .. "m" .. str .. "\x1b[00m"
-end
-
-function nocolor(index, str)
- return str
-end
-
-function sp(count)
- local spaces = ""
- while count > 0 do
- spaces = spaces .. " "
- count = count - 1
- end
- return spaces
-end
-
-function print_rr(p, indent, c, wr)
- local i = 0
- local nl = ""
-
- if type(p) == "table" then
- wr(c("33", "(table)") .. " " .. c("34", tostring(p)) .. " [")
-
- mt = getmetatable(p)
- if mt ~= nil then
- wr("\n" .. sp(indent+1) .. c("31", "METATABLE") .. ": ")
- print_rr(mt, indent+1, c, wr)
- end
-
- for k,v in pairs(p) do
- if i > 0 then
- nl = "\n"
- else
- wr("\n")
- end
- wr(nl .. sp(indent+1))
- if type(k) == "number" then
- wr(c("32", tostring(k)))
- else
- wr("\"" .. c("32", tostring(k)) .. "\"")
- end
- wr(": ")
- print_rr(v, indent+1, c, wr)
- i = i + 1
- end
- if i == 0 then
- wr(" " .. c("35", "/* empty */") .. " ]")
- else
- wr("\n" .. sp(indent) .. "]")
- end
- elseif type(p) == "string" then
- wr(c("33", "(string)") .. " \"" .. c("34", p) .. "\"")
- else
- wr(c("33", "(" .. type(p) .. ")") .. " " .. c("34", tostring(p)))
- end
-end
-
-function print_r(p, col, wr)
- if col == nil then col = true end
- if wr == nil then wr = function(msg) io.stdout:write(msg) end end
- if col == true then
- print_rr(p, 0, color, wr)
- else
- print_rr(p, 0, nocolor, wr)
- end
- wr("\n")
-end
+++ /dev/null
-/* spoa-server: processing Lua
- *
- * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version
- * 2 of the License, or (at your option) any later version.
- *
- */
-
-#include <arpa/inet.h>
-
-#include <errno.h>
-#include <string.h>
-
-#include <lauxlib.h>
-#include <lua.h>
-#include <lualib.h>
-
-#include "spoa.h"
-
-static lua_State *L = NULL;
-static struct worker *worker;
-
-static int ps_lua_start_worker(struct worker *w);
-static int ps_lua_load_file(struct worker *w, const char *file);
-static int ps_lua_exec_message(struct worker *w, void *ref, int nargs, struct spoe_kv *args);
-
-static struct ps ps_lua_bindings_1 = {
- .init_worker = ps_lua_start_worker,
- .load_file = ps_lua_load_file,
- .exec_message = ps_lua_exec_message,
- .ext = ".lua",
-};
-
-static struct ps ps_lua_bindings_2 = {
- .init_worker = ps_lua_start_worker,
- .load_file = ps_lua_load_file,
- .exec_message = ps_lua_exec_message,
- .ext = ".luac",
-};
-
-/* Imported from Lua-5.3.4 */
-static int typeerror (lua_State *L, int arg, const char *tname)
-{
- const char *msg;
- const char *typearg; /* name for the type of the actual argument */
- if (luaL_getmetafield(L, arg, "__name") == LUA_TSTRING)
- typearg = lua_tostring(L, -1); /* use the given type name */
- else if (lua_type(L, arg) == LUA_TLIGHTUSERDATA)
- typearg = "light userdata"; /* special name for messages */
- else
- typearg = luaL_typename(L, arg); /* standard name */
- msg = lua_pushfstring(L, "%s expected, got %s", tname, typearg);
- return luaL_argerror(L, arg, msg);
-}
-
-/* Imported from Lua-5.3.4 */
-static void tag_error (lua_State *L, int arg, int tag) {
- typeerror(L, arg, lua_typename(L, tag));
-}
-
-#ifndef luaL_checkboolean
-static int luaL_checkboolean(lua_State *L, int index)
-{
- if (!lua_isboolean(L, index)) {
- tag_error(L, index, LUA_TBOOLEAN);
- }
- return lua_toboolean(L, index);
-}
-#endif
-
-static int ps_lua_register_message(lua_State *L)
-{
- const char *name;
- long ref;
-
- /* First argument is a message name */
- name = luaL_checkstring(L, 1);
-
- /* Second argument is a function */
- if (!lua_isfunction(L, 2)) {
- const char *msg = lua_pushfstring(L, "function expected, got %s", luaL_typename(L, 2));
- luaL_argerror(L, 2, msg);
- }
- lua_pushvalue(L, 2);
- ref = luaL_ref(L, LUA_REGISTRYINDEX);
-
- /* Register the message processor */
- ps_register_message(&ps_lua_bindings_1, name, (void *)ref);
-
- return 1;
-}
-
-static int ps_lua_set_var_null(lua_State *L)
-{
- const char *name;
- size_t name_len;
- unsigned char scope;
-
- name = luaL_checklstring(L, 1, &name_len);
- scope = (unsigned char)luaL_checkinteger(L, 2);
-
- if (!set_var_null(worker, name, name_len, scope)) {
- luaL_error(L, "No space left available");
- }
- return 0;
-}
-
-static int ps_lua_set_var_boolean(lua_State *L)
-{
- const char *name;
- size_t name_len;
- unsigned char scope;
- int64_t value;
-
- name = luaL_checklstring(L, 1, &name_len);
- scope = (unsigned char)luaL_checkinteger(L, 2);
- value = luaL_checkboolean(L, 3);
-
- if (!set_var_bool(worker, name, name_len, scope, value))
- luaL_error(L, "No space left available");
- return 0;
-}
-
-static int ps_lua_set_var_uint32(lua_State *L)
-{
- const char *name;
- size_t name_len;
- unsigned char scope;
- int64_t value;
-
- name = luaL_checklstring(L, 1, &name_len);
- scope = (unsigned char)luaL_checkinteger(L, 2);
- value = luaL_checkinteger(L, 3);
-
- if (value < 0 || value > UINT_MAX)
- luaL_error(L, "Integer '%lld' out of range for 'uint32' type", value);
-
- if (!set_var_uint32(worker, name, name_len, scope, value))
- luaL_error(L, "No space left available");
- return 0;
-}
-
-static int ps_lua_set_var_int32(lua_State *L)
-{
- const char *name;
- size_t name_len;
- unsigned char scope;
- int64_t value;
-
- name = luaL_checklstring(L, 1, &name_len);
- scope = (unsigned char)luaL_checkinteger(L, 2);
- value = luaL_checkinteger(L, 3);
-
- if (value < INT_MIN || value > INT_MAX)
- luaL_error(L, "Integer '%lld' out of range for 'int32' type", value);
-
- if (!set_var_int32(worker, name, name_len, scope, value))
- luaL_error(L, "No space left available");
- return 0;
-}
-
-static int ps_lua_set_var_uint64(lua_State *L)
-{
- const char *name;
- size_t name_len;
- unsigned char scope;
- int64_t value;
-
- name = luaL_checklstring(L, 1, &name_len);
- scope = (unsigned char)luaL_checkinteger(L, 2);
- value = luaL_checkinteger(L, 3);
-
- if (value < 0)
- luaL_error(L, "Integer '%lld' out of range for 'uint64' type", value);
-
- if (!set_var_uint64(worker, name, name_len, scope, value))
- luaL_error(L, "No space left available");
- return 0;
-}
-
-static int ps_lua_set_var_int64(lua_State *L)
-{
- const char *name;
- size_t name_len;
- unsigned char scope;
- int64_t value;
-
- name = luaL_checklstring(L, 1, &name_len);
- scope = (unsigned char)luaL_checkinteger(L, 2);
- value = luaL_checkinteger(L, 3);
-
- if (!set_var_int64(worker, name, name_len, scope, value))
- luaL_error(L, "No space left available");
- return 0;
-}
-
-static int ps_lua_set_var_ipv4(lua_State *L)
-{
- const char *name;
- size_t name_len;
- unsigned char scope;
- const char *value;
- struct in_addr ipv4;
- int ret;
-
- name = luaL_checklstring(L, 1, &name_len);
- scope = (unsigned char)luaL_checkinteger(L, 2);
- value = luaL_checkstring(L, 3);
-
- ret = inet_pton(AF_INET, value, &ipv4);
- if (ret == 0)
- luaL_error(L, "IPv4 '%s': invalid format", value);
- if (ret == -1)
- luaL_error(L, "IPv4 '%s': %s", value, strerror(errno));
-
- if (!set_var_ipv4(worker, name, name_len, scope, &ipv4))
- luaL_error(L, "No space left available");
- return 0;
-}
-
-static int ps_lua_set_var_ipv6(lua_State *L)
-{
- const char *name;
- size_t name_len;
- unsigned char scope;
- const char *value;
- struct in6_addr ipv6;
- int ret;
-
- name = luaL_checklstring(L, 1, &name_len);
- scope = (unsigned char)luaL_checkinteger(L, 2);
- value = luaL_checkstring(L, 3);
-
- ret = inet_pton(AF_INET6, value, &ipv6);
- if (ret == 0)
- luaL_error(L, "IPv6 '%s': invalid format", value);
- if (ret == -1)
- luaL_error(L, "IPv6 '%s': %s", value, strerror(errno));
-
- if (!set_var_ipv6(worker, name, name_len, scope, &ipv6))
- luaL_error(L, "No space left available");
- return 0;
-}
-
-static int ps_lua_set_var_str(lua_State *L)
-{
- const char *name;
- size_t name_len;
- unsigned char scope;
- const char *value;
- size_t value_len;
-
- name = luaL_checklstring(L, 1, &name_len);
- scope = (unsigned char)luaL_checkinteger(L, 2);
- value = luaL_checklstring(L, 3, &value_len);
-
- if (!set_var_string(worker, name, name_len, scope, value, value_len))
- luaL_error(L, "No space left available");
- return 0;
-}
-
-static int ps_lua_set_var_bin(lua_State *L)
-{
- const char *name;
- size_t name_len;
- unsigned char scope;
- const char *value;
- size_t value_len;
-
- name = luaL_checklstring(L, 1, &name_len);
- scope = (unsigned char)luaL_checkinteger(L, 2);
- value = luaL_checklstring(L, 3, &value_len);
-
- if (!set_var_bin(worker, name, name_len, scope, value, value_len))
- luaL_error(L, "No space left available");
- return 0;
-}
-
-static int ps_lua_start_worker(struct worker *w)
-{
- if (L != NULL)
- return 1;
-
- worker = w;
-
- L = luaL_newstate();
- luaL_openlibs(L);
-
- lua_newtable(L);
-
- lua_pushstring(L, "register_message");
- lua_pushcclosure(L, ps_lua_register_message, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "set_var_null");
- lua_pushcclosure(L, ps_lua_set_var_null, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "set_var_boolean");
- lua_pushcclosure(L, ps_lua_set_var_boolean, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "set_var_uint32");
- lua_pushcclosure(L, ps_lua_set_var_uint32, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "set_var_int32");
- lua_pushcclosure(L, ps_lua_set_var_int32, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "set_var_uint64");
- lua_pushcclosure(L, ps_lua_set_var_uint64, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "set_var_int64");
- lua_pushcclosure(L, ps_lua_set_var_int64, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "set_var_ipv4");
- lua_pushcclosure(L, ps_lua_set_var_ipv4, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "set_var_ipv6");
- lua_pushcclosure(L, ps_lua_set_var_ipv6, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "set_var_str");
- lua_pushcclosure(L, ps_lua_set_var_str, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "set_var_bin");
- lua_pushcclosure(L, ps_lua_set_var_bin, 0);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "scope");
- lua_newtable(L);
-
- lua_pushstring(L, "proc");
- lua_pushinteger(L, SPOE_SCOPE_PROC);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "sess");
- lua_pushinteger(L, SPOE_SCOPE_SESS);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "txn");
- lua_pushinteger(L, SPOE_SCOPE_TXN);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "req");
- lua_pushinteger(L, SPOE_SCOPE_REQ);
- lua_rawset(L, -3);
-
- lua_pushstring(L, "res");
- lua_pushinteger(L, SPOE_SCOPE_RES);
- lua_rawset(L, -3);
-
- lua_rawset(L, -3); /* scope */
-
- lua_setglobal(L, "spoa");
- return 1;
-}
-
-static int ps_lua_load_file(struct worker *w, const char *file)
-{
- int error;
-
- /* Load the file and check syntax */
- error = luaL_loadfile(L, file);
- if (error) {
- fprintf(stderr, "lua syntax error: %s\n", lua_tostring(L, -1));
- return 0;
- }
-
- /* If no syntax error where detected, execute the code. */
- error = lua_pcall(L, 0, LUA_MULTRET, 0);
- switch (error) {
- case LUA_OK:
- break;
- case LUA_ERRRUN:
- fprintf(stderr, "lua runtime error: %s\n", lua_tostring(L, -1));
- lua_pop(L, 1);
- return 0;
- case LUA_ERRMEM:
- fprintf(stderr, "lua out of memory error\n");
- return 0;
- case LUA_ERRERR:
- fprintf(stderr, "lua message handler error: %s\n", lua_tostring(L, 0));
- lua_pop(L, 1);
- return 0;
- case LUA_ERRGCMM:
- fprintf(stderr, "lua garbage collector error: %s\n", lua_tostring(L, 0));
- lua_pop(L, 1);
- return 0;
- default:
- fprintf(stderr, "lua unknown error: %s\n", lua_tostring(L, 0));
- lua_pop(L, 1);
- return 0;
- }
- return 1;
-}
-
-static int ps_lua_exec_message(struct worker *w, void *ref, int nargs, struct spoe_kv *args)
-{
- long lua_ref = (long)ref;
- int ret;
- char *msg_fmt = NULL;
- const char *msg;
- int i;
- char ipbuf[64];
-
- /* Restore function in the stack */
- lua_rawgeti(L, LUA_REGISTRYINDEX, lua_ref);
-
- /* convert args in lua mode */
- lua_newtable(L);
- for (i = 0; i < nargs; i++) {
- lua_newtable(L);
- lua_pushstring(L, "name");
- lua_pushlstring(L, args[i].name.str, args[i].name.len);
- lua_rawset(L, -3); /* Push name */
- lua_pushstring(L, "value");
- switch (args[i].value.type) {
- case SPOE_DATA_T_NULL:
- lua_pushnil(L);
- break;
- case SPOE_DATA_T_BOOL:
- lua_pushboolean(L, args[i].value.u.boolean);
- break;
- case SPOE_DATA_T_INT32:
- lua_pushinteger(L, args[i].value.u.sint32);
- break;
- case SPOE_DATA_T_UINT32:
- lua_pushinteger(L, args[i].value.u.uint32);
- break;
- case SPOE_DATA_T_INT64:
- lua_pushinteger(L, args[i].value.u.sint64);
- break;
- case SPOE_DATA_T_UINT64:
- if (args[i].value.u.uint64 > LLONG_MAX)
- lua_pushnil(L);
- else
- lua_pushinteger(L, args[i].value.u.uint64);
- break;
- case SPOE_DATA_T_IPV4:
- if (inet_ntop(AF_INET, &args[i].value.u.ipv4, ipbuf, 64) == NULL)
- lua_pushnil(L);
- else
- lua_pushstring(L, ipbuf);
- break;
- case SPOE_DATA_T_IPV6:
- if (inet_ntop(AF_INET6, &args[i].value.u.ipv4, ipbuf, 64) == NULL)
- lua_pushnil(L);
- else
- lua_pushstring(L, ipbuf);
- break;
- case SPOE_DATA_T_STR:
- case SPOE_DATA_T_BIN:
- lua_pushlstring(L, args[i].value.u.buffer.str, args[i].value.u.buffer.len);
- break;
- default:
- lua_pushnil(L);
- break;
- }
- lua_rawset(L, -3); /* Push name */
- lua_rawseti(L, -2, i + 1); /* Pusg table in globale table */
- }
-
- /* execute lua function */
- while (1) {
- ret = lua_resume(L, L, 1);
- switch (ret) {
- case LUA_OK:
- return 1;
- case LUA_YIELD:
- DEBUG("Lua yield");
- continue;
- case LUA_ERRMEM:
- LOG("Lua: Out of memory error");
- return 0;
- case LUA_ERRRUN:
- msg_fmt = "Lua runtime error";
- case LUA_ERRGCMM:
- msg_fmt = msg_fmt ? msg_fmt : "Lua garbage collector error";
- case LUA_ERRERR:
- msg_fmt = msg_fmt ? msg_fmt : "Lua message handler error";
- default:
- msg_fmt = msg_fmt ? msg_fmt : "Lua unknown error";
- msg = lua_tostring(L, -1);
- if (msg == NULL)
- msg = "Unknown error";
- LOG("%s: %s", msg_fmt, msg);
- lua_settop(L, 0);
- return 0;
- }
- }
-
- return 1;
-}
-
-__attribute__((constructor))
-static void __ps_lua_init(void)
-{
- ps_register(&ps_lua_bindings_1);
- ps_register(&ps_lua_bindings_2);
-}
+++ /dev/null
-require("print_r")
-require("math")
-
-print_r("Load lua message processors")
-
-spoa.register_message("check-client-ip", function(args)
- print_r(args)
- spoa.set_var_null("null", spoa.scope.txn)
- spoa.set_var_boolean("boolean", spoa.scope.txn, true)
- spoa.set_var_int32("int32", spoa.scope.txn, 1234)
- spoa.set_var_uint32("uint32", spoa.scope.txn, 1234)
- spoa.set_var_int64("int64", spoa.scope.txn, 1234)
- spoa.set_var_uint64("uint64", spoa.scope.txn, 1234)
- spoa.set_var_ipv4("ipv4", spoa.scope.txn, "127.0.0.1")
- spoa.set_var_ipv6("ipv6", spoa.scope.txn, "1::f")
- spoa.set_var_str("str", spoa.scope.txn, "1::f")
- spoa.set_var_bin("bin", spoa.scope.txn, "1::f")
- spoa.set_var_int32("ip_score", spoa.scope.sess, math.random(100))
-end)
+++ /dev/null
-/* spoa-server: processing Python
- *
- * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
- * Copyright (C) 2020 Gilchrist Dadaglo <gilchrist@dadaglo.com>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version
- * 2 of the License, or (at your option) any later version.
- *
- * This program is provided in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- */
-
-/*
- * Define PY_SSIZE_T_CLEAN before including Python.h
- * as per https://docs.python.org/3/c-api/arg.html and https://docs.python.org/2/c-api/arg.html
- */
-#define PY_SSIZE_T_CLEAN
-
-#include <Python.h>
-
-#include <arpa/inet.h>
-
-#include <errno.h>
-#include <string.h>
-#include <limits.h>
-
-#include "spoa.h"
-#include "ps_python.h"
-
-/* Embedding python documentation:
- *
- * https://docs.python.org/2/extending/embedding.html
- * https://docs.python.org/2/extending/extending.html#extending-python-with-c-or-c
- * https://docs.python.org/2/extending/extending.html#calling-python-functions-from-c
- */
-
-static PyObject *module_ipaddress;
-static PyObject *ipv4_address;
-static PyObject *ipv6_address;
-static PyObject *spoa_error;
-static PyObject *empty_tuple;
-static struct worker *worker;
-
-static int ps_python_start_worker(struct worker *w);
-static int ps_python_load_file(struct worker *w, const char *file);
-static int ps_python_exec_message(struct worker *w, void *ref, int nargs, struct spoe_kv *args);
-
-static struct ps ps_python_bindings = {
- .init_worker = ps_python_start_worker,
- .load_file = ps_python_load_file,
- .exec_message = ps_python_exec_message,
- .ext = ".py",
-};
-
-static int ps_python_check_overflow(Py_ssize_t len)
-{
- /* There might be an overflow when converting from Py_ssize_t to int.
- * This function will catch those cases.
- * Also, spoa "struct chunk" is limited to int size.
- * We should not send data bigger than it can handle.
- */
- if (len >= (Py_ssize_t)INT_MAX) {
- PyErr_Format(spoa_error,
- "%zd is over 2GB. Please split in smaller pieces.", \
- len);
- return -1;
- } else {
- return Py_SAFE_DOWNCAST(len, Py_ssize_t, int);
- }
-}
-
-#if IS_PYTHON_3K
-static PyObject *module_spoa;
-static PyObject *PyInit_spoa_module(void);
-#endif /* IS_PYTHON_3K */
-
-static PyObject *ps_python_register_message(PyObject *self, PyObject *args)
-{
- const char *name;
- PyObject *ref;
-
- if (!PyArg_ParseTuple(args, "sO!", &name, &PyFunction_Type, &ref))
- return NULL;
- Py_XINCREF(ref); /* because the function is internally referenced */
-
- ps_register_message(&ps_python_bindings, name, (void *)ref);
-
- Py_RETURN_NONE;
-}
-
-static PyObject *ps_python_set_var_null(PyObject *self, PyObject *args)
-{
- const char *name;
- Py_ssize_t name_len;
- int name_len_i;
- int scope;
-
- if (!PyArg_ParseTuple(args, "s#i", &name, &name_len, &scope))
- return NULL;
- name_len_i = ps_python_check_overflow(name_len);
- if (name_len_i == -1)
- return NULL;
- if (!set_var_null(worker, name, name_len_i, scope)) {
- PyErr_SetString(spoa_error, "No more memory space available");
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *ps_python_set_var_boolean(PyObject *self, PyObject *args)
-{
- const char *name;
- Py_ssize_t name_len;
- int scope;
- int value;
- int name_len_i;
-
- if (!PyArg_ParseTuple(args, "s#ii", &name, &name_len, &scope, &value))
- return NULL;
- name_len_i = ps_python_check_overflow(name_len);
- if (name_len_i == -1)
- return NULL;
- if (!set_var_bool(worker, name, name_len_i, scope, value)) {
- PyErr_SetString(spoa_error, "No more memory space available");
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *ps_python_set_var_int32(PyObject *self, PyObject *args)
-{
- const char *name;
- Py_ssize_t name_len;
- int scope;
- int32_t value;
- int name_len_i;
-
- if (!PyArg_ParseTuple(args, "s#ii", &name, &name_len, &scope, &value))
- return NULL;
- name_len_i = ps_python_check_overflow(name_len);
- if (name_len_i == -1)
- return NULL;
- if (!set_var_int32(worker, name, name_len_i, scope, value)) {
- PyErr_SetString(spoa_error, "No more memory space available");
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *ps_python_set_var_uint32(PyObject *self, PyObject *args)
-{
- const char *name;
- Py_ssize_t name_len;
- int scope;
- uint32_t value;
- int name_len_i;
-
- if (!PyArg_ParseTuple(args, "s#iI", &name, &name_len, &scope, &value))
- return NULL;
- name_len_i = ps_python_check_overflow(name_len);
- if (name_len_i == -1)
- return NULL;
- if (!set_var_uint32(worker, name, name_len_i, scope, value)) {
- PyErr_SetString(spoa_error, "No more memory space available");
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *ps_python_set_var_int64(PyObject *self, PyObject *args)
-{
- const char *name;
- Py_ssize_t name_len;
- int scope;
- int64_t value;
- int name_len_i;
-
- if (!PyArg_ParseTuple(args, "s#il", &name, &name_len, &scope, &value))
- return NULL;
- name_len_i = ps_python_check_overflow(name_len);
- if (name_len_i == -1)
- return NULL;
- if (!set_var_int64(worker, name, name_len_i, scope, value)) {
- PyErr_SetString(spoa_error, "No more memory space available");
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *ps_python_set_var_uint64(PyObject *self, PyObject *args)
-{
- const char *name;
- Py_ssize_t name_len;
- int scope;
- uint64_t value;
- int name_len_i;
-
- if (!PyArg_ParseTuple(args, "s#ik", &name, &name_len, &scope, &value))
- return NULL;
- name_len_i = ps_python_check_overflow(name_len);
- if (name_len_i == -1)
- return NULL;
- if (!set_var_uint64(worker, name, name_len_i, scope, value)) {
- PyErr_SetString(spoa_error, "No more memory space available");
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *ps_python_set_var_ipv4(PyObject *self, PyObject *args)
-{
- const char *name;
- Py_ssize_t name_len;
- int scope;
- PyObject *ipv4;
- PyObject *value;
- struct in_addr ip;
- int name_len_i;
-
- if (!PyArg_ParseTuple(args, "s#iO", &name, &name_len, &scope, &ipv4))
- return NULL;
- name_len_i = ps_python_check_overflow(name_len);
- if (name_len_i == -1)
- return NULL;
- if (!PyObject_IsInstance(ipv4, ipv4_address)) {
- PyErr_Format(spoa_error, "must be 'IPv4Address', not '%s'", ipv4->ob_type->tp_name);
- return NULL;
- }
- /* Execute packed ... I think .. */
- value = PyObject_GetAttrString(ipv4, "packed");
- if (value == NULL)
- return NULL;
- if (PY_STRING_GET_SIZE(value) != sizeof(ip)) {
- PyErr_Format(spoa_error, "IPv4 manipulation internal error");
- return NULL;
- }
- memcpy(&ip, PY_STRING_AS_STRING(value), PY_STRING_GET_SIZE(value));
- if (!set_var_ipv4(worker, name, name_len_i, scope, &ip)) {
- PyErr_SetString(spoa_error, "No more memory space available");
- return NULL;
- }
- /* Once we set the IP value in the worker, we don't need it anymore... */
- Py_XDECREF(value);
- Py_RETURN_NONE;
-}
-
-static PyObject *ps_python_set_var_ipv6(PyObject *self, PyObject *args)
-{
- const char *name;
- Py_ssize_t name_len;
- int scope;
- PyObject *ipv6;
- PyObject *value;
- struct in6_addr ip;
- int name_len_i;
-
- if (!PyArg_ParseTuple(args, "s#iO", &name, &name_len, &scope, &ipv6))
- return NULL;
- name_len_i = ps_python_check_overflow(name_len);
- if (name_len_i == -1)
- return NULL;
- if (!PyObject_IsInstance(ipv6, ipv6_address)) {
- PyErr_Format(spoa_error, "must be 'IPv6Address', not '%s'", ipv6->ob_type->tp_name);
- return NULL;
- }
- /* Execute packed ... I think .. */
- value = PyObject_GetAttrString(ipv6, "packed");
- if (value == NULL)
- return NULL;
- if (PY_STRING_GET_SIZE(value) != sizeof(ip)) {
- PyErr_Format(spoa_error, "IPv6 manipulation internal error");
- return NULL;
- }
- memcpy(&ip, PY_STRING_AS_STRING(value), PY_STRING_GET_SIZE(value));
- if (!set_var_ipv6(worker, name, name_len_i, scope, &ip)) {
- PyErr_SetString(spoa_error, "No more memory space available");
- return NULL;
- }
- /* Once we set the IP value in the worker, we don't need it anymore... */
- Py_XDECREF(value);
- Py_RETURN_NONE;
-}
-
-static PyObject *ps_python_set_var_str(PyObject *self, PyObject *args)
-{
- const char *name;
- Py_ssize_t name_len;
- int scope;
- const char *value;
- Py_ssize_t value_len;
- int name_len_i;
- int value_len_i;
-
- if (!PyArg_ParseTuple(args, "s#is#", &name, &name_len, &scope, &value, &value_len))
- return NULL;
- name_len_i = ps_python_check_overflow(name_len);
- value_len_i = ps_python_check_overflow(value_len);
- if (name_len_i == -1 || value_len_i == -1)
- return NULL;
- if (!set_var_string(worker, name, name_len_i, scope, value, value_len_i)) {
- PyErr_SetString(spoa_error, "No more memory space available");
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-static PyObject *ps_python_set_var_bin(PyObject *self, PyObject *args)
-{
- const char *name;
- Py_ssize_t name_len;
- int scope;
- const char *value;
- Py_ssize_t value_len;
- int name_len_i;
- int value_len_i;
-
- if (!PyArg_ParseTuple(args, "s#is#", &name, &name_len, &scope, &value, &value_len))
- return NULL;
- name_len_i = ps_python_check_overflow(name_len);
- value_len_i = ps_python_check_overflow(value_len);
- if (name_len_i == -1 || value_len_i == -1)
- return NULL;
- if (!set_var_bin(worker, name, name_len_i, scope, value, value_len_i)) {
- PyErr_SetString(spoa_error, "No more memory space available");
- return NULL;
- }
- Py_RETURN_NONE;
-}
-
-
-static PyMethodDef spoa_methods[] = {
- {"register_message", ps_python_register_message, METH_VARARGS,
- "Register binding for SPOA message."},
- {"set_var_null", ps_python_set_var_null, METH_VARARGS,
- "Set SPOA NULL variable"},
- {"set_var_boolean", ps_python_set_var_boolean, METH_VARARGS,
- "Set SPOA boolean variable"},
- {"set_var_int32", ps_python_set_var_int32, METH_VARARGS,
- "Set SPOA int32 variable"},
- {"set_var_uint32", ps_python_set_var_uint32, METH_VARARGS,
- "Set SPOA uint32 variable"},
- {"set_var_int64", ps_python_set_var_int64, METH_VARARGS,
- "Set SPOA int64 variable"},
- {"set_var_uint64", ps_python_set_var_uint64, METH_VARARGS,
- "Set SPOA uint64 variable"},
- {"set_var_ipv4", ps_python_set_var_ipv4, METH_VARARGS,
- "Set SPOA ipv4 variable"},
- {"set_var_ipv6", ps_python_set_var_ipv6, METH_VARARGS,
- "Set SPOA ipv6 variable"},
- {"set_var_str", ps_python_set_var_str, METH_VARARGS,
- "Set SPOA str variable"},
- {"set_var_bin", ps_python_set_var_bin, METH_VARARGS,
- "Set SPOA bin variable"},
- { /* end */ }
-};
-
-#if IS_PYTHON_3K
-static struct PyModuleDef spoa_module_definition = {
- PyModuleDef_HEAD_INIT, /* m_base */
- "spoa", /* m_name */
- "HAProxy SPOA module for python", /* m_doc */
- -1, /* m_size */
- spoa_methods, /* m_methods */
- NULL, /* m_slots */
- NULL, /* m_traverse */
- NULL, /* m_clear */
- NULL /* m_free */
-};
-
-static PyObject *PyInit_spoa_module(void)
-{
- return module_spoa;
-}
-#endif /* IS_PYTHON_3K */
-
-static int ps_python_start_worker(struct worker *w)
-{
- PyObject *m;
- PyObject *module_name;
- PyObject *value;
- int ret;
-
-#if IS_PYTHON_27
- Py_SetProgramName("spoa-server");
-#endif /* IS_PYTHON_27 */
-#if IS_PYTHON_3K
- Py_SetProgramName(Py_DecodeLocale("spoa-server", NULL));
- PyImport_AppendInittab("spoa", &PyInit_spoa_module);
-#endif /* IS_PYTHON_3K */
-
- Py_Initialize();
-
- module_name = PY_STRING_FROM_STRING("ipaddress");
- if (module_name == NULL) {
- PyErr_Print();
- return 0;
- }
-
- module_ipaddress = PyImport_Import(module_name);
- Py_DECREF(module_name);
- if (module_ipaddress == NULL) {
- PyErr_Print();
- return 0;
- }
-
- ipv4_address = PyObject_GetAttrString(module_ipaddress, "IPv4Address");
- if (ipv4_address == NULL) {
- Py_DECREF(module_ipaddress);
- PyErr_Print();
- return 0;
- }
-
- ipv6_address = PyObject_GetAttrString(module_ipaddress, "IPv6Address");
- if (ipv6_address == NULL) {
- Py_DECREF(ipv4_address);
- Py_DECREF(module_ipaddress);
- PyErr_Print();
- return 0;
- }
-
- PY_INIT_MODULE(m, "spoa", spoa_methods, &spoa_module_definition);
- if (m == NULL) {
- Py_DECREF(ipv4_address);
- Py_DECREF(ipv6_address);
- Py_DECREF(module_ipaddress);
- PyErr_Print();
- return 0;
- }
-
- spoa_error = PyErr_NewException("spoa.error", NULL, NULL);
- /* PyModule_AddObject will steal the reference to spoa_error
- * in case of success only
- * We need to increment the counters to continue using it
- * but cleanup in case of failure
- */
- Py_INCREF(spoa_error);
- ret = PyModule_AddObject(m, "error", spoa_error);
- if (ret == -1) {
- Py_DECREF(m);
- Py_DECREF(spoa_error);
- PyErr_Print();
- return 0;
- }
-
-
- value = PyLong_FromLong(SPOE_SCOPE_PROC);
- if (value == NULL) {
- PyErr_Print();
- return 0;
- }
-
- ret = PyModule_AddObject(m, "scope_proc", value);
- if (ret == -1) {
- Py_DECREF(m);
- Py_DECREF(value);
- PyErr_Print();
- return 0;
- }
-
- value = PyLong_FromLong(SPOE_SCOPE_SESS);
- if (value == NULL) {
- Py_DECREF(m);
- PyErr_Print();
- return 0;
- }
-
- ret = PyModule_AddObject(m, "scope_sess", value);
- if (ret == -1) {
- Py_DECREF(m);
- Py_DECREF(value);
- PyErr_Print();
- return 0;
- }
-
- value = PyLong_FromLong(SPOE_SCOPE_TXN);
- if (value == NULL) {
- Py_DECREF(m);
- PyErr_Print();
- return 0;
- }
-
- ret = PyModule_AddObject(m, "scope_txn", value);
- if (ret == -1) {
- Py_DECREF(m);
- Py_DECREF(value);
- PyErr_Print();
- return 0;
- }
-
- value = PyLong_FromLong(SPOE_SCOPE_REQ);
- if (value == NULL) {
- Py_DECREF(m);
- PyErr_Print();
- return 0;
- }
-
- ret = PyModule_AddObject(m, "scope_req", value);
- if (ret == -1) {
- Py_DECREF(m);
- Py_DECREF(value);
- PyErr_Print();
- return 0;
- }
-
- value = PyLong_FromLong(SPOE_SCOPE_RES);
- if (value == NULL) {
- Py_DECREF(m);
- PyErr_Print();
- return 0;
- }
-
- ret = PyModule_AddObject(m, "scope_res", value);
- if (ret == -1) {
- Py_DECREF(m);
- Py_DECREF(value);
- PyErr_Print();
- return 0;
- }
-
- empty_tuple = PyTuple_New(0);
- if (empty_tuple == NULL) {
- PyErr_Print();
- return 0;
- }
-
-#if IS_PYTHON_3K
- module_spoa = m;
-#endif /* IS_PYTHON_3K */
- worker = w;
- return 1;
-}
-
-static int ps_python_load_file(struct worker *w, const char *file)
-{
- FILE *fp;
- int ret;
-
- fp = fopen(file, "r");
- if (fp == NULL) {
- LOG("python: Cannot read file \"%s\": %s", file, strerror(errno));
- return 0;
- }
-
- ret = PyRun_SimpleFile(fp, file);
- fclose(fp);
- if (ret != 0) {
- PyErr_Print();
- return 0;
- }
-
- return 1;
-}
-
-static int ps_python_exec_message(struct worker *w, void *ref, int nargs, struct spoe_kv *args)
-{
- int i;
- PyObject *python_ref = ref;
- PyObject *fkw;
- PyObject *kw_args;
- PyObject *result;
- PyObject *ent;
- PyObject *key;
- PyObject *value;
- PyObject *func;
- int ret;
- char ipbuf[64];
- const char *p;
- PyObject *ip_dict;
- PyObject *ip_name;
- PyObject *ip_value;
-
- /* Dict containing arguments */
-
- kw_args = PyList_New(0);
- if (kw_args == NULL) {
- PyErr_Print();
- return 0;
- }
-
- for (i = 0; i < nargs; i++) {
-
- /* New dict containing one argument */
-
- ent = PyDict_New();
- if (ent == NULL) {
- Py_DECREF(kw_args);
- PyErr_Print();
- return 0;
- }
-
- /* Create the name entry */
-
- key = PY_STRING_FROM_STRING("name");
- if (key == NULL) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- PyErr_Print();
- return 0;
- }
-
- value = PY_STRING_FROM_STRING_AND_SIZE(args[i].name.str, args[i].name.len);
- if (value == NULL) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- Py_DECREF(key);
- PyErr_Print();
- return 0;
- }
-
- ret = PyDict_SetItem(ent, key, value);
- Py_DECREF(key);
- Py_DECREF(value);
- if (ret == -1) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- PyErr_Print();
- return 0;
- }
-
- /* Create the value entry */
-
- key = PY_STRING_FROM_STRING("value");
- if (key == NULL) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- PyErr_Print();
- return 0;
- }
-
- switch (args[i].value.type) {
- case SPOE_DATA_T_NULL:
- Py_INCREF(Py_None);
- value = Py_None;
- break;
- case SPOE_DATA_T_BOOL:
- value = PyBool_FromLong(args[i].value.u.boolean);
- break;
- case SPOE_DATA_T_INT32:
- value = PyLong_FromLong(args[i].value.u.sint32);
- break;
- case SPOE_DATA_T_UINT32:
- value = PyLong_FromLong(args[i].value.u.uint32);
- break;
- case SPOE_DATA_T_INT64:
- value = PyLong_FromLong(args[i].value.u.sint64);
- break;
- case SPOE_DATA_T_UINT64:
- value = PyLong_FromUnsignedLong(args[i].value.u.uint64);
- break;
- case SPOE_DATA_T_IPV4:
- case SPOE_DATA_T_IPV6:
- if (args[i].value.type == SPOE_DATA_T_IPV4)
- p = inet_ntop(AF_INET, &args[i].value.u.ipv4, ipbuf, 64);
- else
- p = inet_ntop(AF_INET6, &args[i].value.u.ipv6, ipbuf, 64);
- if (!p)
- strcpy(ipbuf, "0.0.0.0");
-
- func = PyObject_GetAttrString(module_ipaddress, "ip_address");
- if (func == NULL) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- Py_DECREF(key);
- PyErr_Print();
- return 0;
- }
- ip_dict = PyDict_New();
- if (ip_dict == NULL) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- Py_DECREF(key);
- Py_DECREF(func);
- PyErr_Print();
- return 0;
- }
- ip_name = PY_STRING_FROM_STRING("address");
- if (ip_name == NULL) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- Py_DECREF(key);
- Py_DECREF(func);
- Py_DECREF(ip_dict);
- PyErr_Print();
- return 0;
- }
- ip_value = PyUnicode_FromString(ipbuf);
- if (ip_value == NULL) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- Py_DECREF(key);
- Py_DECREF(func);
- Py_DECREF(ip_dict);
- Py_DECREF(ip_name);
- PyErr_Print();
- return 0;
- }
- ret = PyDict_SetItem(ip_dict, ip_name, ip_value);
- Py_DECREF(ip_name);
- Py_DECREF(ip_value);
- if (ret == -1) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- Py_DECREF(key);
- Py_DECREF(func);
- Py_DECREF(ip_dict);
- PyErr_Print();
- return 0;
- }
- value = PyObject_Call(func, empty_tuple, ip_dict);
- Py_DECREF(func);
- Py_DECREF(ip_dict);
- break;
-
- case SPOE_DATA_T_STR:
- value = PY_STRING_FROM_STRING_AND_SIZE(args[i].value.u.buffer.str, args[i].value.u.buffer.len);
- break;
- case SPOE_DATA_T_BIN:
- value = PY_BYTES_FROM_STRING_AND_SIZE(args[i].value.u.buffer.str, args[i].value.u.buffer.len);
- break;
- default:
- Py_INCREF(Py_None);
- value = Py_None;
- break;
- }
- if (value == NULL) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- Py_DECREF(key);
- PyErr_Print();
- return 0;
- }
-
- ret = PyDict_SetItem(ent, key, value);
- Py_DECREF(key);
- Py_DECREF(value);
- if (ret == -1) {
- Py_DECREF(kw_args);
- Py_DECREF(ent);
- PyErr_Print();
- return 0;
- }
-
- /* Add dict to the list */
-
- ret = PyList_Append(kw_args, ent);
- Py_DECREF(ent);
- if (ret == -1) {
- Py_DECREF(kw_args);
- PyErr_Print();
- return 0;
- }
- }
-
- /* Dictionary { args = <list-of-args> } for the function */
-
- fkw = PyDict_New();
- if (fkw == NULL) {
- Py_DECREF(kw_args);
- PyErr_Print();
- return 0;
- }
-
- key = PY_STRING_FROM_STRING("args");
- if (key == NULL) {
- Py_DECREF(kw_args);
- Py_DECREF(fkw);
- PyErr_Print();
- return 0;
- }
-
- ret = PyDict_SetItem(fkw, key, kw_args);
- Py_DECREF(kw_args);
- Py_DECREF(key);
- if (ret == -1) {
- Py_DECREF(fkw);
- PyErr_Print();
- return 0;
- }
-
- result = PyObject_Call(python_ref, empty_tuple, fkw);
- Py_DECREF(fkw);
- if (result == NULL) {
- PyErr_Print();
- return 0;
- }
- Py_DECREF(result);
-
- return 1;
-}
-
-__attribute__((constructor))
-static void __ps_python_init(void)
-{
- ps_register(&ps_python_bindings);
-}
+++ /dev/null
-/* ps_python.h: SPOA Python processing includes
- *
- * Copyright (C) 2020 Gilchrist Dadaglo <gilchrist@dadaglo.com>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version
- * 2 of the License, or (at your option) any later version.
- *
- * This program is provided in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- */
-
-#ifndef __PS_PYTHON_H__
-#define __PS_PYTHON_H__
-
-#include <Python.h>
-
-#if PY_MAJOR_VERSION >= 3
- #define IS_PYTHON_3K 1
- #define IS_PYTHON_27 0
-#elif PY_MAJOR_VERSION == 2 && PY_MINOR_VERSION == 7
- #define IS_PYTHON_3K 0
- #define IS_PYTHON_27 1
-#else
- #error "Unsupported Python Version - Please use Python 3"
-#endif /* PY_MAJOR_VERSION */
-
-#if IS_PYTHON_3K
- #define PY_INIT_MODULE(instance, name, methods, moduledef) \
- (instance = PyModule_Create(moduledef))
- #define PY_STRING_FROM_STRING PyUnicode_FromString
- #define PY_STRING_FROM_STRING_AND_SIZE PyUnicode_FromStringAndSize
- #define PY_BYTES_FROM_STRING_AND_SIZE PyBytes_FromStringAndSize
- #define PY_STRING_GET_SIZE PyBytes_Size
- #define PY_STRING_AS_STRING PyBytes_AsString
-#elif IS_PYTHON_27
- #define PY_INIT_MODULE(instance, name, methods, moduledef) \
- (instance = Py_InitModule(name, methods))
- #define PY_STRING_FROM_STRING PyString_FromString
- #define PY_STRING_FROM_STRING_AND_SIZE PyString_FromStringAndSize
- #define PY_BYTES_FROM_STRING_AND_SIZE PyString_FromStringAndSize
- #define PY_STRING_GET_SIZE PyString_GET_SIZE
- #define PY_STRING_AS_STRING PyString_AS_STRING
-#endif /* IS_PYTHON_3K */
-
-#endif /* __PS_PYTHON_H__ */
-
-/* EOF */
+++ /dev/null
-from pprint import pprint
-import spoa
-import ipaddress
-import random
-
-def check_client_ip(args):
- pprint(args)
- spoa.set_var_null("null", spoa.scope_txn)
- spoa.set_var_boolean("boolean", spoa.scope_txn, True)
- spoa.set_var_int32("int32", spoa.scope_txn, 1234)
- spoa.set_var_uint32("uint32", spoa.scope_txn, 1234)
- spoa.set_var_int64("int64", spoa.scope_txn, 1234)
- spoa.set_var_uint64("uint64", spoa.scope_txn, 1234)
- spoa.set_var_ipv4("ipv4", spoa.scope_txn, ipaddress.IPv4Address(u"127.0.0.1"))
- spoa.set_var_ipv6("ipv6", spoa.scope_txn, ipaddress.IPv6Address(u"1::f"))
- spoa.set_var_str("str", spoa.scope_txn, "1::f")
- spoa.set_var_bin("bin", spoa.scope_txn, "1:\x01:\x02f\x00\x00")
- spoa.set_var_int32("ip_score", spoa.scope_sess, random.randint(1,100))
- return
-
-
-spoa.register_message("check-client-ip", check_client_ip)
+++ /dev/null
-global
- debug
-
-defaults
- mode http
- option httplog
- option dontlognull
- timeout connect 5000
- timeout client 5000
- timeout server 5000
-
-listen test
- mode http
- bind :10001
- filter spoe engine spoa-server config spoa-server.spoe.conf
- http-request set-var(req.a) var(txn.iprep.null),debug
- http-request set-var(req.a) var(txn.iprep.boolean),debug
- http-request set-var(req.a) var(txn.iprep.int32),debug
- http-request set-var(req.a) var(txn.iprep.uint32),debug
- http-request set-var(req.a) var(txn.iprep.int64),debug
- http-request set-var(req.a) var(txn.iprep.uint64),debug
- http-request set-var(req.a) var(txn.iprep.ipv4),debug
- http-request set-var(req.a) var(txn.iprep.ipv6),debug
- http-request set-var(req.a) var(txn.iprep.str),debug
- http-request set-var(req.a) var(txn.iprep.bin),debug
- http-request redirect location /%[var(sess.iprep.ip_score)]
-
-backend spoe-server
- mode tcp
- balance roundrobin
- timeout connect 5s
- timeout server 3m
- server spoe-server 127.0.0.1:12345
+++ /dev/null
-[spoa-server]
-
-spoe-agent spoa-server
- messages check-client-ip
- option var-prefix iprep
- timeout hello 100ms
- timeout idle 30s
- timeout processing 15ms
- use-backend spoe-server
-
-spoe-message check-client-ip
- args always_true int(1234) src ipv6(::55) req.fhdr(host)
- event on-frontend-http-request
+++ /dev/null
-/*
- * A Random IP reputation service acting as a Stream Processing Offload Agent
- *
- * This is a very simple service that implement a "random" ip reputation
- * service. It will return random scores for all checked IP addresses. It only
- * shows you how to implement a ip reputation service or such kind of services
- * using the SPOE.
- *
- * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
- * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version
- * 2 of the License, or (at your option) any later version.
- *
- */
-#include <limits.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <stdbool.h>
-#include <unistd.h>
-#include <signal.h>
-#include <pthread.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <arpa/inet.h>
-
-#include "spoa.h"
-
-#define DEFAULT_PORT 12345
-#define NUM_WORKERS 5
-
-#define SLEN(str) (sizeof(str)-1)
-
-/* Frame Types sent by HAProxy and by agents */
-enum spoe_frame_type {
- /* Frames sent by HAProxy */
- SPOE_FRM_T_HAPROXY_HELLO = 1,
- SPOE_FRM_T_HAPROXY_DISCON,
- SPOE_FRM_T_HAPROXY_NOTIFY,
-
- /* Frames sent by the agents */
- SPOE_FRM_T_AGENT_HELLO = 101,
- SPOE_FRM_T_AGENT_DISCON,
- SPOE_FRM_T_AGENT_ACK
-};
-
-/* Errors triggered by SPOE applet */
-enum spoe_frame_error {
- SPOE_FRM_ERR_NONE = 0,
- SPOE_FRM_ERR_IO,
- SPOE_FRM_ERR_TOUT,
- SPOE_FRM_ERR_TOO_BIG,
- SPOE_FRM_ERR_INVALID,
- SPOE_FRM_ERR_NO_VSN,
- SPOE_FRM_ERR_NO_FRAME_SIZE,
- SPOE_FRM_ERR_NO_CAP,
- SPOE_FRM_ERR_BAD_VSN,
- SPOE_FRM_ERR_BAD_FRAME_SIZE,
- SPOE_FRM_ERR_UNKNOWN = 99,
- SPOE_FRM_ERRS,
-};
-
-/* All supported SPOE actions */
-enum spoe_action_type {
- SPOE_ACT_T_SET_VAR = 1,
- SPOE_ACT_T_UNSET_VAR,
- SPOE_ACT_TYPES,
-};
-
-
-/* Masks to get data type or flags value */
-#define SPOE_DATA_T_MASK 0x0F
-#define SPOE_DATA_FL_MASK 0xF0
-
-/* Flags to set Boolean values */
-#define SPOE_DATA_FL_FALSE 0x00
-#define SPOE_DATA_FL_TRUE 0x10
-static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = {
- [SPOE_FRM_ERR_NONE] = "normal",
- [SPOE_FRM_ERR_IO] = "I/O error",
- [SPOE_FRM_ERR_TOUT] = "a timeout occurred",
- [SPOE_FRM_ERR_TOO_BIG] = "frame is too big",
- [SPOE_FRM_ERR_INVALID] = "invalid frame received",
- [SPOE_FRM_ERR_NO_VSN] = "version value not found",
- [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found",
- [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found",
- [SPOE_FRM_ERR_BAD_VSN] = "unsupported version",
- [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small",
- [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred",
-};
-
-bool debug = false;
-pthread_key_t worker_id;
-static struct ps *ps_list = NULL;
-static struct ps_message *ps_messages = NULL;
-static int nfiles = 0;
-static char **files = NULL;
-
-static inline void add_file(const char *file)
-{
- nfiles++;
- files = realloc(files, sizeof(*files) * nfiles);
- if (files == NULL) {
- fprintf(stderr, "Out of memory error\n");
- exit(EXIT_FAILURE);
- }
- files[nfiles - 1] = strdup(file);
- if (files[nfiles - 1] == NULL) {
- fprintf(stderr, "Out of memory error\n");
- exit(EXIT_FAILURE);
- }
-}
-
-void ps_register(struct ps *ps)
-{
- ps->next = ps_list;
- ps_list = ps;
-}
-
-void ps_register_message(struct ps *ps, const char *name, void *ref)
-{
- struct ps_message *msg;
-
- /* Look for already registered name */
- for (msg = ps_messages; msg; msg = msg->next) {
- if (strcmp(name, msg->name) == 0) {
- LOG("Message \"%s\" already registered\n", name);
- exit(EXIT_FAILURE);
- }
- }
-
- msg = calloc(1, sizeof(*msg));
- if (msg == NULL) {
- LOG("Out of memory error\n");
- exit(EXIT_FAILURE);
- }
-
- msg->next = ps_messages;
- ps_messages = msg;
- msg->name = strdup(name);
- if (msg->name == NULL) {
- LOG("Out of memory error\n");
- exit(EXIT_FAILURE);
- }
- msg->ref = ref;
- msg->ps = ps;
-}
-
-static int
-do_read(int sock, void *buf, int read_len)
-{
- fd_set readfds;
- int n = 0, total = 0, bytesleft = read_len;
-
- FD_ZERO(&readfds);
- FD_SET(sock, &readfds);
-
- while (total < read_len) {
- if (select(FD_SETSIZE, &readfds, NULL, NULL, NULL) == -1)
- return -1;
- if (!FD_ISSET(sock, &readfds))
- return -1;
-
- n = read(sock, buf + total, bytesleft);
- if (n <= 0)
- break;
-
- total += n;
- bytesleft -= n;
- }
-
- return (n == -1) ? -1 : total;
-}
-
-static int
-do_write(int sock, void *buf, int write_len)
-{
- fd_set writefds;
- int n = 0, total = 0, bytesleft = write_len;
-
- FD_ZERO(&writefds);
- FD_SET(sock, &writefds);
-
- while (total < write_len) {
- if (select(FD_SETSIZE, NULL, &writefds, NULL, NULL) == -1)
- return -1;
- if (!FD_ISSET(sock, &writefds))
- return -1;
-
- n = write(sock, buf + total, bytesleft);
- if (n <= 0)
- break;
-
- total += n;
- bytesleft -= n;
- }
-
- return (n == -1) ? -1 : total;
-}
-
-/* Receive a frame sent by HAProxy. It returns -1 if an error occurred,
- * otherwise the number of read bytes.*/
-static int
-read_frame(int sock, struct worker *w)
-{
- uint32_t netint;
- unsigned int framesz;
-
- /* Read the frame size, on 4 bytes */
- if (do_read(sock, &netint, sizeof(netint)) != 4) {
- w->status_code = SPOE_FRM_ERR_IO;
- return -1;
- }
-
- /* Check it against the max size */
- framesz = ntohl(netint);
- if (framesz > w->size) {
- w->status_code = SPOE_FRM_ERR_TOO_BIG;
- return -1;
- }
-
- /* Read the frame */
- if (do_read(sock, w->buf, framesz) != framesz) {
- w->status_code = SPOE_FRM_ERR_IO;
- return -1;
- }
-
- w->len = framesz;
- return framesz;
-}
-
-/* Send a frame to HAProxy. It returns -1 if an error occurred, otherwise the
- * number of written bytes. */
-static int
-write_frame(int sock, struct worker *w)
-{
- uint32_t netint;
-
- /* Write the frame size, on 4 bytes */
- netint = htonl(w->len);
- if (do_write(sock, &netint, sizeof(netint)) != 4) {
- w->status_code = SPOE_FRM_ERR_IO;
- return -1;
- }
-
- /* Write the frame */
- if (do_write(sock, w->buf, w->len) != w->len) {
- w->status_code = SPOE_FRM_ERR_IO;
- return -1;
- }
- return w->len;
-}
-
-/* Encode a variable-length integer. This function never fails and returns the
- * number of written bytes. */
-static int
-encode_spoe_varint(uint64_t i, char *buf)
-{
- int idx;
-
- if (i < 240) {
- buf[0] = (unsigned char)i;
- return 1;
- }
-
- buf[0] = (unsigned char)i | 240;
- i = (i - 240) >> 4;
- for (idx = 1; i >= 128; ++idx) {
- buf[idx] = (unsigned char)i | 128;
- i = (i - 128) >> 7;
- }
- buf[idx++] = (unsigned char)i;
- return idx;
-}
-
-/* Decode a varable-length integer. If the decoding fails, -1 is returned. This
- * happens when the buffer's end in reached. On success, the number of read
- * bytes is returned. */
-static int
-decode_spoe_varint(char *buf, char *end, uint64_t *i)
-{
- unsigned char *msg = (unsigned char *)buf;
- int idx = 0;
-
- if (msg > (unsigned char *)end)
- return -1;
-
- if (msg[0] < 240) {
- *i = msg[0];
- return 1;
- }
- *i = msg[0];
- do {
- ++idx;
- if (msg+idx > (unsigned char *)end)
- return -1;
- *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1));
- } while (msg[idx] >= 128);
- return (idx + 1);
-}
-
-/* Encode a string. The string will be prefix by its length, encoded as a
- * variable-length integer. This function never fails and returns the number of
- * written bytes. */
-static int
-encode_spoe_string(const char *str, size_t len, char *dst)
-{
- int idx = 0;
-
- if (!len) {
- dst[0] = 0;
- return 1;
- }
-
- idx += encode_spoe_varint(len, dst);
- memcpy(dst+idx, str, len);
- return (idx + len);
-}
-
-/* Decode a string. Its length is decoded first as a variable-length integer. If
- * it succeeds, and if the string length is valid, the begin of the string is
- * saved in <*str>, its length is saved in <*len> and the total numbre of bytes
- * read is returned. If an error occurred, -1 is returned and <*str> remains
- * NULL. */
-static int
-decode_spoe_string(char *buf, char *end, char **str, uint64_t *len)
-{
- int r, idx = 0;
-
- *str = NULL;
- *len = 0;
-
- if ((r = decode_spoe_varint(buf, end, len)) == -1)
- goto error;
- idx += r;
- if (buf + idx + *len > end)
- goto error;
-
- *str = buf+idx;
- return (idx + *len);
-
-error:
- return -1;
-}
-
-/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number
- * of bytes read is returned. A types data is composed of a type (1 byte) and
- * corresponding data:
- * - boolean: non additional data (0 bytes)
- * - integers: a variable-length integer (see decode_spoe_varint)
- * - ipv4: 4 bytes
- * - ipv6: 16 bytes
- * - binary and string: a buffer prefixed by its size, a variable-length
- * integer (see decode_spoe_string) */
-static int
-skip_spoe_data(char *frame, char *end)
-{
- uint64_t sz = 0;
- int r, idx = 0;
-
- if (frame > end)
- return -1;
-
- switch (frame[idx++] & SPOE_DATA_T_MASK) {
- case SPOE_DATA_T_BOOL:
- idx++;
- break;
- case SPOE_DATA_T_INT32:
- case SPOE_DATA_T_INT64:
- case SPOE_DATA_T_UINT32:
- case SPOE_DATA_T_UINT64:
- if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
- return -1;
- idx += r;
- break;
- case SPOE_DATA_T_IPV4:
- idx += 4;
- break;
- case SPOE_DATA_T_IPV6:
- idx += 16;
- break;
- case SPOE_DATA_T_STR:
- case SPOE_DATA_T_BIN:
- if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
- return -1;
- idx += r + sz;
- break;
- }
-
- if (frame+idx > end)
- return -1;
- return idx;
-}
-
-/* Decode a typed data. If an error occurred, -1 is returned, otherwise the
- * number of read bytes is returned. See skip_spoe_data for details. */
-static int
-decode_spoe_data(char *frame, char *end, struct spoe_data *data)
-{
- uint64_t sz = 0;
- int type, r, idx = 0;
-
- if (frame > end)
- return -1;
-
- type = frame[idx++];
- data->type = (type & SPOE_DATA_T_MASK);
- switch (data->type) {
- case SPOE_DATA_T_BOOL:
- data->u.boolean = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
- break;
- case SPOE_DATA_T_INT32:
- if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
- return -1;
- data->u.sint32 = sz;
- idx += r;
- break;
- case SPOE_DATA_T_INT64:
- if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
- return -1;
- data->u.uint32 = sz;
- idx += r;
- break;
- case SPOE_DATA_T_UINT32:
- if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
- return -1;
- data->u.sint64 = sz;
- idx += r;
- break;
- case SPOE_DATA_T_UINT64:
- if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
- return -1;
- data->u.uint64 = sz;
- idx += r;
- break;
- case SPOE_DATA_T_IPV4:
- if (frame+idx+4 > end)
- return -1;
- memcpy(&data->u.ipv4, frame+idx, 4);
- idx += 4;
- break;
- case SPOE_DATA_T_IPV6:
- if (frame+idx+16 > end)
- return -1;
- memcpy(&data->u.ipv6, frame+idx, 16);
- idx += 16;
- break;
- case SPOE_DATA_T_STR:
- if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
- return -1;
- idx += r;
- if (frame+idx+sz > end)
- return -1;
- data->u.buffer.str = frame+idx;
- data->u.buffer.len = sz;
- idx += sz;
- break;
- case SPOE_DATA_T_BIN:
- if ((r = decode_spoe_varint(frame+idx, end, &sz)) == -1)
- return -1;
- idx += r;
- if (frame+idx+sz > end)
- return -1;
- data->u.buffer.str = frame+idx;
- data->u.buffer.len = sz;
- idx += sz;
- break;
- default:
- break;
- }
-
- if (frame+idx > end)
- return -1;
- return idx;
-}
-
-
-/* Check the protocol version. It returns -1 if an error occurred, the number of
- * read bytes otherwise. */
-static int
-check_proto_version(struct worker *w, int idx)
-{
- char *str;
- uint64_t sz;
-
- /* Get the list of all supported versions by HAProxy */
- if ((w->buf[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- return -1;
- }
- idx += decode_spoe_string(w->buf+idx, w->buf+w->len, &str, &sz);
- if (str == NULL) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- return -1;
- }
-
- /* TODO: Find the right version in supported ones */
-
- return idx;
-}
-
-/* Check max frame size value. It returns -1 if an error occurred, the number of
- * read bytes otherwise. */
-static int
-check_max_frame_size(struct worker *w, int idx)
-{
- uint64_t sz;
- int type, i;
-
- /* Get the max-frame-size value of HAProxy */
- type = w->buf[idx++];
- if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 &&
- (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 &&
- (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 &&
- (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- return -1;
- }
- if ((i = decode_spoe_varint(w->buf+idx, w->buf+w->len, &sz)) == -1) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- return -1;
- }
- idx += i;
-
- /* Keep the lower value */
- if (sz < w->size)
- w->size = sz;
-
- return idx;
-}
-
-/* Check healthcheck value. It returns -1 if an error occurred, the number of
- * read bytes otherwise. */
-static int
-check_healthcheck(struct worker *w, int idx)
-{
- int type;
-
- /* Get the "healthcheck" value of HAProxy */
- type = w->buf[idx++];
- if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_BOOL) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- return -1;
- }
- w->healthcheck = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE);
- return idx;
-}
-
-
-/* Decode a HELLO frame received from HAProxy. It returns -1 if an error
- * occurred, 0 if the frame must be skipped, otherwise the number of read
- * bytes. */
-static int
-handle_hahello(struct worker *w)
-{
- char *end = w->buf+w->len;
- int i, idx = 0;
-
- /* Check frame type */
- if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_HELLO)
- goto skip;
-
- /* Skip flags */
- idx += 4;
-
- /* stream-id and frame-id must be cleared */
- if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
- idx += 2;
-
- /* Loop on K/V items */
- while (idx < w->len) {
- char *str;
- uint64_t sz;
-
- /* Decode the item name */
- idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
- if (str == NULL) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
-
- /* Check "supported-versions" K/V item */
- if (!memcmp(str, "supported-versions", sz)) {
- if ((i = check_proto_version(w, idx)) == -1)
- goto error;
- idx = i;
- }
- /* Check "max-frame-size" K/V item "*/
- else if (!memcmp(str, "max-frame-size", sz)) {
- if ((i = check_max_frame_size(w, idx)) == -1)
- goto error;
- idx = i;
- }
- /* Check "healthcheck" K/V item "*/
- else if (!memcmp(str, "healthcheck", sz)) {
- if ((i = check_healthcheck(w, idx)) == -1)
- goto error;
- idx = i;
- }
- /* Skip "capabilities" K/V item for now */
- else {
- /* Silently ignore unknown item */
- if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
- idx += i;
- }
- }
-
- return idx;
-skip:
- return 0;
-error:
- return -1;
-}
-
-/* Decode a DISCONNECT frame received from HAProxy. It returns -1 if an error
- * occurred, 0 if the frame must be skipped, otherwise the number of read
- * bytes. */
-static int
-handle_hadiscon(struct worker *w)
-{
- char *end = w->buf+w->len;
- int i, idx = 0;
-
- /* Check frame type */
- if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_DISCON)
- goto skip;
-
- /* Skip flags */
- idx += 4;
-
- /* stream-id and frame-id must be cleared */
- if (w->buf[idx] != 0 || w->buf[idx+1] != 0) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
- idx += 2;
-
- /* Loop on K/V items */
- while (idx < w->len) {
- char *str;
- uint64_t sz;
-
- /* Decode item key */
- idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
- if (str == NULL) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
- /* Silently ignore unknown item */
- if ((i = skip_spoe_data(w->buf+idx, end)) == -1) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
- idx += i;
- }
-
- w->status_code = SPOE_FRM_ERR_NONE;
- return idx;
-skip:
- return 0;
-error:
- return -1;
-}
-
-/* Encode a ACK frame to send it to HAProxy. It returns -1 if an error occurred,
- * the number of written bytes otherwise. */
-static void prepare_agentack(struct worker *w)
-{
- unsigned int flags = 0;
-
- w->ack_len = 0;
-
- /* Frame type */
- w->ack[w->ack_len++] = SPOE_FRM_T_AGENT_ACK;
-
- /* Set flags */
- flags |= htonl(SPOE_FRM_FL_FIN);
- memcpy(w->ack + w->ack_len, &flags, 4);
- w->ack_len += 4;
-
- /* Set stream-id and frame-id for ACK frames */
- w->ack_len += encode_spoe_varint(w->stream_id, w->ack + w->ack_len);
- w->ack_len += encode_spoe_varint(w->frame_id, w->ack + w->ack_len);
-}
-
-static inline
-int set_var_name(struct worker *w, const char *name, int name_len, unsigned char scope)
-{
- w->ack[w->ack_len++] = SPOE_ACT_T_SET_VAR; /* Action type */
- w->ack[w->ack_len++] = 3; /* Number of args */
- w->ack[w->ack_len++] = scope; /* Arg 1: the scope */
- w->ack_len += encode_spoe_string(name, name_len, w->ack+w->ack_len); /* Arg 2: variable name */
- return 1;
-}
-
-int set_var_null(struct worker *w,
- const char *name, int name_len,
- unsigned char scope)
-{
- if (!set_var_name(w, name, name_len, scope))
- return 0;
- w->ack[w->ack_len++] = SPOE_DATA_T_NULL;
- return 1;
-}
-
-int set_var_bool(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, bool value)
-{
- if (!set_var_name(w, name, name_len, scope))
- return 0;
- w->ack[w->ack_len++] = SPOE_DATA_T_BOOL | (!!value << 4);
- return 1;
-}
-
-static inline
-int set_var_int(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, int type, uint64_t value)
-{
- if (!set_var_name(w, name, name_len, scope))
- return 0;
- w->ack[w->ack_len++] = SPOE_DATA_T_UINT32;
- w->ack_len += encode_spoe_varint(value, w->ack+w->ack_len); /* Arg 3: variable value */
- return 1;
-}
-
-int set_var_uint32(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, uint32_t value)
-{
- return set_var_int(w, name, name_len, scope, SPOE_DATA_T_UINT32, value);
-}
-
-int set_var_int32(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, int32_t value)
-{
- return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
-}
-
-int set_var_uint64(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, uint64_t value)
-{
- return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
-}
-
-int set_var_int64(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, int64_t value)
-{
- return set_var_int(w, name, name_len, scope, SPOE_DATA_T_INT32, value);
-}
-
-int set_var_ipv4(struct worker *w,
- const char *name, int name_len,
- unsigned char scope,
- struct in_addr *ipv4)
-{
- if (!set_var_name(w, name, name_len, scope))
- return 0;
- w->ack[w->ack_len++] = SPOE_DATA_T_IPV4;
- memcpy(w->ack+w->ack_len, ipv4, 4);
- w->ack_len += 4;
- return 1;
-}
-
-int set_var_ipv6(struct worker *w,
- const char *name, int name_len,
- unsigned char scope,
- struct in6_addr *ipv6)
-{
- if (!set_var_name(w, name, name_len, scope))
- return 0;
- w->ack[w->ack_len++] = SPOE_DATA_T_IPV6;
- memcpy(w->ack+w->ack_len, ipv6, 16);
- w->ack_len += 16;
- return 1;
-}
-
-static inline
-int set_var_buf(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, int type,
- const char *str, int str_len)
-{
- if (!set_var_name(w, name, name_len, scope))
- return 0;
- w->ack[w->ack_len++] = type;
- w->ack_len += encode_spoe_string(str, str_len, w->ack+w->ack_len);
- return 1;
-}
-
-int set_var_string(struct worker *w,
- const char *name, int name_len,
- unsigned char scope,
- const char *str, int strlen)
-{
- return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_STR, str, strlen);
-}
-
-int set_var_bin(struct worker *w,
- const char *name, int name_len,
- unsigned char scope,
- const char *str, int strlen)
-{
- return set_var_buf(w, name, name_len, scope, SPOE_DATA_T_BIN, str, strlen);
-}
-
-/* This function is a little bit ugly,
- * TODO: improve the response without copying the buffer
- */
-static int commit_agentack(struct worker *w)
-{
- memcpy(w->buf, w->ack, w->ack_len);
- w->len = w->ack_len;
- return 1;
-}
-
-/* Decode a NOTIFY frame received from HAProxy. It returns -1 if an error
- * occurred, 0 if the frame must be skipped, otherwise the number of read
- * bytes. */
-static int
-handle_hanotify(struct worker *w)
-{
- char *end = w->buf+w->len;
- uint64_t stream_id, frame_id;
- int nbargs, i, idx = 0;
- int index;
- struct spoe_kv args[256];
- uint64_t length;
- struct ps_message *msg;
-
- /* Check frame type */
- if (w->buf[idx++] != SPOE_FRM_T_HAPROXY_NOTIFY)
- goto skip;
-
- /* Skip flags */
- idx += 4;
-
- /* Read the stream-id */
- if ((i = decode_spoe_varint(w->buf+idx, end, &stream_id)) == -1) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
- idx += i;
-
- /* Read the frame-id */
- if ((i = decode_spoe_varint(w->buf+idx, end, &frame_id)) == -1) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
- idx += i;
-
- w->stream_id = (unsigned int)stream_id;
- w->frame_id = (unsigned int)frame_id;
-
- DEBUG("Notify frame received: stream-id=%u - frame-id=%u",
- w->stream_id, w->frame_id);
-
- /* Prepare ack, if the processing fails the ack will be cancelled */
- prepare_agentack(w);
-
- /* Loop on messages */
- while (idx < w->len) {
- char *str;
- uint64_t sz;
-
- /* Decode the message name */
- idx += decode_spoe_string(w->buf+idx, end, &str, &sz);
- if (str == NULL) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
- DEBUG(" Message '%.*s' received", (int)sz, str);
-
- /* Decode all SPOE data */
- nbargs = (unsigned char)w->buf[idx++];
- for (index = 0; index < nbargs; index++) {
-
- /* Read the key name */
- if ((i = decode_spoe_string(w->buf+idx, end,
- &args[index].name.str,
- &length)) == -1) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
- if (length > INT_MAX) {
- w->status_code = SPOE_FRM_ERR_TOO_BIG;
- goto error;
- }
- args[index].name.len = length;
- idx += i;
-
- /* Read the value */
- memset(&args[index].value, 0, sizeof(args[index].value));
- if ((i = decode_spoe_data(w->buf+idx, end, &args[index].value)) == -1) {
- w->status_code = SPOE_FRM_ERR_INVALID;
- goto error;
- }
- idx += i;
- }
-
- /* Lookup for existing bindings. If no existing message
- * where found, does nothing.
- */
- for (msg = ps_messages; msg; msg = msg->next)
- if (sz == strlen(msg->name) && strncmp(str, msg->name, sz) == 0)
- break;
- if (msg == NULL || msg->ps->exec_message == NULL) {
- DEBUG(" Message '%.*s' have no bindings registered", (int)sz, str);
- continue;
- }
-
- /* Process the message */
- msg->ps->exec_message(w, msg->ref, nbargs, args);
- }
-
- return idx;
-skip:
- return 0;
-error:
- return -1;
-}
-
-/* Encode a HELLO frame to send it to HAProxy. It returns -1 if an error
- * occurred, the number of written bytes otherwise. */
-static int
-prepare_agenthello(struct worker *w)
-{
- int idx = 0;
- unsigned int flags = 0;
-
- /* Frame Type */
- w->buf[idx++] = SPOE_FRM_T_AGENT_HELLO;
-
- /* Set flags */
- flags |= htonl(SPOE_FRM_FL_FIN);
- memcpy(w->buf+idx, &flags, 4);
- idx += 4;
-
- /* No stream-id and frame-id for HELLO frames */
- w->buf[idx++] = 0;
- w->buf[idx++] = 0;
-
- /* "version" K/V item */
- idx += encode_spoe_string("version", 7, w->buf+idx);
- w->buf[idx++] = SPOE_DATA_T_STR;
- idx += encode_spoe_string(SPOP_VERSION, SLEN(SPOP_VERSION), w->buf+idx);
-
- /* "max-frame-size" K/V item */
- idx += encode_spoe_string("max-frame-size", 14, w->buf+idx);
- w->buf[idx++] = SPOE_DATA_T_UINT32;
- idx += encode_spoe_varint(w->size, w->buf+idx);
-
- /* "capabilities" K/V item */
- idx += encode_spoe_string("capabilities", 12, w->buf+idx);
- w->buf[idx++] = SPOE_DATA_T_STR;
- idx += encode_spoe_string(SPOA_CAPABILITIES, SLEN(SPOA_CAPABILITIES), w->buf+idx);
-
- w->len = idx;
- return idx;
-}
-
-/* Encode a DISCONNECT frame to send it to HAProxy. It returns -1 if an error
- * occurred, the number of written bytes otherwise. */
-static int
-prepare_agentdicon(struct worker *w)
-{
- const char *reason;
- int rlen, idx = 0;
- unsigned int flags = 0;
-
- if (w->status_code >= SPOE_FRM_ERRS)
- w->status_code = SPOE_FRM_ERR_UNKNOWN;
- reason = spoe_frm_err_reasons[w->status_code];
- rlen = strlen(reason);
-
- /* Frame type */
- w->buf[idx++] = SPOE_FRM_T_AGENT_DISCON;
-
- /* Set flags */
- flags |= htonl(SPOE_FRM_FL_FIN);
- memcpy(w->buf+idx, &flags, 4);
- idx += 4;
-
- /* No stream-id and frame-id for DISCONNECT frames */
- w->buf[idx++] = 0;
- w->buf[idx++] = 0;
-
- /* There are 2 mandatory items: "status-code" and "message" */
-
- /* "status-code" K/V item */
- idx += encode_spoe_string("status-code", 11, w->buf+idx);
- w->buf[idx++] = SPOE_DATA_T_UINT32;
- idx += encode_spoe_varint(w->status_code, w->buf+idx);
-
- /* "message" K/V item */
- idx += encode_spoe_string("message", 7, w->buf+idx);
- w->buf[idx++] = SPOE_DATA_T_STR;
- idx += encode_spoe_string(reason, rlen, w->buf+idx);
-
- w->len = idx;
- return idx;
-}
-
-static int
-hello_handshake(int sock, struct worker *w)
-{
- if (read_frame(sock, w) < 0) {
- LOG("Failed to read Haproxy HELLO frame");
- goto error;
- }
- if (handle_hahello(w) < 0) {
- LOG("Failed to handle Haproxy HELLO frame");
- goto error;
- }
- if (prepare_agenthello(w) < 0) {
- LOG("Failed to prepare Agent HELLO frame");
- goto error;
- }
- if (write_frame(sock, w) < 0) {
- LOG("Failed to write Agent frame");
- goto error;
- }
- DEBUG("Hello handshake done: version=%s - max-frame-size=%u - healthcheck=%s",
- SPOP_VERSION, w->size, (w->healthcheck ? "true" : "false"));
- return 0;
-error:
- return -1;
-}
-
-static int
-notify_ack_roundtip(int sock, struct worker *w)
-{
- if (read_frame(sock, w) < 0) {
- LOG("Failed to read Haproxy NOTIFY frame");
- goto error_or_quit;
- }
- if (handle_hadiscon(w) != 0) {
- if (w->status_code != SPOE_FRM_ERR_NONE)
- LOG("Failed to handle Haproxy DISCONNECT frame");
- DEBUG("Disconnect frame received: reason=%s",
- spoe_frm_err_reasons[w->status_code]);
- goto error_or_quit;
- }
- if (handle_hanotify(w) < 0) {
- LOG("Failed to handle Haproxy NOTIFY frame");
- goto error_or_quit;
- }
- if (commit_agentack(w) < 0) {
- LOG("Failed to prepare Agent ACK frame");
- goto error_or_quit;
- }
- if (write_frame(sock, w) < 0) {
- LOG("Failed to write Agent ACK frame");
- goto error_or_quit;
- }
- DEBUG("Ack frame sent: stream-id=%u - frame-id=%u",
- w->stream_id, w->frame_id);
- return 0;
-error_or_quit:
- return -1;
-}
-
-static void *
-spoa_worker(void *data)
-{
- struct worker w;
- struct sockaddr_in client;
- int *info = (int *)data;
- int csock, lsock = info[0];
- struct ps *ps;
- int i;
- int len;
-
- signal(SIGPIPE, SIG_IGN);
- pthread_setspecific(worker_id, &info[1]);
-
- /* Init registered processors */
- for (ps = ps_list; ps != NULL; ps = ps->next)
- ps->init_worker(&w);
-
- /* Load files */
- for (i = 0; i < nfiles; i++) {
- len = strlen(files[i]);
- for (ps = ps_list; ps != NULL; ps = ps->next)
- if (strcmp(files[i] + len - strlen(ps->ext), ps->ext) == 0)
- break;
- if (ps == NULL) {
- LOG("Can't load file \"%s\"\n", files[i]);
- goto out;
- }
- if (!ps->load_file(&w, files[i]))
- goto out;
- }
-
- while (1) {
- socklen_t sz = sizeof(client);
-
- if ((csock = accept(lsock, (struct sockaddr *)&client, &sz)) < 0) {
- LOG("Failed to accept client connection: %m");
- goto out;
- }
- memset(&w, 0, sizeof(w));
- w.id = info[1];
- w.size = MAX_FRAME_SIZE;
-
- DEBUG("New connection from HAProxy accepted");
-
- if (hello_handshake(csock, &w) < 0)
- goto disconnect;
- if (w.healthcheck == true)
- goto close;
- while (1) {
- if (notify_ack_roundtip(csock, &w) < 0)
- break;
- }
-
- disconnect:
- if (w.status_code == SPOE_FRM_ERR_IO) {
- LOG("Close the client socket because of I/O errors");
- goto close;
- }
- if (prepare_agentdicon(&w) < 0) {
- LOG("Failed to prepare Agent DISCONNECT frame");
- goto close;
- }
- if (write_frame(csock, &w) < 0) {
- LOG("Failed to write Agent DISCONNECT frame");
- goto close;
- }
- DEBUG("Disconnect frame sent: reason=%s",
- spoe_frm_err_reasons[w.status_code]);
-
- close:
- close(csock);
- }
-
-out:
- free(info);
-#if 0
- pthread_exit(NULL);
-#endif
- return NULL;
-}
-
-int process_create(pid_t *pid, void *(*ps)(void *), void *data)
-{
- if (debug) {
- ps(data);
- exit(EXIT_SUCCESS);
- }
- *pid = fork();
- if (*pid == -1)
- return -1;
- if (*pid > 0)
- return 0;
- ps(data);
- return 0;
-}
-
-static void
-usage(char *prog)
-{
- fprintf(stderr, "Usage: %s [-h] [-d] [-p <port>] [-n <num-workers>] -f <file>\n", prog);
- fprintf(stderr, " -h Print this message\n");
- fprintf(stderr, " -d Enable the debug mode\n");
- fprintf(stderr, " -p <port> Specify the port to listen on (default: 12345)\n");
- fprintf(stderr, " -n <num-workers> Specify the number of workers (default: 5)\n");
- fprintf(stderr, " -f <file> Specify the file whoch contains the processing code.\n");
- fprintf(stderr, " This argument can specified more than once.\n");
-}
-
-int
-main(int argc, char **argv)
-{
-#if 0
- pthread_t *ts = NULL;
-#endif
- pid_t *pids;
- struct sockaddr_in server;
- int i, sock, opt, nbworkers, port;
- int status;
-
- nbworkers = NUM_WORKERS;
- port = DEFAULT_PORT;
- while ((opt = getopt(argc, argv, "hdn:p:f:")) != -1) {
- switch (opt) {
- case 'h':
- usage(argv[0]);
- return EXIT_SUCCESS;
- case 'd':
- debug = true;
- break;
- case 'n':
- nbworkers = atoi(optarg);
- break;
- case 'p':
- port = atoi(optarg);
- break;
- case 'f':
- add_file(optarg);
- break;
- default:
- usage(argv[0]);
- return EXIT_FAILURE;
- }
- }
-
- if (nbworkers <= 0) {
- fprintf(stderr, "%s: Invalid number of workers '%d'\n",
- argv[0], nbworkers);
- goto error;
- }
- if (port <= 0) {
- fprintf(stderr, "%s: Invalid port '%d'\n", argv[0], port);
- goto error;
- }
-
- if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- fprintf(stderr, "Failed creating socket: %m\n");
- goto error;
- }
-
- setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (int []){1}, sizeof(int));
- setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int []){1}, sizeof(int));
-
- memset(&server, 0, sizeof(server));
- server.sin_family = AF_INET;
- server.sin_addr.s_addr = INADDR_ANY;
- server.sin_port = htons(port);
-
- if (bind(sock, (struct sockaddr *)&server, sizeof(server)) < 0) {
- fprintf(stderr, "Failed to bind the socket: %m\n");
- goto error;
- }
-
- if (listen(sock , 10) < 0) {
- fprintf(stderr, "Failed to listen on the socket: %m\n");
- goto error;
- }
- fprintf(stderr, "SPOA is listening on port %d\n", port);
-
- pthread_key_create(&worker_id, NULL);
-
- /* Initialise the server in thread mode. This code is commented
- * out and not deleted, because later I expect to work with
- * process ansd threads. This first version just support processes.
- */
-#if 0
- ts = calloc(nbworkers, sizeof(*ts));
- for (i = 0; i < nbworkers; i++) {
- int *info = calloc(2, sizeof(*info));
-
- info[0] = sock;
- info[1] = i+1;
-
- if (pthread_create(&ts[i], NULL, spoa_worker, info) < 0) {
- fprintf(stderr, "Failed to create thread %d: %m\n", i+1);
- goto error;
- }
- fprintf(stderr, "SPOA worker %02d started\n", i+1);
- }
-
- for (i = 0; i < nbworkers; i++) {
- pthread_join(ts[i], NULL);
- fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
- }
- free(ts);
-#endif
-
- /* Start processes */
- pids = calloc(nbworkers, sizeof(*pids));
- if (!pids) {
- fprintf(stderr, "Out of memory error\n");
- goto error;
- }
- for (i = 0; i < nbworkers; i++) {
- int *info = calloc(2, sizeof(*info));
-
- info[0] = sock;
- info[1] = i+1;
-
- if (process_create(&pids[i], spoa_worker, info) == -1) {
- fprintf(stderr, "SPOA worker %02d started\n", i+1);
- goto error;
- }
- fprintf(stderr, "SPOA worker %02d started\n", i+1);
- }
- for (i = 0; i < nbworkers; i++) {
- waitpid(pids[0], &status, 0);
- fprintf(stderr, "SPOA worker %02d stopped\n", i+1);
- }
-
- close(sock);
- pthread_key_delete(worker_id);
- return EXIT_SUCCESS;
-
-error:
- return EXIT_FAILURE;
-}
+++ /dev/null
-/* Main SPOA server includes
- *
- * Copyright 2016 HAProxy Technologies, Christopher Faulet <cfaulet@haproxy.com>
- * Copyright 2018 OZON / Thierry Fournier <thierry.fournier@ozon.io>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version
- * 2 of the License, or (at your option) any later version.
- */
-#ifndef __SPOA_H__
-#define __SPOA_H__
-
-#include <pthread.h>
-#include <stdbool.h>
-#include <stdint.h>
-#include <netinet/in.h>
-#include <sys/time.h>
-
-#ifndef MAX_FRAME_SIZE
-#define MAX_FRAME_SIZE 16384
-#endif
-
-#define SPOP_VERSION "2.0"
-#define SPOA_CAPABILITIES ""
-
-/* Flags set on the SPOE frame */
-#define SPOE_FRM_FL_FIN 0x00000001
-
-/* All supported data types */
-enum spoe_data_type {
- SPOE_DATA_T_NULL = 0,
- SPOE_DATA_T_BOOL,
- SPOE_DATA_T_INT32,
- SPOE_DATA_T_UINT32,
- SPOE_DATA_T_INT64,
- SPOE_DATA_T_UINT64,
- SPOE_DATA_T_IPV4,
- SPOE_DATA_T_IPV6,
- SPOE_DATA_T_STR,
- SPOE_DATA_T_BIN,
- SPOE_DATA_TYPES
-};
-
-/* Scopes used for variables set by agents. It is a way to be agnotic to vars
- * scope. */
-enum spoe_vars_scope {
- SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */
- SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */
- SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */
- SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */
- SPOE_SCOPE_RES, /* <=> SCOPE_RES */
-};
-
-struct worker {
- unsigned int id;
- char buf[MAX_FRAME_SIZE];
- unsigned int len;
- unsigned int size;
- int status_code;
- unsigned int stream_id;
- unsigned int frame_id;
- bool healthcheck;
- char ack[MAX_FRAME_SIZE];
- unsigned int ack_len;
-};
-
-struct chunk {
- char *str; /* beginning of the string itself. Might not be 0-terminated */
- int len; /* current size of the string from first to last char */
-};
-
-union spoe_value {
- bool boolean; /* use for boolean */
- int32_t sint32; /* used for signed 32bits integers */
- uint32_t uint32; /* used for signed 32bits integers */
- int32_t sint64; /* used for signed 64bits integers */
- uint32_t uint64; /* used for signed 64bits integers */
- struct in_addr ipv4; /* used for ipv4 addresses */
- struct in6_addr ipv6; /* used for ipv6 addresses */
- struct chunk buffer; /* used for char strings or buffers */
-};
-
-/* Used to store sample constant */
-struct spoe_data {
- enum spoe_data_type type; /* SPOE_DATA_T_* */
- union spoe_value u; /* spoe data value */
-};
-
-struct spoe_kv {
- struct chunk name;
- struct spoe_data value;
-};
-
-struct ps {
- struct ps *next;
- char *ext;
- int (*init_worker)(struct worker *w);
- int (*exec_message)(struct worker *w, void *ref, int nargs, struct spoe_kv *args);
- int (*load_file)(struct worker *w, const char *file);
-};
-
-struct ps_message {
- struct ps_message *next;
- const char *name;
- struct ps *ps;
- void *ref;
-};
-
-extern bool debug;
-extern pthread_key_t worker_id;
-
-void ps_register(struct ps *ps);
-void ps_register_message(struct ps *ps, const char *name, void *ref);
-
-int set_var_null(struct worker *w,
- const char *name, int name_len,
- unsigned char scope);
-int set_var_bool(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, bool value);
-int set_var_uint32(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, uint32_t value);
-int set_var_int32(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, int32_t value);
-int set_var_uint64(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, uint64_t value);
-int set_var_int64(struct worker *w,
- const char *name, int name_len,
- unsigned char scope, int64_t value);
-int set_var_ipv4(struct worker *w,
- const char *name, int name_len,
- unsigned char scope,
- struct in_addr *ipv4);
-int set_var_ipv6(struct worker *w,
- const char *name, int name_len,
- unsigned char scope,
- struct in6_addr *ipv6);
-int set_var_string(struct worker *w,
- const char *name, int name_len,
- unsigned char scope,
- const char *str, int strlen);
-int set_var_bin(struct worker *w,
- const char *name, int name_len,
- unsigned char scope,
- const char *str, int strlen);
-
-#define LOG(fmt, args...) \
- do { \
- struct timeval now; \
- int wid = *((int*)pthread_getspecific(worker_id)); \
- \
- gettimeofday(&now, NULL); \
- fprintf(stderr, "%ld.%06ld [%02d] " fmt "\n", \
- now.tv_sec, now.tv_usec, wid, ##args); \
- } while (0)
-
-#define DEBUG(x...) \
- do { \
- if (debug) \
- LOG(x); \
- } while (0)
-
-#endif /* __SPOA_H__ */