]> git.ipfire.org Git - thirdparty/strongswan.git/blame - Source/charon/threads/thread_pool.c
- changed allocation behavior
[thirdparty/strongswan.git] / Source / charon / threads / thread_pool.c
CommitLineData
c93b51ec 1/**
4273d819 2 * @file thread_pool.c
c93b51ec 3 *
ca76df97 4 * @brief Implementation of thread_pool_t.
c93b51ec
MW
5 *
6 */
7
8/*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
c93b51ec 23#include <stdlib.h>
c93b51ec 24#include <pthread.h>
67a57dc6
MW
25#include <string.h>
26#include <errno.h>
c1eb1537 27#include <unistd.h>
b85d20d1
MW
28
29#include "thread_pool.h"
c5a6681c 30
021c2322
MW
31#include <globals.h>
32#include <queues/job_queue.h>
716abc9f
MW
33#include <queues/jobs/delete_ike_sa_job.h>
34#include <queues/jobs/incoming_packet_job.h>
35#include <queues/jobs/initiate_ike_sa_job.h>
021c2322
MW
36#include <utils/allocator.h>
37#include <utils/logger.h>
f529def1 38
5796aa16
MW
39typedef struct private_thread_pool_t private_thread_pool_t;
40
c93b51ec 41/**
ca76df97 42 * @brief Structure with private members for thread_pool_t.
c93b51ec 43 */
5796aa16 44struct private_thread_pool_t {
c93b51ec
MW
45 /**
46 * inclusion of public members
47 */
48 thread_pool_t public;
ca76df97
MW
49
50 /**
51 * @brief Main processing functino for worker threads.
52 *
53 * Gets a job from the job queue and calls corresponding
54 * function for processing.
55 *
56 * @param this private_thread_pool_t-Object
57 */
58 void (*process_jobs) (private_thread_pool_t *this);
59
60 /**
61 * @brief Process a INCOMING_PACKET_JOB.
62 *
63 * @param this private_thread_pool_t-Object
64 */
65 void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
66
67 /**
68 * @brief Process a INITIATE_IKE_SA_JOB.
69 *
70 * @param this private_thread_pool_t-Object
71 */
72 void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
73
0617722c 74 /**
ca76df97 75 * @brief Process a DELETE_IKE_SA_JOB.
0617722c
MW
76 *
77 * @param this private_thread_pool_t-Object
78 */
ca76df97
MW
79 void (*process_delete_ike_sa_job) (private_thread_pool_t *this, delete_ike_sa_job_t *job);
80
c93b51ec
MW
81 /**
82 * number of running threads
83 */
d048df5c
MW
84 size_t pool_size;
85
c93b51ec
MW
86 /**
87 * array of thread ids
88 */
89 pthread_t *threads;
d048df5c 90
67a57dc6
MW
91 /**
92 * logger of the threadpool
93 */
ca313de4 94 logger_t *pool_logger;
d048df5c 95
ca313de4 96 /**
ca76df97 97 * logger of the worker threads
ca313de4
MW
98 */
99 logger_t *worker_logger;
0617722c 100} ;
c93b51ec
MW
101
102
0617722c
MW
103
104/**
105 * implements private_thread_pool_t.function
106 */
ca76df97 107static void process_jobs(private_thread_pool_t *this)
c93b51ec 108{
db5dc986
MW
109 /* cancellation disabled by default */
110 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
c1eb1537
MW
111
112 this->worker_logger->log(this->worker_logger, CONTROL, "worker thread running, pid: %d", getpid());
c1ca1ee0 113
c93b51ec 114 for (;;) {
4273d819 115 job_t *job;
91443667
JH
116 job_type_t job_type;
117
d048df5c 118 job = global_job_queue->get(global_job_queue);
91443667 119 job_type = job->get_type(job);
ca76df97
MW
120 this->worker_logger->log(this->worker_logger, CONTROL|MORE, "got a job of type %s",
121 mapping_find(job_type_m,job_type));
c93b51ec 122
91443667
JH
123 switch (job_type)
124 {
125 case INCOMING_PACKET:
126 {
ca76df97
MW
127 this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
128 break;
129 }
130 case INITIATE_IKE_SA:
131 {
132 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
133 break;
134 }
135 case DELETE_IKE_SA:
136 {
137 this->process_delete_ike_sa_job(this, (delete_ike_sa_job_t*)job);
138 break;
139 }
140 default:
141 {
142 this->worker_logger->log(this->worker_logger, ERROR, "job of type %s not supported!",
143 mapping_find(job_type_m,job_type));
144 break;
145 }
146 }
147 job->destroy(job);
148 }
149}
150
151/**
152 * implementation of private_thread_pool_t.process_incoming_packet_job
153 */
d048df5c 154static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
ca76df97
MW
155{
156 packet_t *packet;
157 message_t *message;
158 ike_sa_t *ike_sa;
159 ike_sa_id_t *ike_sa_id;
160 status_t status;
d048df5c 161
df3c59d0
MW
162
163 packet = job->get_packet(job);
8323a9c1 164
ca76df97 165 message = message_create_from_packet(packet);
563081a3 166
ca76df97
MW
167 status = message->parse_header(message);
168 if (status != SUCCESS)
169 {
170 this->worker_logger->log(this->worker_logger, ERROR, "message header could not be verified!");
171 message->destroy(message);
172 return;
173 }
547172ff 174
ca76df97
MW
175 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "message is a %s %s",
176 mapping_find(exchange_type_m, message->get_exchange_type(message)),
177 message->get_request(message) ? "request" : "reply");
ca313de4 178
ca76df97
MW
179 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
180 (message->get_minor_version(message) != IKE_MINOR_VERSION))
181 {
182 this->worker_logger->log(this->worker_logger, ERROR, "IKE version %d.%d not supported",
183 message->get_major_version(message),
184 message->get_minor_version(message));
185 /* Todo send notify */
186 }
547172ff 187
df3c59d0 188 message->get_ike_sa_id(message, &ike_sa_id);
55f90b5d 189
ca76df97 190 ike_sa_id->switch_initiator(ike_sa_id);
28c734d7 191
ca76df97
MW
192 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking out IKE SA %lld:%lld, role %s",
193 ike_sa_id->get_initiator_spi(ike_sa_id),
194 ike_sa_id->get_responder_spi(ike_sa_id),
195 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
ca313de4 196
ca76df97
MW
197 status = global_ike_sa_manager->checkout(global_ike_sa_manager,ike_sa_id, &ike_sa);
198 if (status != SUCCESS)
199 {
200 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
201 ike_sa_id->destroy(ike_sa_id);
202 message->destroy(message);
203 return;
204 }
563081a3 205
ca76df97
MW
206 status = ike_sa->process_message(ike_sa, message);
207 if (status != SUCCESS)
208 {
209 this->worker_logger->log(this->worker_logger, ERROR, "message could not be processed by IKE SA");
210 }
28c734d7 211
ca76df97
MW
212 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA %lld:%lld, role %s",
213 ike_sa_id->get_initiator_spi(ike_sa_id),
214 ike_sa_id->get_responder_spi(ike_sa_id),
215 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
216 ike_sa_id->destroy(ike_sa_id);
ca313de4 217
ca76df97
MW
218 status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
219 if (status != SUCCESS)
220 {
221 this->worker_logger->log(this->worker_logger, ERROR, "checkin of IKE SA failed");
222 }
223 message->destroy(message);
224}
225
226/**
227 * implementation of private_thread_pool_t.process_initiate_ike_sa_job
228 */
d048df5c 229static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
ca76df97
MW
230{
231 /*
232 * Initiatie an IKE_SA:
233 * - is defined by a name of a configuration
234 * - create an empty IKE_SA via manager
235 * - call initiate_connection on this sa
236 */
237 ike_sa_t *ike_sa;
238 status_t status;
d048df5c
MW
239
240
ca76df97 241 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "create and checking out IKE SA");
d048df5c
MW
242
243 global_ike_sa_manager->create_and_checkout(global_ike_sa_manager, &ike_sa);
244
ca76df97
MW
245 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "initializing connection \"%s\"",
246 job->get_configuration_name(job));
247 status = ike_sa->initialize_connection(ike_sa, job->get_configuration_name(job));
248 if (status != SUCCESS)
249 {
250 this->worker_logger->log(this->worker_logger, ERROR, "%s by initialize_conection, job and rejected, IKE_SA deleted.",
251 mapping_find(status_m, status));
252 global_ike_sa_manager->checkin_and_delete(global_ike_sa_manager, ike_sa);
253 return;
254 }
d048df5c 255
ca76df97
MW
256 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "checking in IKE SA");
257 status = global_ike_sa_manager->checkin(global_ike_sa_manager, ike_sa);
258 if (status != SUCCESS)
259 {
260 this->worker_logger->log(this->worker_logger, ERROR, "%s could not checkin IKE_SA.",
261 mapping_find(status_m, status));
c93b51ec 262 }
ca76df97 263}
c1ca1ee0 264
ca76df97
MW
265/**
266 * implementation of private_thread_pool_t.process_delete_ike_sa_job
267 */
d048df5c 268static void process_delete_ike_sa_job(private_thread_pool_t *this, delete_ike_sa_job_t *job)
ca76df97
MW
269{
270 status_t status;
271 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
272
273 this->worker_logger->log(this->worker_logger, CONTROL|MOST, "deleting IKE SA %lld:%lld, role %s",
274 ike_sa_id->get_initiator_spi(ike_sa_id),
275 ike_sa_id->get_responder_spi(ike_sa_id),
276 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
d048df5c 277
ca76df97
MW
278 status = global_ike_sa_manager->delete(global_ike_sa_manager, ike_sa_id);
279 if (status != SUCCESS)
280 {
281 this->worker_logger->log(this->worker_logger, ERROR, "could not delete IKE_SA (%s)",
282 mapping_find(status_m, status));
283 }
c93b51ec
MW
284}
285
ca76df97 286
c93b51ec 287/**
0617722c 288 * implementation of thread_pool_t.get_pool_size
c93b51ec 289 */
0617722c 290static size_t get_pool_size(private_thread_pool_t *this)
c93b51ec 291{
0617722c 292 return this->pool_size;
c93b51ec
MW
293}
294
295/**
296 * Implementation of thread_pool_t.destroy
297 */
d048df5c 298static void destroy(private_thread_pool_t *this)
c93b51ec
MW
299{
300 int current;
c93b51ec 301 /* flag thread for termination */
67a57dc6 302 for (current = 0; current < this->pool_size; current++) {
c1eb1537 303 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker a thread #%d", current+1);
c93b51ec 304 pthread_cancel(this->threads[current]);
4273d819 305 }
c93b51ec
MW
306
307 /* wait for all threads */
308 for (current = 0; current < this->pool_size; current++) {
309 pthread_join(this->threads[current], NULL);
c1eb1537 310 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
c93b51ec
MW
311 }
312
313 /* free mem */
ca313de4
MW
314 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
315 global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
c1ca1ee0
JH
316 allocator_free(this->threads);
317 allocator_free(this);
c93b51ec
MW
318}
319
0617722c
MW
320/*
321 * see header
c93b51ec
MW
322 */
323thread_pool_t *thread_pool_create(size_t pool_size)
324{
325 int current;
326
c1ca1ee0 327 private_thread_pool_t *this = allocator_alloc_thing(private_thread_pool_t);
c93b51ec
MW
328
329 /* fill in public fields */
d048df5c 330 this->public.destroy = (void(*)(thread_pool_t*))destroy;
0617722c 331 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
c93b51ec 332
ca76df97
MW
333 this->process_jobs = process_jobs;
334 this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
335 this->process_delete_ike_sa_job = process_delete_ike_sa_job;
336 this->process_incoming_packet_job = process_incoming_packet_job;
c93b51ec 337 this->pool_size = pool_size;
0617722c 338
c1ca1ee0 339 this->threads = allocator_alloc(sizeof(pthread_t) * pool_size);
d048df5c 340
ca313de4 341 this->pool_logger = global_logger_manager->create_logger(global_logger_manager,THREAD_POOL,NULL);
d048df5c 342
ca313de4 343 this->worker_logger = global_logger_manager->create_logger(global_logger_manager,WORKER,NULL);
db5dc986 344
c93b51ec 345 /* try to create as many threads as possible, up tu pool_size */
0617722c
MW
346 for (current = 0; current < pool_size; current++)
347 {
ca76df97 348 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
0617722c 349 {
c1eb1537 350 this->pool_logger->log(this->pool_logger, CONTROL, "created worker thread #%d", current+1);
0617722c 351 }
c1eb1537 352 else
67a57dc6 353 {
0617722c 354 /* creation failed, is it the first one? */
67a57dc6
MW
355 if (current == 0)
356 {
c1eb1537 357 this->pool_logger->log(this->pool_logger, ERROR, "could not create any thread");
ca313de4
MW
358 global_logger_manager->destroy_logger(global_logger_manager, this->pool_logger);
359 global_logger_manager->destroy_logger(global_logger_manager, this->worker_logger);
c1ca1ee0
JH
360 allocator_free(this->threads);
361 allocator_free(this);
c93b51ec
MW
362 return NULL;
363 }
364 /* not all threads could be created, but at least one :-/ */
c1eb1537 365 this->pool_logger->log(this->pool_logger, ERROR, "could only create %d from requested %d threads!", current, pool_size);
67a57dc6 366
c93b51ec
MW
367 this->pool_size = current;
368 return (thread_pool_t*)this;
369 }
370 }
c93b51ec
MW
371 return (thread_pool_t*)this;
372}