]> git.ipfire.org Git - thirdparty/FORT-validator.git/commitdiff
Add docs for thread-pool.* args, add flow to reject RTR clients
authorpcarana <pc.moreno2099@gmail.com>
Wed, 25 Nov 2020 23:28:40 +0000 (17:28 -0600)
committerpcarana <pc.moreno2099@gmail.com>
Wed, 25 Nov 2020 23:28:40 +0000 (17:28 -0600)
+An RTR client is rejected when there aren't available threads at the pool to attend it.
+Add new function at thread_pool.c to check if the pool has available threads to work.
+Use an internal buffer at sockaddr2str(), since the buffer received as parameter wasn't utilized by anybody.
+Update max values: thread-pool.server.max=500, thread-pool.validation.max=100.

docs/usage.md
man/fort.8
src/address.c
src/address.h
src/config.c
src/rtr/pdu.c
src/rtr/rtr.c
src/thread/thread_pool.c
src/thread/thread_pool.h

index 89f4f41f345759565bfeb443fda99f0ad0369ff8..2b286f298a3b5c6d3f69d3fd9336d7376cabd6bb 100644 (file)
@@ -56,20 +56,22 @@ description: Guide to use arguments of FORT Validator.
        44. [`--output.bgpsec`](#--outputbgpsec)
        45. [`--asn1-decode-max-stack`](#--asn1-decode-max-stack)
        46. [`--stale-repository-period`](#--stale-repository-period)
-       47. [`--rsync.enabled`](#--rsyncenabled)
-       48. [`--rsync.priority`](#--rsyncpriority)
-       49. [`--rsync.strategy`](#--rsyncstrategy)
+       47. [`--thread-pool.server.max`](#--thread-poolservermax)
+       48. [`--thread-pool.validation.max`](#--thread-poolvalidationmax)
+       49. [`--rsync.enabled`](#--rsyncenabled)
+       50. [`--rsync.priority`](#--rsyncpriority)
+       51. [`--rsync.strategy`](#--rsyncstrategy)
                1. [`strict`](#strict)
                2. [`root`](#root)
                3. [`root-except-ta`](#root-except-ta)
-       50. [`--rsync.retry.count`](#--rsyncretrycount)
-       51. [`--rsync.retry.interval`](#--rsyncretryinterval)
-       52. [`--configuration-file`](#--configuration-file)
-       53. [`rsync.program`](#rsyncprogram)
-       54. [`rsync.arguments-recursive`](#rsyncarguments-recursive)
-       55. [`rsync.arguments-flat`](#rsyncarguments-flat)
-       56. [`incidences`](#incidences)
-       57. [`init-locations`](#init-locations)
+       52. [`--rsync.retry.count`](#--rsyncretrycount)
+       53. [`--rsync.retry.interval`](#--rsyncretryinterval)
+       54. [`--configuration-file`](#--configuration-file)
+       55. [`rsync.program`](#rsyncprogram)
+       56. [`rsync.arguments-recursive`](#rsyncarguments-recursive)
+       57. [`rsync.arguments-flat`](#rsyncarguments-flat)
+       58. [`incidences`](#incidences)
+       59. [`init-locations`](#init-locations)
 3. [Deprecated arguments](#deprecated-arguments)
        1. [`--sync-strategy`](#--sync-strategy)
        2. [`--rrdp.enabled`](#--rrdpenabled)
@@ -138,6 +140,8 @@ description: Guide to use arguments of FORT Validator.
         [--http.ca-path=<directory>]
         [--output.roa=<file>]
         [--output.bgpsec=<file>]
+        [--thread-pool.server.max=<unsigned integer>]
+        [--thread-pool.validation.max=<unsigned integer>]
 ```
 
 If an argument is declared more than once, the last one takes precedence:
@@ -766,6 +770,32 @@ Currently **all** the communication errors are logged at the validation log. Thi
 
 A value **equal to 0** means that the communication errors will be logged at once.
 
+### `--thread-pool.server.max`
+
+- **Type:** Integer
+- **Availability:** `argv` and JSON
+- **Default:** 20
+- **Range:** 1--500
+
+Maximum number of threads that will be spawned at an internal thread pool to attend incoming RTR clients (i.e. routers).
+
+The thread pool assigns one thread per RTR client, so a maximum of `--thread-pool.server.max` clients will be attended simultaneously. If the max limit is reached, any incoming client will be rejected: an RTR error PDU will be sent to the client and the connection will be closed by the server.
+
+Once the client or the server terminates the session, the corresponding thread will be returned to the pool so that it can be used again by any other incoming client.
+
+### `--thread-pool.validation.max`
+
+- **Type:** Integer
+- **Availability:** `argv` and JSON
+- **Default:** 10
+- **Range:** 1--100
+
+Maximum number of threads that will be spawned at an internal thread pool in order to run validation cycles.
+
+When a validation cycle begins, one thread per configured TAL is utilized; once the whole RPKI tree of the TAL is validated, the thread is returned to the pool.
+
+If there are more TALs at [`--tal`](#--tal) than `--thread-pool.validation.max` threads at the pool, is very likely that the validation cycles take a bit more of time to complete since only `--thread-pool.validation.max` threads will be working at the same time. E.g. if `--thread-pool.validation.max=2` and the location at [`--tal`](#--tal) has 4 TAL files, only 2 TALs will be validated simultaneously while the rest waits in a queue until there's an available thread at the pool to attend them.
+
 ### `--rsync.enabled`
 
 - **Type:** Boolean (`true`, `false`)
@@ -989,6 +1019,15 @@ The configuration options are mostly the same as the ones from the `argv` interf
                "<a href="#--outputbgpsec">bgpsec</a>": "/tmp/fort/bgpsec.csv"
        },
 
+       "thread-pool": {
+               "server": {
+                       "<a href="#--thread-poolservermax">max</a>": 20
+               },
+               "validation": {
+                       "<a href="#--thread-poolvalidationmax">max</a>": 10
+               }
+       }
+
        "<a href="#--asn1-decode-max-stack">asn1-decode-max-stack</a>": 4096,
        "<a href="#--stale-repository-period">stale-repository-period</a>": 43200
 }
index 4137dde06abf45f53be8e008dad2b737f328beb6..4cb1a8b83ceed4983ce289393816fbe3762071d6 100644 (file)
@@ -1,4 +1,4 @@
-.TH fort 8 "2020-09-29" "v1.5.0" "FORT validator"
+.TH fort 8 "2020-11-25" "v1.5.0" "FORT validator"
 
 .SH NAME
 fort \- RPKI certificate path validator and RTR server
@@ -1116,6 +1116,44 @@ value, eg.
 .B \-\-output.bgpsec=-
 .RE
 
+.B \-\-thread-pool.server.max=\fIUNSIGNED_INTEGER\fR
+.RS 4
+Maximum number of threads that will be spawned at an internal thread pool to
+attend incoming RTR clients (i.e. routers).
+.P
+The thread pool assigns one thread per RTR client, so a maximum of
+\fI--thread-pool.server.max\fR clients will be attended simultaneously. If the
+max limit is reached, any incoming client will be rejected: an RTR error PDU
+will be sent to the client and the connection will be closed by the server.
+.P
+Once the client or the server terminates the session, the corresponding thread
+will be returned to the pool so that it can be used again by any other incoming
+client.
+.P
+By default, it has a value of \fI20\fR. Minimum allowed value: \fI1\fR,
+maximum allowed value \fI500\fR.
+.RE
+
+.B \-\-thread-pool.validation.max=\fIUNSIGNED_INTEGER\fR
+.RS 4
+Maximum number of threads that will be spawned at an internal thread pool in
+order to run validation cycles.
+.P
+When a validation cycle begins, one thread per configured TAL is utilized; once
+the whole RPKI tree of the TAL is validated, the thread is returned to the pool.
+.P
+If there are more TALs at \fI--tal\fR than \fI--thread-pool.validation.max\fR
+threads at the pool, is very likely that the validation cycles take a bit more
+of time to complete since only \fI--thread-pool.validation.max\fR threads will
+be working at the same time. E.g. if \fI--thread-pool.validation.max=2\fR and
+the location at \fI--tal\fR has 4 TAL files, only 2 TALs will be validated
+simultaneously while the rest waits in a queue until there's an available thread
+at the pool to attend them.
+.P
+By default, it has a value of \fI10\fR. Minimum allowed value: \fI1\fR,
+maximum allowed value \fI100\fR.
+.RE
+
 .B \-\-asn1-decode-max-stack=\fIUNSIGNED_INTEGER\fR
 .RS 4
 ASN1 decoder max allowed stack size in bytes, utilized to avoid a stack
@@ -1320,6 +1358,14 @@ to a specific value:
     "roa": "/tmp/fort/roas.csv",
     "bgpsec": "/tmp/fort/bgpsec.csv"
   },
+  "thread-pool": {
+    "server": {
+      "max": 20
+    },
+    "validation": {
+      "max": 10
+    }
+  },
   "asn1-decode-max-stack": 4096,
   "stale-repository-period": 43200
 }
index df69bbf311d583562d9f690af025a30e2369eaab..acf96a624b86a4b11fb2e512d73dd688ed280512 100644 (file)
@@ -519,10 +519,11 @@ ipv6_covered(struct in6_addr *f_addr, uint8_t f_len, struct in6_addr *son_addr)
  * buffer must length INET6_ADDRSTRLEN.
  */
 char const *
-sockaddr2str(struct sockaddr_storage *sockaddr, char *buffer)
+sockaddr2str(struct sockaddr_storage *sockaddr)
 {
        void *addr = NULL;
        char const *addr_str;
+       char buffer[INET6_ADDRSTRLEN];
 
        if (sockaddr == NULL)
                return "(null)";
index 11670bf94d1ff3745240703c215f12e7d86a648b..a69b83087650f16e9acc35993ff4119f22cd35fd 100644 (file)
@@ -51,6 +51,6 @@ int ipv6_prefix_validate(struct ipv6_prefix *);
 bool ipv4_covered(struct in_addr *, uint8_t, struct in_addr *);
 bool ipv6_covered(struct in6_addr *, uint8_t, struct in6_addr *);
 
-char const *sockaddr2str(struct sockaddr_storage *, char *);
+char const *sockaddr2str(struct sockaddr_storage *);
 
 #endif /* SRC_ADDRESS_H_ */
index 23e90b6f781cb197e426f94766581657b85b764a..b6a0dd3978aad205f94c79cb4715efe717976341 100644 (file)
@@ -772,8 +772,8 @@ static const struct option_field options[] = {
                .offset = offsetof(struct rpki_config, thread_pool.server.max),
                .doc = "Maximum number of active threads (one thread per RTR client) that can live at the thread pool",
                .min = 1,
-               /* Would somebody connect more than 400 routers? */
-               .max = 400,
+               /* Would somebody connect more than 500 routers? */
+               .max = 500,
        },
        {
                .id = 12001,
@@ -783,7 +783,7 @@ static const struct option_field options[] = {
                    thread_pool.validation.max),
                .doc = "Maximum number of active threads (one thread per TAL) that can live at the thread pool",
                .min = 1,
-               .max = 20,
+               .max = 100,
        },
 
        { 0 },
index f6e4c7de1849656a15f4e2f12206fef809513f43..37515c9a38d3a63ee6fc22343bccd01fb8cebdc3 100644 (file)
@@ -126,12 +126,10 @@ pdu_load(int fd, struct sockaddr_storage *client_addr,
                return error;
 
 
-       if (log_op_debug_enabled()) {
-               char buffer[INET6_ADDRSTRLEN];
+       if (log_op_debug_enabled())
                pr_op_debug("Received a %s from %s.",
                    pdutype2str(header.pdu_type),
-                   sockaddr2str(client_addr, buffer));
-       }
+                   sockaddr2str(client_addr));
 
        error = validate_rtr_version(fd, &header, hdr_bytes);
        if (error)
index 105cc4e8d9f7a74610ca623689b563965ccc57e2..24d8fda5bf3850ed6068247c216322ecdf20b1b7 100644 (file)
@@ -26,6 +26,7 @@
 #define CL_ACCEPTED   "accepted"
 #define CL_CLOSED     "closed"
 #define CL_TERMINATED "terminated"
+#define CL_REJECTED   "rejected"
 
 /* Parameters for each thread that handles client connections */
 struct thread_param {
@@ -393,36 +394,50 @@ clean_request(struct rtr_request *request, const struct pdu_metadata *meta)
 static int
 print_close_failure(int error, struct sockaddr_storage *sockaddr)
 {
-       char buffer[INET6_ADDRSTRLEN];
-       char const *addr_str;
-
-       addr_str = sockaddr2str(sockaddr, buffer);
-
        return pr_op_errno(error, "close() failed on socket of client %s",
-           addr_str);
-}
-
-static void
-print_client_addr(struct sockaddr_storage *addr, char const *action, int fd)
-{
-       char buffer[INET6_ADDRSTRLEN];
-       pr_op_info("Client %s [ID %d]: %s", action, fd,
-           sockaddr2str(addr, buffer));
+           sockaddr2str(sockaddr));
 }
 
 static int
 end_client(struct client *client, void *arg)
 {
-       if (arg != NULL && strcmp(arg, CL_TERMINATED) == 0)
+       char const *action = arg;
+       bool rejected;
+
+       /* When we (server) are closing the connection */
+       rejected = (strcmp(action, CL_REJECTED) == 0);
+       if (arg != NULL && (strcmp(arg, CL_TERMINATED) == 0 || rejected))
                shutdown(client->fd, SHUT_RDWR);
 
        if (close(client->fd) != 0)
                return print_close_failure(errno, &client->addr);
 
-       print_client_addr(&(client->addr), arg, client->fd);
+       if (rejected) {
+               pr_op_warn("Client %s [ID %d]: %s", action, client->fd,
+                   sockaddr2str(&client->addr));
+               pr_op_warn("Use a greater value at 'thread-pool.server.max' if you wish to accept more than %u clients.",
+                   config_get_thread_pool_server_max());
+               return 0;
+       }
+
+       pr_op_info("Client %s [ID %d]: %s", action, client->fd,
+           sockaddr2str(&client->addr));
        return 0;
 }
 
+static void
+reject_client(int fd, struct sockaddr_storage *addr)
+{
+       struct client client;
+
+       client.fd = fd;
+       client.addr = *addr;
+
+       /* Try to be polite notifying there was an error */
+       err_pdu_send_internal_error(fd, RTR_V0);
+       end_client(&client, CL_REJECTED);
+}
+
 /*
  * The client socket threads' entry routine.
  * @arg must be released.
@@ -535,7 +550,19 @@ handle_client_connections(void *arg)
                                return NULL;
                        }
 
-                       print_client_addr(&client_addr, CL_ACCEPTED, client_fd);
+                       /*
+                        * It's very likely that the clients won't release their
+                        * sessions once established; so, don't let any new
+                        * client at the thread pool queue since it's probable
+                        * that it'll remain there forever.
+                        */
+                       if (!thread_pool_avail_threads(pool)) {
+                               reject_client(client_fd, &client_addr);
+                               continue;
+                       }
+
+                       pr_op_info("Client %s [ID %d]: %s", CL_ACCEPTED,
+                           client_fd, sockaddr2str(&client_addr));
 
                        /*
                         * Note: My gut says that errors from now on (even the
index 908e87bc9c6ba05b21ce496025ba3a5eb5dd8fd3..1612237b8dd3594945539eeb2663ba6dd2339b95 100644 (file)
@@ -2,7 +2,6 @@
 
 #include <sys/queue.h>
 #include <pthread.h>
-#include <stdbool.h>
 #include <stdlib.h>
 #include <string.h>
 #include "log.h"
@@ -335,6 +334,19 @@ thread_pool_push(struct thread_pool *pool, thread_pool_task_cb cb, void *arg)
        return 0;
 }
 
+/* Are there available threads to work? */
+bool
+thread_pool_avail_threads(struct thread_pool *pool)
+{
+       bool result;
+
+       thread_pool_lock(pool);
+       result = (pool->working_count < pool->thread_count);
+       thread_pool_unlock(pool);
+
+       return result;
+}
+
 /* Waits for all pending tasks at @poll to end */
 void
 thread_pool_wait(struct thread_pool *pool)
index 9a17813aba267ebebaf2b15a68de0dda047b2c6c..4a75cf1388b2d588a3163b48c99d0ce286a980b1 100644 (file)
@@ -1,6 +1,8 @@
 #ifndef SRC_THREAD_THREAD_POOL_H_
 #define SRC_THREAD_THREAD_POOL_H_
 
+#include <stdbool.h>
+
 /* Thread pool base struct */
 struct thread_pool;
 
@@ -10,6 +12,7 @@ void thread_pool_destroy(struct thread_pool *);
 typedef void *(*thread_pool_task_cb)(void *);
 int thread_pool_push(struct thread_pool *, thread_pool_task_cb, void *);
 
+bool thread_pool_avail_threads(struct thread_pool *);
 void thread_pool_wait(struct thread_pool *);
 
 #endif /* SRC_THREAD_THREAD_POOL_H_ */