From: Marek Vavrusa Date: Mon, 28 Mar 2016 23:08:24 +0000 (-0700) Subject: daemon: support running in supervised mode (--fd=X) X-Git-Tag: v1.0.0~54^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d944e311dcf4578ddf021da8818b3f6a9f2e5d75;p=thirdparty%2Fknot-resolver.git daemon: support running in supervised mode (--fd=X) daemon can accept existing fds on command line, thus supporting process managers like circus or upstart. a tiny supervisor script is attached --- diff --git a/daemon/README.rst b/daemon/README.rst index d05ef8b5e..546e8a117 100644 --- a/daemon/README.rst +++ b/daemon/README.rst @@ -124,6 +124,22 @@ of running processes, and you can test the process for liveliness by connecting .. warning:: This is very basic way to orchestrate multi-core deployments and doesn't scale in multi-node clusters. Keep an eye on the prepared ``hive`` module that is going to automate everything from service discovery to deployment and consistent configuration. +Running supervised +================== + +Knot Resolver can run under a supervisor to allow for graceful restarts, watchdog process and socket activation. This way the supervisor binds to sockets and lends them to resolver daemon. Thus if the resolver terminates or is killed, the sockets are still active and no queries are dropped. + +The watchdog process must notify kresd about active file descriptors, and kresd will automatically determine the socket type and bound address, thus it will appear as any other address. There's a tiny supervisor script for convenience, but you should have a look at [real process managers](http://blog.crocodoc.com/post/48703468992/process-managers-the-good-the-bad-and-the-ugly). + +.. code-block:: bash + + $ python scripts/supervisor.py ./daemon/kresd 127.0.0.1@53 + $ [system] interactive mode + > quit() + > [2016-03-28 16:06:36.795879] process finished, pid = 99342, status = 0, uptime = 0:00:01.720612 + [system] interactive mode + > + Configuration ============= diff --git a/daemon/main.c b/daemon/main.c index b9c395cac..36e160638 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -148,6 +148,7 @@ static void help(int argc, char *argv[]) printf("Usage: %s [parameters] [rundir]\n", argv[0]); printf("\nParameters:\n" " -a, --addr=[addr] Server address (default: localhost#53).\n" + " -S, --fd=[fd] Listen on given fd (handed out by supervisor).\n" " -c, --config=[path] Config file path (relative to [rundir]) (default: config).\n" " -k, --keyfile=[path] File containing trust anchors (DS or DNSKEY).\n" " -f, --forks=N Start N forks sharing the configuration.\n" @@ -225,7 +226,9 @@ int main(int argc, char **argv) { int forks = 1; array_t(char*) addr_set; + array_t(int) fd_set; array_init(addr_set); + array_init(fd_set); char *keyfile = NULL; const char *config = NULL; char *keyfile_buf = NULL; @@ -234,6 +237,7 @@ int main(int argc, char **argv) int c = 0, li = 0, ret = 0; struct option opts[] = { {"addr", required_argument, 0, 'a'}, + {"fd", required_argument, 0, 'S'}, {"config", required_argument, 0, 'c'}, {"keyfile",required_argument, 0, 'k'}, {"forks",required_argument, 0, 'f'}, @@ -243,12 +247,15 @@ int main(int argc, char **argv) {"help", no_argument, 0, 'h'}, {0, 0, 0, 0} }; - while ((c = getopt_long(argc, argv, "a:c:f:k:vqVh", opts, &li)) != -1) { + while ((c = getopt_long(argc, argv, "a:S:c:f:k:vqVh", opts, &li)) != -1) { switch (c) { case 'a': array_push(addr_set, optarg); break; + case 'S': + array_push(fd_set, atoi(optarg)); + break; case 'c': config = optarg; break; @@ -372,13 +379,21 @@ int main(int argc, char **argv) kr_log_error("[system] not enough memory\n"); return EXIT_FAILURE; } + /* Bind to passed fds and run */ + for (size_t i = 0; i < fd_set.len; ++i) { + ret = network_listen_fd(&engine.net, fd_set.at[i]); + if (ret != 0) { + kr_log_error("[system] listen on fd=%d %s\n", fd_set.at[i], kr_strerror(ret)); + ret = EXIT_FAILURE; + } + } /* Bind to sockets and run */ for (size_t i = 0; i < addr_set.len; ++i) { int port = 53; const char *addr = set_addr(addr_set.at[i], &port); ret = network_listen(&engine.net, addr, (uint16_t)port, NET_UDP|NET_TCP); if (ret != 0) { - kr_log_error("[system] bind to '%s#%d' %s\n", addr, port, knot_strerror(ret)); + kr_log_error("[system] bind to '%s#%d' %s\n", addr, port, kr_strerror(ret)); ret = EXIT_FAILURE; } } diff --git a/daemon/network.c b/daemon/network.c index ace172595..334edcf55 100644 --- a/daemon/network.c +++ b/daemon/network.c @@ -15,6 +15,7 @@ */ #include +#include #include "daemon/network.h" #include "daemon/worker.h" #include "daemon/io.h" @@ -153,6 +154,44 @@ static int open_endpoint(struct network *net, struct endpoint *ep, struct sockad return kr_ok(); } +/** Open fd as endpoint. */ +static int open_endpoint_fd(struct network *net, struct endpoint *ep, int fd, int sock_type) +{ + if (sock_type == SOCK_DGRAM) { + if (ep->udp) { + return kr_error(EEXIST); + } + ep->udp = malloc(sizeof(*ep->udp)); + if (!ep->udp) { + return kr_error(ENOMEM); + } + uv_udp_init(net->loop, ep->udp); + int ret = uv_udp_open(ep->udp, (uv_os_sock_t) fd); + if (ret != 0) { + close_handle((uv_handle_t *)ep->udp, false); + return ret; + } + ep->flags |= NET_UDP; + } + if (sock_type == SOCK_STREAM) { + if (ep->tcp) { + return kr_error(EEXIST); + } + ep->tcp = malloc(sizeof(*ep->tcp)); + if (!ep->tcp) { + return kr_error(ENOMEM); + } + uv_tcp_init(net->loop, ep->tcp); + int ret = uv_tcp_open(ep->tcp, (uv_os_sock_t) fd); + if (ret != 0) { + close_handle((uv_handle_t *)ep->tcp, false); + return ret; + } + ep->flags |= NET_TCP; + } + return kr_ok(); +} + /** @internal Fetch endpoint array and offset of the address/port query. */ static endpoint_array_t *network_get(struct network *net, const char *addr, uint16_t port, size_t *index) { @@ -169,6 +208,56 @@ static endpoint_array_t *network_get(struct network *net, const char *addr, uint return NULL; } +int network_listen_fd(struct network *net, int fd) +{ + /* Extract local address and socket type. */ + int sock_type = SOCK_DGRAM; + socklen_t len = sizeof(sock_type); + int ret = getsockopt(fd, SOL_SOCKET, SO_TYPE, &sock_type, &len); + if (ret != 0) { + return kr_error(EBADF); + } + /* Extract local address for this socket. */ + struct sockaddr_storage ss; + socklen_t addr_len = sizeof(ss); + ret = getsockname(fd, (struct sockaddr *)&ss, &addr_len); + if (ret != 0) { + return kr_error(EBADF); + } + int port = 0; + char addr_str[INET6_ADDRSTRLEN]; /* http://tools.ietf.org/html/rfc4291 */ + if (ss.ss_family == AF_INET) { + uv_ip4_name((const struct sockaddr_in*)&ss, addr_str, sizeof(addr_str)); + port = ntohs(((struct sockaddr_in *)&ss)->sin_port); + } else if (ss.ss_family == AF_INET6) { + uv_ip6_name((const struct sockaddr_in6*)&ss, addr_str, sizeof(addr_str)); + port = ntohs(((struct sockaddr_in6 *)&ss)->sin6_port); + } else { + uv_ip4_name((const struct sockaddr_in*)&ss, addr_str, sizeof(addr_str)); + port = ntohs(((struct sockaddr_in *)&ss)->sin_port); + return kr_error(EAFNOSUPPORT); + } + /* Fetch or create endpoint for this fd */ + size_t index = 0; + endpoint_array_t *ep_array = network_get(net, addr_str, port, &index); + if (!ep_array) { + struct endpoint *ep = malloc(sizeof(*ep)); + memset(ep, 0, sizeof(*ep)); + ep->flags = NET_DOWN; + ep->port = port; + ret = insert_endpoint(net, addr_str, ep); + if (ret != 0) { + return ret; + } + ep_array = network_get(net, addr_str, port, &index); + } + /* Open fd in found/created endpoint. */ + struct endpoint *ep = ep_array->at[index]; + assert(ep != NULL); + /* Create a libuv struct for this socket. */ + return open_endpoint_fd(net, ep, fd, sock_type); +} + int network_listen(struct network *net, const char *addr, uint16_t port, uint32_t flags) { if (net == NULL || addr == 0 || port == 0) { diff --git a/daemon/network.h b/daemon/network.h index 84797e8dc..77db270de 100644 --- a/daemon/network.h +++ b/daemon/network.h @@ -45,5 +45,6 @@ struct network { void network_init(struct network *net, uv_loop_t *loop); void network_deinit(struct network *net); +int network_listen_fd(struct network *net, int fd); int network_listen(struct network *net, const char *addr, uint16_t port, uint32_t flags); int network_close(struct network *net, const char *addr, uint16_t port); diff --git a/scripts/supervisor.py b/scripts/supervisor.py new file mode 100644 index 000000000..781881f98 --- /dev/null +++ b/scripts/supervisor.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# +# This is an example of simple supervisor process owning bound sockets and +# handing them over to supervised process, allowing for graceful restarts. +# +import time, datetime +import socket +import os, sys + +# Help +def help(): + print('Usage: %s addr@port ...' % sys.argv[0]) + print('Example: python scripts/supervisor.py ./daemon/kresd 127.0.0.1') + sys.exit(1) +if len(sys.argv) < 3: + help() +# Bind to sockets +daemon = sys.argv[1] +sockets = [] +for addr in sys.argv[2:]: + try: + if '@' in addr: + addr, port = addr.split('@') + port = int(port) + else: + port = 53 + except: help() + # Open TCP socket + tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + tcp.bind((addr, port)) + tcp.listen(5) + sockets.append(tcp) + # Open UDP socket + udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + udp.bind((addr, port)) + sockets.append(udp) +while True: # Fork forever + pid = os.fork() + if pid == 0: + args = ['kresd'] + ['--fd=%d' % s.fileno() for s in sockets] + os.execv('./daemon/kresd', args) + else: # Wait for fork to die + start = datetime.datetime.now() + _, status = os.waitpid(pid, 0) + end = datetime.datetime.now() + print('[%s] process finished, pid = %d, status = %d, uptime = %s' % \ + (start, pid, status, end - start)) + time.sleep(0.5) \ No newline at end of file