+Change the previous logic: RTR server lived at the main thread and the validation cycles were run in a distinct thread. Now the validation cycles are run at the main thread, and RTR server is spawned at a new thread.
+Create internal thread pool to handle RTR server task and delete RRDP dirs tasks.
+Create thread pool to handle incoming RTR clients. One thread is utilized per client.
+Create args: 'thread-pool.server.max' (spawned threads to attend RTR clients) and 'thread-pool.validation.max' (spawned threads to run validation cycles).
+Shutdown all living client sockets when the application ends its execution.
+Rename 'updates_daemon.*' to 'validation_run.*'.
fort_SOURCES += extension.h extension.c
fort_SOURCES += file.h file.c
fort_SOURCES += init.h init.c
+fort_SOURCES += internal_pool.h internal_pool.c
fort_SOURCES += json_parser.c json_parser.h
fort_SOURCES += line_file.h line_file.c
fort_SOURCES += log.h log.c
fort_SOURCES += state.h state.c
fort_SOURCES += str_token.h str_token.c
fort_SOURCES += thread_var.h thread_var.c
-fort_SOURCES += updates_daemon.c updates_daemon.h
fort_SOURCES += uri.h uri.c
fort_SOURCES += json_handler.h json_handler.c
fort_SOURCES += validation_handler.h validation_handler.c
+fort_SOURCES += validation_run.h validation_run.c
fort_SOURCES += visited_uris.h visited_uris.c
fort_SOURCES += asn1/content_info.h asn1/content_info.c
}
static struct hashable_client *
-create_client(int fd, struct sockaddr_storage addr, pthread_t tid)
+create_client(int fd, struct sockaddr_storage addr)
{
struct hashable_client *client;
client->meat.serial_number_set = false;
client->meat.rtr_version_set = false;
client->meat.addr = addr;
- client->meat.tid = tid;
return client;
}
* If the client whose file descriptor is @fd isn't already stored, store it.
*/
int
-clients_add(int fd, struct sockaddr_storage addr, pthread_t tid)
+clients_add(int fd, struct sockaddr_storage addr)
{
struct hashable_client *new_client;
struct hashable_client *old_client;
- new_client = create_client(fd, addr, tid);
+ new_client = create_client(fd, addr);
if (new_client == NULL)
return pr_enomem();
return 0;
}
-int
-clients_get_addr(int fd, struct sockaddr_storage *addr)
-{
- struct hashable_client *client;
- int result;
-
- result = -ENOENT;
- rwlock_read_lock(&lock);
-
- HASH_FIND_INT(db.clients, &fd, client);
- if (client != NULL) {
- *addr = client->meat.addr;
- result = 0;
- }
-
- rwlock_unlock(&lock);
-
- return result;
-}
-
void
clients_update_serial(int fd, serial_t serial)
{
return result;
}
+/*
+ * Remove the client with ID @fd from the DB. If a @cb is set, it will be
+ * called before deleting the client from the DB.
+ */
void
-clients_forget(int fd)
+clients_forget(int fd, clients_foreach_cb cb, void *arg)
{
struct hashable_client *client;
HASH_FIND_INT(db.clients, &fd, client);
if (client != NULL) {
HASH_DEL(db.clients, client);
+ /* Nothing to do at errors */
+ if (cb != NULL)
+ cb(&client->meat, arg);
free(client);
}
}
/*
- * Destroy the clients DB, the @join_thread_cb will be made for each thread
- * that was started by the parent process (@arg will be sent at that call).
+ * Terminate all clients and remove them from DB.
+ *
+ * @cb (with @arg) will be called before deleting the corresponding client from
+ * the DB.
+ */
+int
+clients_terminate_all(clients_foreach_cb cb, void *arg)
+{
+ struct hashable_client *node, *tmp;
+ int error;
+
+ rwlock_write_lock(&lock);
+
+ HASH_ITER(hh, db.clients, node, tmp) {
+ error = cb(&node->meat, arg);
+ if (error)
+ break;
+ HASH_DEL(db.clients, node);
+ free(node);
+ }
+
+ rwlock_unlock(&lock);
+
+ return error;
+}
+
+/*
+ * Destroy the clients DB.
*/
void
-clients_db_destroy(join_thread_cb cb, void *arg)
+clients_db_destroy(void)
{
struct hashable_client *node, *tmp;
HASH_ITER(hh, db.clients, node, tmp) {
- /* Not much to do on failure */
- cb(node->meat.tid, arg);
HASH_DEL(db.clients, node);
free(node);
}
#ifndef SRC_CLIENTS_H_
#define SRC_CLIENTS_H_
-#include <pthread.h>
#include <stdbool.h>
#include <netinet/in.h>
#include "rtr/pdu.h"
struct client {
int fd;
struct sockaddr_storage addr;
- /*
- * The join should be made when the db is cleared, so the main process
- * should do it.
- */
- pthread_t tid;
serial_t serial_number;
bool serial_number_set;
int clients_db_init(void);
-int clients_add(int, struct sockaddr_storage, pthread_t);
+int clients_add(int, struct sockaddr_storage);
void clients_update_serial(int, serial_t);
-void clients_forget(int);
+
typedef int (*clients_foreach_cb)(struct client *, void *);
+void clients_forget(int, clients_foreach_cb, void *);
int clients_foreach(clients_foreach_cb, void *);
+
int clients_get_min_serial(serial_t *);
-int clients_get_addr(int, struct sockaddr_storage *);
int clients_set_rtr_version(int, uint8_t);
int clients_get_rtr_version_set(int, bool *, uint8_t *);
-typedef int (*join_thread_cb)(pthread_t, void *);
-void clients_db_destroy(join_thread_cb, void *);
+int clients_terminate_all(clients_foreach_cb, void *);
+void clients_db_destroy(void);
#endif /* SRC_CLIENTS_H_ */
char *port;
/** Outstanding connections in the socket's listen queue */
unsigned int backlog;
-
struct {
/** Interval used to look for updates at VRPs location */
unsigned int validation;
/* HTTPS URLS from where the TALS will be fetched */
struct init_locations init_tal_locations;
+
+ /* Thread pools for specific tasks */
+ struct {
+ /* Threads related to RTR server */
+ struct {
+ unsigned int max;
+ } server;
+ /* Threads related to validation cycles */
+ struct {
+ unsigned int max;
+ } validation;
+ } thread_pool;
};
static void print_usage(FILE *, bool);
.availability = AVAILABILITY_JSON,
},
+ {
+ .id = 12000,
+ .name = "thread-pool.server.max",
+ .type = >_uint,
+ .offset = offsetof(struct rpki_config, thread_pool.server.max),
+ .doc = "Maximum number of active threads (one thread per RTR client) that can live at the thread pool",
+ .min = 1,
+ /* Would somebody connect more than 400 routers? */
+ .max = 400,
+ },
+ {
+ .id = 12001,
+ .name = "thread-pool.validation.max",
+ .type = >_uint,
+ .offset = offsetof(struct rpki_config,
+ thread_pool.validation.max),
+ .doc = "Maximum number of active threads (one thread per TAL) that can live at the thread pool",
+ .min = 1,
+ .max = 20,
+ },
+
{ 0 },
};
if (error)
goto revert_init_locations;
+ /* Common scenario is to connect 1 router or a couple of them */
+ rpki_config.thread_pool.server.max = 20;
+ /* Usually 5 TALs, let a few more available */
+ rpki_config.thread_pool.validation.max = 10;
+
return 0;
revert_init_locations:
free(rpki_config.validation_log.tag);
return rpki_config.stale_repository_period;
}
+unsigned int
+config_get_thread_pool_server_max(void)
+{
+ return rpki_config.thread_pool.server.max;
+}
+
+unsigned int
+config_get_thread_pool_validation_max(void)
+{
+ return rpki_config.thread_pool.validation.max;
+}
+
void
config_set_rsync_enabled(bool value)
{
char const *config_get_output_bgpsec(void);
unsigned int config_get_asn1_decode_max_stack(void);
unsigned int config_get_stale_repository_period(void);
+unsigned int config_get_thread_pool_server_max(void);
+unsigned int config_get_thread_pool_validation_max(void);
/* Logging getters */
bool config_get_op_log_enabled(void);
#include <sys/stat.h>
#include <errno.h>
#include <ftw.h>
-#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include "common.h"
+#include "internal_pool.h"
#include "log.h"
#include "random.h"
dirs_arr = root_arg->arr;
len = root_arg->arr_set;
- /* Release received arg, and detach thread */
+ /* Release received arg */
free(root_arg);
- pthread_detach(pthread_self());
for (i = 0; i < len; i++) {
error = nftw(dirs_arr[i], traverse, MAX_FD_ALLOWED,
int
delete_dir_daemon_start(char **roots, size_t roots_len, char const *workspace)
{
- pthread_t thread;
struct rem_dirs *arg;
int error;
return error;
}
- /* Thread arg is released at thread before being detached */
- errno = pthread_create(&thread, NULL, remove_from_root, (void *) arg);
- if (errno) {
+ /* Thread arg is released at thread */
+ error = internal_pool_push(remove_from_root, (void *) arg);
+ if (error) {
rem_dirs_destroy(arg);
- return pr_op_errno(errno,
- "Could not spawn the delete dir daemon thread");
+ return error;
}
return 0;
--- /dev/null
+#include "internal_pool.h"
+
+#include <stddef.h>
+
+/*
+ * This is a basic wrapper for thread_pool functions, but the allocated pool
+ * lives at the main thread.
+ *
+ * Additional threads that must be spawned during execution (those that aren't
+ * related to the validation or server thread pool tasks) can be pushed here.
+ */
+
+#define INTERNAL_POOL_MAX 5
+
+struct thread_pool *pool;
+
+int
+internal_pool_init(void)
+{
+ int error;
+
+ pool = NULL;
+ error = thread_pool_create(INTERNAL_POOL_MAX, &pool);
+ if (error)
+ return error;
+
+ return 0;
+}
+
+int
+internal_pool_push(thread_pool_task_cb cb, void *arg)
+{
+ return thread_pool_push(pool, cb, arg);
+}
+
+void
+internal_pool_cleanup(void)
+{
+ thread_pool_destroy(pool);
+}
--- /dev/null
+#ifndef SRC_INTERNAL_POOL_H_
+#define SRC_INTERNAL_POOL_H_
+
+#include "thread/thread_pool.h"
+
+int internal_pool_init(void);
+int internal_pool_push(thread_pool_task_cb, void *);
+void internal_pool_cleanup(void);
+
+#endif /* SRC_INTERNAL_POOL_H_ */
#include "config.h"
#include "debug.h"
#include "extension.h"
+#include "internal_pool.h"
#include "nid.h"
#include "reqs_errors.h"
#include "thread_var.h"
if (error)
goto revert_nid;
- error = relax_ng_init();
+ error = internal_pool_init();
if (error)
goto revert_http;
+ error = relax_ng_init();
+ if (error)
+ goto revert_pool;
+
error = start_rtr_server();
relax_ng_cleanup();
+revert_pool:
+ internal_pool_cleanup();
revert_http:
http_cleanup();
revert_nid:
time_t now;
int error;
- /* FIXME (now) Configure max threads for tals pool */
pool = NULL;
- error = thread_pool_create(10, &pool);
+ error = thread_pool_create(config_get_thread_pool_validation_max(),
+ &pool);
if (error)
return error;
* Reads exactly @buffer_len bytes from @buffer, erroring if this goal cannot be
* met.
*
- * If @allow_end is true, will allow immediate EOF in place of the buffer bytes.
+ * If @allow_eof is true, will allow immediate EOF in place of the buffer bytes.
* (Though all this really means is that the corresponding warning message will
* not be printed, which is perfectly fine as far as the only current caller is
* concerned.)
for (offset = 0; offset < buffer_len; offset += read_result) {
read_result = read(fd, &buffer[offset], buffer_len - offset);
if (read_result == -1)
- return -pr_op_errno(errno, "Client socket read interrupted");
+ return -pr_op_errno(errno,
+ "Client socket read interrupted");
if (read_result == 0) {
if (!allow_eof)
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
-#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include "config.h"
#include "clients.h"
+#include "internal_pool.h"
#include "log.h"
-#include "updates_daemon.h"
+#include "validation_run.h"
#include "rtr/err_pdu.h"
#include "rtr/pdu.h"
#include "rtr/db/vrps.h"
+#include "thread/thread_pool.h"
+
+/* Constant messages regarding a client status */
+#define CL_ACCEPTED "accepted"
+#define CL_CLOSED "closed"
+#define CL_TERMINATED "terminated"
/* Parameters for each thread that handles client connections */
struct thread_param {
int fd;
- pthread_t tid;
struct sockaddr_storage addr;
};
/* List of server sockets */
SLIST_HEAD(server_fds, fd_node);
+/* Does the server needs to be stopped? */
+static volatile bool server_stop;
+
+/* Parameters for the RTR server task */
+struct rtr_task_param {
+ struct server_fds *fds;
+ struct thread_pool *pool;
+};
+
static int
init_addrinfo(char const *hostname, char const *service,
struct addrinfo **result)
}
static void
-server_fd_cleanup(struct server_fds *fds)
+server_fds_destroy(struct server_fds *fds)
{
struct fd_node *fd;
close(fd->id);
free(fd);
}
+ free(fds);
}
static int
error = parse_address(addresses->array[i], default_service,
&address, &service);
if (error)
- goto cleanup_fds;
+ return error;
error = server_fd_add(fds, address, service);
/* Always release them */
free(address);
free(service);
if (error)
- goto cleanup_fds;
+ return error;
}
return 0;
-cleanup_fds:
- server_fd_cleanup(fds);
- return error;
}
enum verdict {
meta->destructor(request->pdu);
}
-static void
-print_close_failure(int error, int fd)
+static int
+print_close_failure(int error, struct sockaddr_storage *sockaddr)
{
- struct sockaddr_storage sockaddr;
char buffer[INET6_ADDRSTRLEN];
char const *addr_str;
- addr_str = (clients_get_addr(fd, &sockaddr) == 0)
- ? sockaddr2str(&sockaddr, buffer)
- : "(unknown)";
+ addr_str = sockaddr2str(sockaddr, buffer);
- pr_op_errno(error, "close() failed on socket of client %s", addr_str);
-}
-
-static void
-end_client(int fd)
-{
- if (close(fd) != 0)
- print_close_failure(errno, fd);
+ return pr_op_errno(error, "close() failed on socket of client %s",
+ addr_str);
}
static void
sockaddr2str(addr, buffer));
}
+static int
+end_client(struct client *client, void *arg)
+{
+ if (arg != NULL && strcmp(arg, CL_TERMINATED) == 0)
+ shutdown(client->fd, SHUT_RDWR);
+
+ if (close(client->fd) != 0)
+ return print_close_failure(errno, &client->addr);
+
+ print_client_addr(&(client->addr), arg, client->fd);
+ return 0;
+}
+
/*
* The client socket threads' entry routine.
* @arg must be released.
memcpy(¶m, arg, sizeof(param));
free(arg);
- error = clients_add(param.fd, param.addr, param.tid);
+ error = clients_add(param.fd, param.addr);
if (error) {
close(param.fd);
return NULL;
break;
}
- print_client_addr(¶m.addr, "closed", param.fd);
- end_client(param.fd);
- clients_forget(param.fd);
-
- /* Release to avoid the wait till the parent tries to join */
- pthread_detach(param.tid);
+ clients_forget(param.fd, end_client, CL_CLOSED);
return NULL;
}
{
struct fd_node *node;
- FD_ZERO (fdset);
+ FD_ZERO(fdset);
SLIST_FOREACH(node, fds, next)
- FD_SET (node->id, fdset);
-
+ FD_SET(node->id, fdset);
}
/*
* Waits for client connections and spawns threads to handle them.
*/
-static int
-handle_client_connections(struct server_fds *fds)
+static void *
+handle_client_connections(void *arg)
{
- struct fd_node *head_node;
- struct sigaction ign;
+ struct rtr_task_param *rtr_param = arg;
+ struct server_fds *fds;
+ struct thread_pool *pool;
struct sockaddr_storage client_addr;
struct thread_param *param;
struct timeval select_time;
int fd;
int error;
- /* Ignore SIGPIPES, they're handled apart */
- ign.sa_handler = SIG_IGN;
- ign.sa_flags = 0;
- sigemptyset(&ign.sa_mask);
- sigaction(SIGPIPE, &ign, NULL);
-
- head_node = SLIST_FIRST(fds);
- last_server_fd = head_node->id;
+ /* Get the argument pointers, and release arg at once */
+ fds = rtr_param->fds;
+ pool = rtr_param->pool;
+ free(rtr_param);
- SLIST_FOREACH(head_node, fds, next) {
- error = listen(head_node->id, config_get_server_queue());
- if (error)
- return pr_op_errno(errno,
- "Couldn't listen on server socket.");
- }
+ last_server_fd = SLIST_FIRST(fds)->id;
sizeof_client_addr = sizeof(client_addr);
+ /* I'm alive! */
+ server_stop = false;
+
pr_op_debug("Waiting for client connections at server...");
do {
/* Look for connections every .2 seconds*/
select_time.tv_sec = 0;
select_time.tv_usec = 200000;
+ /* Am I still alive? */
+ if (server_stop)
+ break;
+
init_fdset(fds, &readfds);
if (select(last_server_fd + 1, &readfds, NULL, NULL,
case VERDICT_RETRY:
continue;
case VERDICT_EXIT:
- return -EINVAL;
+ return NULL;
}
- print_client_addr(&client_addr, "accepted", client_fd);
+ print_client_addr(&client_addr, CL_ACCEPTED, client_fd);
/*
* Note: My gut says that errors from now on (even the
param->fd = client_fd;
param->addr = client_addr;
- error = pthread_create(¶m->tid, NULL,
- client_thread_cb, param);
- if (error && error != EAGAIN)
+ error = thread_pool_push(pool, client_thread_cb,
+ param);
+ if (error) {
+ pr_op_err("Couldn't push a thread to attend incoming RTR client");
/* Error with min RTR version */
err_pdu_send_internal_error(client_fd, RTR_V0);
- if (error) {
- pr_op_errno(error,
- "Could not spawn the client's thread");
close(client_fd);
free(param);
}
}
} while (true);
- return 0; /* Unreachable. */
+ return NULL; /* Unreachable. */
}
-/*
- * Receive @arg to be called as a clients_foreach_cb
- */
static int
-kill_client(struct client *client, void *arg)
+__handle_client_connections(struct server_fds *fds, struct thread_pool *pool)
{
- end_client(client->fd);
- print_client_addr(&(client->addr), "terminated", client->fd);
- /* Don't call clients_forget to avoid deadlock! */
- return 0;
-}
+ struct rtr_task_param *param;
+ struct fd_node *node;
+ struct sigaction ign;
+ int error;
-static void
-end_clients(void)
-{
- clients_foreach(kill_client, NULL);
- /* Let the clients be deleted when clients DB is destroyed */
-}
+ /* Ignore SIGPIPES, they're handled apart */
+ ign.sa_handler = SIG_IGN;
+ ign.sa_flags = 0;
+ sigemptyset(&ign.sa_mask);
+ sigaction(SIGPIPE, &ign, NULL);
+
+ SLIST_FOREACH(node, fds, next) {
+ error = listen(node->id, config_get_server_queue());
+ if (error)
+ return pr_op_errno(errno,
+ "Couldn't listen on server socket.");
+ }
+
+ param = malloc(sizeof(struct rtr_task_param));
+ if (param == NULL)
+ return pr_enomem();
+
+ param->fds = fds;
+ param->pool = pool;
+
+ /* handle_client_connections() must release param */
+ error = internal_pool_push(handle_client_connections, param);
+ if (error) {
+ free(param);
+ return error;
+ }
-static int
-join_thread(pthread_t tid, void *arg)
-{
- close_thread(tid, "Client");
return 0;
}
int
rtr_listen(void)
{
- bool changed;
- struct server_fds fds; /* "file descriptors" */
+ struct server_fds *fds; /* "file descriptors" */
+ struct thread_pool *pool;
int error;
+ server_stop = true;
+
error = clients_db_init();
if (error)
return error;
if (config_get_mode() == STANDALONE) {
- error = vrps_update(&changed);
- if (error)
- pr_op_err("Error %d while trying to update the ROA database.",
- error);
+ error = validation_run_first();
goto revert_clients_db; /* Error 0 it's ok */
}
- SLIST_INIT(&fds);
- error = create_server_sockets(&fds);
- if (error)
+ fds = malloc(sizeof(struct server_fds));
+ if (fds == NULL) {
+ error = pr_enomem();
goto revert_clients_db;
+ }
+
+ SLIST_INIT(fds);
+ error = create_server_sockets(fds);
+ if (error)
+ goto revert_server_fds;
+
+ pool = NULL;
+ error = thread_pool_create(config_get_thread_pool_server_max(), &pool);
+ if (error)
+ goto revert_server_fds;
- error = updates_daemon_start();
+ /* Do the first run */
+ error = validation_run_first();
if (error)
- goto revert_server_sockets;
+ goto revert_thread_pool;
+
+ /* Wait for connections at another thread */
+ error = __handle_client_connections(fds, pool);
+ if (error)
+ goto revert_thread_pool;
+
+ /* Keep running the validations on the main thread */
+ error = validation_run_cycle();
+
+ /* Terminate all clients */
+ clients_terminate_all(end_client, CL_TERMINATED);
- error = handle_client_connections(&fds);
+ /* Stop the server (it lives on a detached thread) */
+ server_stop = true;
- end_clients();
- updates_daemon_destroy();
-revert_server_sockets:
- server_fd_cleanup(&fds);
+revert_thread_pool:
+ thread_pool_destroy(pool);
+revert_server_fds:
+ server_fds_destroy(fds);
revert_clients_db:
- clients_db_destroy(join_thread, NULL);
+ clients_db_destroy();
return error;
}
while (true) {
thread_pool_lock(pool);
- while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop) {
+ while (TAILQ_EMPTY(&(pool->queue)) && !pool->stop)
pthread_cond_wait(&(pool->working_cond), &(pool->lock));
- }
+
if (pool->stop)
break;
- /* Pop the tail */
+ /* Pull the tail */
task = task_queue_pull(&(pool->queue));
pool->working_count++;
pr_op_debug("Working on task #%u", pool->working_count);
if (error)
return pr_op_errno(error, "Calling pthread_attr_init()");
- /* FIXME (now) Let at 2MB? */
- /* 2MB */
+ /* Use 2MB (default in most 64 bits systems) */
error = pthread_attr_setstacksize(&attr, 1024 * 1024 * 2);
if (error)
return pr_op_errno(error,
thread_pool_lock(pool);
task_queue_push(&(pool->queue), task);
+ thread_pool_unlock(pool);
+
/* There's work to do! */
pthread_cond_broadcast(&(pool->working_cond));
- thread_pool_unlock(pool);
return 0;
}
+++ /dev/null
-#ifndef SRC_UPDATES_DAEMON_H_
-#define SRC_UPDATES_DAEMON_H_
-
-int updates_daemon_start(void);
-void updates_daemon_destroy(void);
-
-#endif /* SRC_UPDATES_DAEMON_H_ */
-#include "updates_daemon.h"
+#include "validation_run.h"
-#include <errno.h>
#include <stdbool.h>
#include <unistd.h>
-#include "common.h"
#include "config.h"
#include "log.h"
#include "notify.h"
-#include "object/tal.h"
#include "rtr/db/vrps.h"
-static pthread_t thread;
+/* Runs a single cycle, use at standalone mode or before running RTR server */
+int
+validation_run_first(void)
+{
+ bool upd;
+ int error;
-static void *
-check_vrps_updates(void *param_void)
+ upd = false;
+ error = vrps_update(&upd);
+ if (error)
+ return pr_op_err("First validation wasn't successful.");
+
+ return 0;
+}
+
+/* Run a validation cycle each 'server.interval.validation' secs */
+int
+validation_run_cycle(void)
{
+ unsigned int validation_interval;
bool changed;
int error;
+ validation_interval = config_get_validation_interval();
do {
- sleep(config_get_validation_interval());
+ sleep(validation_interval);
error = vrps_update(&changed);
if (error == -EINTR)
}
} while (true);
- return NULL;
-}
-
-int
-updates_daemon_start(void)
-{
- bool changed;
- int error;
-
- error = vrps_update(&changed);
- if (error)
- return pr_op_err("First validation wasn't successful.");
-
- errno = pthread_create(&thread, NULL, check_vrps_updates, NULL);
- if (errno)
- return -pr_op_errno(errno,
- "Could not spawn the update daemon thread");
-
- return 0;
-}
-
-void
-updates_daemon_destroy(void)
-{
- close_thread(thread, "Validation");
+ return error;
}
--- /dev/null
+#ifndef SRC_VALIDATION_RUN_H_
+#define SRC_VALIDATION_RUN_H_
+
+int validation_run_first(void);
+int validation_run_cycle(void);
+
+#endif /* SRC_VALIDATION_RUN_H_ */
return 0;
}
-static int
-join_threads(pthread_t tid, void *arg)
-{
- /* Empty, since no threads are alive */
- return 0;
-}
-
START_TEST(basic_test)
{
/*
*/
for (i = 0; i < 4; i++) {
- ck_assert_int_eq(0, clients_add(1, addr, 10));
- ck_assert_int_eq(0, clients_add(2, addr, 20));
- ck_assert_int_eq(0, clients_add(3, addr, 30));
- ck_assert_int_eq(0, clients_add(4, addr, 40));
+ ck_assert_int_eq(0, clients_add(1, addr));
+ ck_assert_int_eq(0, clients_add(2, addr));
+ ck_assert_int_eq(0, clients_add(3, addr));
+ ck_assert_int_eq(0, clients_add(4, addr));
}
- clients_forget(3);
+ clients_forget(3, NULL, NULL);
state = 0;
ck_assert_int_eq(0, clients_foreach(handle_foreach, &state));
ck_assert_uint_eq(3, state);
- clients_db_destroy(join_threads, NULL);
+ clients_db_destroy();
}
END_TEST
{
http_priority = value;
}
+
+unsigned int
+config_get_thread_pool_validation_max(void)
+{
+ return 10;
+}
#include "impersonator.c"
#include "thread/thread_pool.c"
-#define TOTAL_THREADS 50
-
static void *
thread_work(void *arg)
{
int *value = arg;
- sleep(2);
(*value) += 2;
return NULL;
}
-START_TEST(tpool_work)
+static void
+test_threads_work(unsigned int total_threads)
{
struct thread_pool *pool;
int **data;
int i;
int error;
- error = thread_pool_create(TOTAL_THREADS, &pool);
+ error = thread_pool_create(total_threads, &pool);
ck_assert_int_eq(error, 0);
/* Just a dummy array where each thread will modify one slot only */
- data = calloc(TOTAL_THREADS, sizeof(int *));
+ data = calloc(total_threads, sizeof(int *));
ck_assert_ptr_ne(data, NULL);
- for (i = 0; i < TOTAL_THREADS; i++) {
+ for (i = 0; i < total_threads; i++) {
data[i] = malloc(sizeof(int));
ck_assert_ptr_ne(data[i], NULL);
*data[i] = 0;
thread_pool_wait(pool);
/* Every element should have been modified */
- for (i = 0; i < TOTAL_THREADS; i++) {
+ for (i = 0; i < total_threads; i++) {
ck_assert_int_eq(*data[i], 2);
free(data[i]);
}
free(data);
thread_pool_destroy(pool);
}
+
+START_TEST(tpool_single_work)
+{
+ test_threads_work(1);
+}
+END_TEST
+
+START_TEST(tpool_multiple_work)
+{
+ test_threads_work(200);
+}
END_TEST
Suite *thread_pool_suite(void)
{
Suite *suite;
- TCase *work;
+ TCase *single, *multiple;
+
+ single = tcase_create("single_work");
+ tcase_add_test(multiple, tpool_single_work);
- work = tcase_create("work");
- tcase_add_test(work, tpool_work);
+ multiple = tcase_create("multiple_work");
+ tcase_add_test(multiple, tpool_multiple_work);
suite = suite_create("thread_pool_test()");
- suite_add_tcase(suite, work);
+ suite_add_tcase(suite, single);
+ suite_add_tcase(suite, multiple);
return suite;
}