+An RTR client is rejected when there aren't available threads at the pool to attend it.
+Add new function at thread_pool.c to check if the pool has available threads to work.
+Use an internal buffer at sockaddr2str(), since the buffer received as parameter wasn't utilized by anybody.
+Update max values: thread-pool.server.max=500, thread-pool.validation.max=100.
44. [`--output.bgpsec`](#--outputbgpsec)
45. [`--asn1-decode-max-stack`](#--asn1-decode-max-stack)
46. [`--stale-repository-period`](#--stale-repository-period)
- 47. [`--rsync.enabled`](#--rsyncenabled)
- 48. [`--rsync.priority`](#--rsyncpriority)
- 49. [`--rsync.strategy`](#--rsyncstrategy)
+ 47. [`--thread-pool.server.max`](#--thread-poolservermax)
+ 48. [`--thread-pool.validation.max`](#--thread-poolvalidationmax)
+ 49. [`--rsync.enabled`](#--rsyncenabled)
+ 50. [`--rsync.priority`](#--rsyncpriority)
+ 51. [`--rsync.strategy`](#--rsyncstrategy)
1. [`strict`](#strict)
2. [`root`](#root)
3. [`root-except-ta`](#root-except-ta)
- 50. [`--rsync.retry.count`](#--rsyncretrycount)
- 51. [`--rsync.retry.interval`](#--rsyncretryinterval)
- 52. [`--configuration-file`](#--configuration-file)
- 53. [`rsync.program`](#rsyncprogram)
- 54. [`rsync.arguments-recursive`](#rsyncarguments-recursive)
- 55. [`rsync.arguments-flat`](#rsyncarguments-flat)
- 56. [`incidences`](#incidences)
- 57. [`init-locations`](#init-locations)
+ 52. [`--rsync.retry.count`](#--rsyncretrycount)
+ 53. [`--rsync.retry.interval`](#--rsyncretryinterval)
+ 54. [`--configuration-file`](#--configuration-file)
+ 55. [`rsync.program`](#rsyncprogram)
+ 56. [`rsync.arguments-recursive`](#rsyncarguments-recursive)
+ 57. [`rsync.arguments-flat`](#rsyncarguments-flat)
+ 58. [`incidences`](#incidences)
+ 59. [`init-locations`](#init-locations)
3. [Deprecated arguments](#deprecated-arguments)
1. [`--sync-strategy`](#--sync-strategy)
2. [`--rrdp.enabled`](#--rrdpenabled)
[--http.ca-path=<directory>]
[--output.roa=<file>]
[--output.bgpsec=<file>]
+ [--thread-pool.server.max=<unsigned integer>]
+ [--thread-pool.validation.max=<unsigned integer>]
```
If an argument is declared more than once, the last one takes precedence:
A value **equal to 0** means that the communication errors will be logged at once.
+### `--thread-pool.server.max`
+
+- **Type:** Integer
+- **Availability:** `argv` and JSON
+- **Default:** 20
+- **Range:** 1--500
+
+Maximum number of threads that will be spawned at an internal thread pool to attend incoming RTR clients (i.e. routers).
+
+The thread pool assigns one thread per RTR client, so a maximum of `--thread-pool.server.max` clients will be attended simultaneously. If the max limit is reached, any incoming client will be rejected: an RTR error PDU will be sent to the client and the connection will be closed by the server.
+
+Once the client or the server terminates the session, the corresponding thread will be returned to the pool so that it can be used again by any other incoming client.
+
+### `--thread-pool.validation.max`
+
+- **Type:** Integer
+- **Availability:** `argv` and JSON
+- **Default:** 10
+- **Range:** 1--100
+
+Maximum number of threads that will be spawned at an internal thread pool in order to run validation cycles.
+
+When a validation cycle begins, one thread per configured TAL is utilized; once the whole RPKI tree of the TAL is validated, the thread is returned to the pool.
+
+If there are more TALs at [`--tal`](#--tal) than `--thread-pool.validation.max` threads at the pool, is very likely that the validation cycles take a bit more of time to complete since only `--thread-pool.validation.max` threads will be working at the same time. E.g. if `--thread-pool.validation.max=2` and the location at [`--tal`](#--tal) has 4 TAL files, only 2 TALs will be validated simultaneously while the rest waits in a queue until there's an available thread at the pool to attend them.
+
### `--rsync.enabled`
- **Type:** Boolean (`true`, `false`)
"<a href="#--outputbgpsec">bgpsec</a>": "/tmp/fort/bgpsec.csv"
},
+ "thread-pool": {
+ "server": {
+ "<a href="#--thread-poolservermax">max</a>": 20
+ },
+ "validation": {
+ "<a href="#--thread-poolvalidationmax">max</a>": 10
+ }
+ }
+
"<a href="#--asn1-decode-max-stack">asn1-decode-max-stack</a>": 4096,
"<a href="#--stale-repository-period">stale-repository-period</a>": 43200
}
-.TH fort 8 "2020-09-29" "v1.5.0" "FORT validator"
+.TH fort 8 "2020-11-25" "v1.5.0" "FORT validator"
.SH NAME
fort \- RPKI certificate path validator and RTR server
.B \-\-output.bgpsec=-
.RE
+.B \-\-thread-pool.server.max=\fIUNSIGNED_INTEGER\fR
+.RS 4
+Maximum number of threads that will be spawned at an internal thread pool to
+attend incoming RTR clients (i.e. routers).
+.P
+The thread pool assigns one thread per RTR client, so a maximum of
+\fI--thread-pool.server.max\fR clients will be attended simultaneously. If the
+max limit is reached, any incoming client will be rejected: an RTR error PDU
+will be sent to the client and the connection will be closed by the server.
+.P
+Once the client or the server terminates the session, the corresponding thread
+will be returned to the pool so that it can be used again by any other incoming
+client.
+.P
+By default, it has a value of \fI20\fR. Minimum allowed value: \fI1\fR,
+maximum allowed value \fI500\fR.
+.RE
+
+.B \-\-thread-pool.validation.max=\fIUNSIGNED_INTEGER\fR
+.RS 4
+Maximum number of threads that will be spawned at an internal thread pool in
+order to run validation cycles.
+.P
+When a validation cycle begins, one thread per configured TAL is utilized; once
+the whole RPKI tree of the TAL is validated, the thread is returned to the pool.
+.P
+If there are more TALs at \fI--tal\fR than \fI--thread-pool.validation.max\fR
+threads at the pool, is very likely that the validation cycles take a bit more
+of time to complete since only \fI--thread-pool.validation.max\fR threads will
+be working at the same time. E.g. if \fI--thread-pool.validation.max=2\fR and
+the location at \fI--tal\fR has 4 TAL files, only 2 TALs will be validated
+simultaneously while the rest waits in a queue until there's an available thread
+at the pool to attend them.
+.P
+By default, it has a value of \fI10\fR. Minimum allowed value: \fI1\fR,
+maximum allowed value \fI100\fR.
+.RE
+
.B \-\-asn1-decode-max-stack=\fIUNSIGNED_INTEGER\fR
.RS 4
ASN1 decoder max allowed stack size in bytes, utilized to avoid a stack
"roa": "/tmp/fort/roas.csv",
"bgpsec": "/tmp/fort/bgpsec.csv"
},
+ "thread-pool": {
+ "server": {
+ "max": 20
+ },
+ "validation": {
+ "max": 10
+ }
+ },
"asn1-decode-max-stack": 4096,
"stale-repository-period": 43200
}
* buffer must length INET6_ADDRSTRLEN.
*/
char const *
-sockaddr2str(struct sockaddr_storage *sockaddr, char *buffer)
+sockaddr2str(struct sockaddr_storage *sockaddr)
{
void *addr = NULL;
char const *addr_str;
+ char buffer[INET6_ADDRSTRLEN];
if (sockaddr == NULL)
return "(null)";
bool ipv4_covered(struct in_addr *, uint8_t, struct in_addr *);
bool ipv6_covered(struct in6_addr *, uint8_t, struct in6_addr *);
-char const *sockaddr2str(struct sockaddr_storage *, char *);
+char const *sockaddr2str(struct sockaddr_storage *);
#endif /* SRC_ADDRESS_H_ */
.offset = offsetof(struct rpki_config, thread_pool.server.max),
.doc = "Maximum number of active threads (one thread per RTR client) that can live at the thread pool",
.min = 1,
- /* Would somebody connect more than 400 routers? */
- .max = 400,
+ /* Would somebody connect more than 500 routers? */
+ .max = 500,
},
{
.id = 12001,
thread_pool.validation.max),
.doc = "Maximum number of active threads (one thread per TAL) that can live at the thread pool",
.min = 1,
- .max = 20,
+ .max = 100,
},
{ 0 },
return error;
- if (log_op_debug_enabled()) {
- char buffer[INET6_ADDRSTRLEN];
+ if (log_op_debug_enabled())
pr_op_debug("Received a %s from %s.",
pdutype2str(header.pdu_type),
- sockaddr2str(client_addr, buffer));
- }
+ sockaddr2str(client_addr));
error = validate_rtr_version(fd, &header, hdr_bytes);
if (error)
#define CL_ACCEPTED "accepted"
#define CL_CLOSED "closed"
#define CL_TERMINATED "terminated"
+#define CL_REJECTED "rejected"
/* Parameters for each thread that handles client connections */
struct thread_param {
static int
print_close_failure(int error, struct sockaddr_storage *sockaddr)
{
- char buffer[INET6_ADDRSTRLEN];
- char const *addr_str;
-
- addr_str = sockaddr2str(sockaddr, buffer);
-
return pr_op_errno(error, "close() failed on socket of client %s",
- addr_str);
-}
-
-static void
-print_client_addr(struct sockaddr_storage *addr, char const *action, int fd)
-{
- char buffer[INET6_ADDRSTRLEN];
- pr_op_info("Client %s [ID %d]: %s", action, fd,
- sockaddr2str(addr, buffer));
+ sockaddr2str(sockaddr));
}
static int
end_client(struct client *client, void *arg)
{
- if (arg != NULL && strcmp(arg, CL_TERMINATED) == 0)
+ char const *action = arg;
+ bool rejected;
+
+ /* When we (server) are closing the connection */
+ rejected = (strcmp(action, CL_REJECTED) == 0);
+ if (arg != NULL && (strcmp(arg, CL_TERMINATED) == 0 || rejected))
shutdown(client->fd, SHUT_RDWR);
if (close(client->fd) != 0)
return print_close_failure(errno, &client->addr);
- print_client_addr(&(client->addr), arg, client->fd);
+ if (rejected) {
+ pr_op_warn("Client %s [ID %d]: %s", action, client->fd,
+ sockaddr2str(&client->addr));
+ pr_op_warn("Use a greater value at 'thread-pool.server.max' if you wish to accept more than %u clients.",
+ config_get_thread_pool_server_max());
+ return 0;
+ }
+
+ pr_op_info("Client %s [ID %d]: %s", action, client->fd,
+ sockaddr2str(&client->addr));
return 0;
}
+static void
+reject_client(int fd, struct sockaddr_storage *addr)
+{
+ struct client client;
+
+ client.fd = fd;
+ client.addr = *addr;
+
+ /* Try to be polite notifying there was an error */
+ err_pdu_send_internal_error(fd, RTR_V0);
+ end_client(&client, CL_REJECTED);
+}
+
/*
* The client socket threads' entry routine.
* @arg must be released.
return NULL;
}
- print_client_addr(&client_addr, CL_ACCEPTED, client_fd);
+ /*
+ * It's very likely that the clients won't release their
+ * sessions once established; so, don't let any new
+ * client at the thread pool queue since it's probable
+ * that it'll remain there forever.
+ */
+ if (!thread_pool_avail_threads(pool)) {
+ reject_client(client_fd, &client_addr);
+ continue;
+ }
+
+ pr_op_info("Client %s [ID %d]: %s", CL_ACCEPTED,
+ client_fd, sockaddr2str(&client_addr));
/*
* Note: My gut says that errors from now on (even the
#include <sys/queue.h>
#include <pthread.h>
-#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include "log.h"
return 0;
}
+/* Are there available threads to work? */
+bool
+thread_pool_avail_threads(struct thread_pool *pool)
+{
+ bool result;
+
+ thread_pool_lock(pool);
+ result = (pool->working_count < pool->thread_count);
+ thread_pool_unlock(pool);
+
+ return result;
+}
+
/* Waits for all pending tasks at @poll to end */
void
thread_pool_wait(struct thread_pool *pool)
#ifndef SRC_THREAD_THREAD_POOL_H_
#define SRC_THREAD_THREAD_POOL_H_
+#include <stdbool.h>
+
/* Thread pool base struct */
struct thread_pool;
typedef void *(*thread_pool_task_cb)(void *);
int thread_pool_push(struct thread_pool *, thread_pool_task_cb, void *);
+bool thread_pool_avail_threads(struct thread_pool *);
void thread_pool_wait(struct thread_pool *);
#endif /* SRC_THREAD_THREAD_POOL_H_ */