]> git.ipfire.org Git - thirdparty/strongswan.git/blob - Source/charon/threads/thread_pool.c
- renamed get_block_size of hasher
[thirdparty/strongswan.git] / Source / charon / threads / thread_pool.c
1 /**
2 * @file thread_pool.c
3 *
4 * @brief Implementation of thread_pool_t.
5 *
6 */
7
8 /*
9 * Copyright (C) 2005 Jan Hutter, Martin Willi
10 * Hochschule fuer Technik Rapperswil
11 *
12 * This program is free software; you can redistribute it and/or modify it
13 * under the terms of the GNU General Public License as published by the
14 * Free Software Foundation; either version 2 of the License, or (at your
15 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
16 *
17 * This program is distributed in the hope that it will be useful, but
18 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
19 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 */
22
23 #include <stdlib.h>
24 #include <pthread.h>
25 #include <string.h>
26 #include <errno.h>
27
28 #include "thread_pool.h"
29
30 #include <daemon.h>
31 #include <queues/job_queue.h>
32 #include <queues/jobs/delete_half_open_ike_sa_job.h>
33 #include <queues/jobs/delete_established_ike_sa_job.h>
34 #include <queues/jobs/incoming_packet_job.h>
35 #include <queues/jobs/initiate_ike_sa_job.h>
36 #include <queues/jobs/retransmit_request_job.h>
37 #include <encoding/payloads/notify_payload.h>
38 #include <utils/logger.h>
39
40
41 typedef struct private_thread_pool_t private_thread_pool_t;
42
43 /**
44 * @brief Private data of thread_pool_t class.
45 */
46 struct private_thread_pool_t {
47 /**
48 * Public thread_pool_t interface.
49 */
50 thread_pool_t public;
51
52 /**
53 * @brief Main processing function for worker threads.
54 *
55 * Gets a job from the job queue and calls corresponding
56 * function for processing.
57 *
58 * @param this calling object
59 */
60 void (*process_jobs) (private_thread_pool_t *this);
61
62 /**
63 * @brief Process a INCOMING_PACKET job.
64 *
65 * @param this calling object
66 * @param job incoming_packet_job_t object
67 */
68 void (*process_incoming_packet_job) (private_thread_pool_t *this, incoming_packet_job_t *job);
69
70 /**
71 * @brief Process a INITIATE_IKE_SA job.
72 *
73 * @param this calling object
74 * @param job initiate_ike_sa_job_t object
75 */
76 void (*process_initiate_ike_sa_job) (private_thread_pool_t *this, initiate_ike_sa_job_t *job);
77
78 /**
79 * @brief Process a DELETE_HALF_OPEN_IKE_SA job.
80 *
81 * @param this calling object
82 * @param job delete__half_open_ike_sa_job_t object
83 */
84 void (*process_delete_half_open_ike_sa_job) (private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job);
85
86 /**
87 * @brief Process a DELETE_ESTABLISHED_IKE_SA job.
88 *
89 * @param this calling object
90 * @param job delete_established_ike_sa_job_t object
91 */
92 void (*process_delete_established_ike_sa_job) (private_thread_pool_t *this, delete_established_ike_sa_job_t *job);
93
94 /**
95 * @brief Process a RETRANSMIT_REQUEST job.
96 *
97 * @param this calling object
98 * @param job retransmit_request_job_t object
99 */
100 void (*process_retransmit_request_job) (private_thread_pool_t *this, retransmit_request_job_t *job);
101
102 /**
103 * Creates a job of type DELETE_HALF_OPEN_IKE_SA.
104 *
105 * This job is used to delete IKE_SA's which are still in state INITIATOR_INIT,
106 * RESPONDER_INIT, IKE_AUTH_REQUESTED, IKE_INIT_REQUESTED or IKE_INIT_RESPONDED.
107 *
108 * @param ike_sa_id ID of IKE_SA to delete
109 * @param delay Delay in ms after a half open IKE_SA gets deleted!
110 */
111 void (*create_delete_half_open_ike_sa_job) (private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay);
112
113 /**
114 * Number of running threads.
115 */
116 size_t pool_size;
117
118 /**
119 * Array of thread ids.
120 */
121 pthread_t *threads;
122
123 /**
124 * Logger of the thread pool.
125 */
126 logger_t *pool_logger;
127
128 /**
129 * Logger of the worker threads.
130 */
131 logger_t *worker_logger;
132 } ;
133
134 /**
135 * Implementation of private_thread_pool_t.process_jobs.
136 */
137 static void process_jobs(private_thread_pool_t *this)
138 {
139 job_t *job;
140 job_type_t job_type;
141 timeval_t start_time;
142 timeval_t end_time;
143
144 /* cancellation disabled by default */
145 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
146
147 this->worker_logger->log(this->worker_logger, CONTROL, "Worker thread running, thread_id: %u", (int)pthread_self());
148
149 for (;;) {
150
151 job = charon->job_queue->get(charon->job_queue);
152 job_type = job->get_type(job);
153 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Process job of type %s",
154 mapping_find(job_type_m,job_type));
155 gettimeofday(&start_time,NULL);
156 switch (job_type)
157 {
158 case INCOMING_PACKET:
159 {
160 this->process_incoming_packet_job(this, (incoming_packet_job_t*)job);
161 job->destroy(job);
162 break;
163 }
164 case INITIATE_IKE_SA:
165 {
166 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t*)job);
167 job->destroy(job);
168 break;
169 }
170 case DELETE_HALF_OPEN_IKE_SA:
171 {
172 this->process_delete_half_open_ike_sa_job(this, (delete_half_open_ike_sa_job_t*)job);
173 job->destroy(job);
174 break;
175 }
176 case DELETE_ESTABLISHED_IKE_SA:
177 {
178 this->process_delete_established_ike_sa_job(this, (delete_established_ike_sa_job_t*)job);
179 job->destroy(job);
180 break;
181 }
182 case RETRANSMIT_REQUEST:
183 {
184 this->process_retransmit_request_job(this, (retransmit_request_job_t*)job);
185 break;
186 }
187 default:
188 {
189 this->worker_logger->log(this->worker_logger, ERROR, "Job of type %s not supported!",
190 mapping_find(job_type_m,job_type));
191 job->destroy(job);
192 break;
193 }
194 }
195 gettimeofday(&end_time,NULL);
196
197 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Processed job of type %s in %d us",
198 mapping_find(job_type_m,job_type),
199 (((end_time.tv_sec - start_time.tv_sec) * 1000000) + (end_time.tv_usec - start_time.tv_usec)));
200
201
202 }
203 }
204
205 /**
206 * Implementation of private_thread_pool_t.process_incoming_packet_job.
207 */
208 static void process_incoming_packet_job(private_thread_pool_t *this, incoming_packet_job_t *job)
209 {
210 packet_t *packet;
211 message_t *message;
212 ike_sa_t *ike_sa;
213 ike_sa_id_t *ike_sa_id;
214 status_t status;
215
216
217 packet = job->get_packet(job);
218
219 message = message_create_from_packet(packet);
220
221 status = message->parse_header(message);
222 if (status != SUCCESS)
223 {
224 this->worker_logger->log(this->worker_logger, ERROR, "Message header could not be verified!");
225 message->destroy(message);
226 return;
227 }
228
229 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Message is a %s %s",
230 mapping_find(exchange_type_m, message->get_exchange_type(message)),
231 message->get_request(message) ? "request" : "reply");
232
233 if ((message->get_major_version(message) != IKE_MAJOR_VERSION) ||
234 (message->get_minor_version(message) != IKE_MINOR_VERSION))
235 {
236 this->worker_logger->log(this->worker_logger, ERROR | LEVEL2, "IKE version %d.%d not supported",
237 message->get_major_version(message),
238 message->get_minor_version(message));
239 /*
240 * This check is not handled in state_t object of IKE_SA to increase speed.
241 */
242 if ((message->get_exchange_type(message) == IKE_SA_INIT) && (message->get_request(message)))
243 {
244 message_t *response;
245 message->get_ike_sa_id(message, &ike_sa_id);
246 ike_sa_id->switch_initiator(ike_sa_id);
247 response = message_create_notify_reply(message->get_destination(message),
248 message->get_source(message),
249 IKE_SA_INIT,
250 FALSE,ike_sa_id,INVALID_MAJOR_VERSION);
251 message->destroy(message);
252 ike_sa_id->destroy(ike_sa_id);
253 status = response->generate(response, NULL, NULL, &packet);
254 if (status != SUCCESS)
255 {
256 this->worker_logger->log(this->worker_logger, ERROR, "Could not generate packet from message");
257 response->destroy(response);
258 return;
259 }
260 this->worker_logger->log(this->worker_logger, ERROR, "Send notify reply of type INVALID_MAJOR_VERSION");
261 charon->send_queue->add(charon->send_queue, packet);
262 response->destroy(response);
263 return;
264 }
265 message->destroy(message);
266 return;
267 }
268
269 message->get_ike_sa_id(message, &ike_sa_id);
270
271 ike_sa_id->switch_initiator(ike_sa_id);
272
273 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Checking out IKE SA %lld:%lld, role %s",
274 ike_sa_id->get_initiator_spi(ike_sa_id),
275 ike_sa_id->get_responder_spi(ike_sa_id),
276 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
277
278 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
279 if ((status != SUCCESS) && (status != CREATED))
280 {
281 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out");
282 ike_sa_id->destroy(ike_sa_id);
283 message->destroy(message);
284
285 /*
286 * TODO send notify reply of type INVALID_IKE_SPI if SPI could not be found ?
287 */
288
289 return;
290 }
291
292 if (status == CREATED)
293 {
294 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3,
295 "Create Job to delete half open IKE_SA.");
296 this->create_delete_half_open_ike_sa_job(this,ike_sa_id,
297 charon->configuration->get_half_open_ike_sa_timeout(charon->configuration));
298 }
299
300 status = ike_sa->process_message(ike_sa, message);
301
302 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "%s IKE SA %lld:%lld, role %s",
303 (status == DELETE_ME) ? "Checkin and delete" : "Checkin",
304 ike_sa_id->get_initiator_spi(ike_sa_id),
305 ike_sa_id->get_responder_spi(ike_sa_id),
306 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
307 ike_sa_id->destroy(ike_sa_id);
308
309 if (status == DELETE_ME)
310 {
311 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
312 }
313 else
314 {
315 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
316 }
317
318 if (status != SUCCESS)
319 {
320 this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
321 }
322 message->destroy(message);
323 }
324
325 /**
326 * Implementation of private_thread_pool_t.process_initiate_ike_sa_job.
327 */
328 static void process_initiate_ike_sa_job(private_thread_pool_t *this, initiate_ike_sa_job_t *job)
329 {
330 /*
331 * Initiatie an IKE_SA:
332 * - is defined by a name of a configuration
333 * - create an empty IKE_SA via manager
334 * - call initiate_connection on this sa
335 */
336 ike_sa_t *ike_sa;
337 status_t status;
338
339
340 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Creating and checking out IKE SA");
341 charon->ike_sa_manager->create_and_checkout(charon->ike_sa_manager, &ike_sa);
342
343 status = ike_sa->initiate_connection(ike_sa, job->get_connection(job));
344 if (status != SUCCESS)
345 {
346 this->worker_logger->log(this->worker_logger, ERROR, "Initiation returned %s, going to delete IKE_SA.",
347 mapping_find(status_m, status));
348 charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
349 return;
350 }
351
352 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL3, "Create Job to delete half open IKE_SA.");
353 this->create_delete_half_open_ike_sa_job(this,ike_sa->get_id(ike_sa),
354 charon->configuration->get_half_open_ike_sa_timeout(charon->configuration));
355
356 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking in IKE SA");
357 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
358 if (status != SUCCESS)
359 {
360 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin IKE_SA (%s)",
361 mapping_find(status_m, status));
362 }
363 }
364
365 /**
366 * Implementation of private_thread_pool_t.process_delete_ike_sa_job.
367 */
368 static void process_delete_half_open_ike_sa_job(private_thread_pool_t *this, delete_half_open_ike_sa_job_t *job)
369 {
370 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
371 ike_sa_t *ike_sa;
372 status_t status;
373 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
374 if ((status != SUCCESS) && (status != CREATED))
375 {
376 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "IKE SA seems to be allready deleted and so doesn't have to be deleted");
377 return;
378 }
379
380
381 switch (ike_sa->get_state(ike_sa))
382 {
383 case INITIATOR_INIT:
384 case RESPONDER_INIT:
385 case IKE_SA_INIT_REQUESTED:
386 case IKE_SA_INIT_RESPONDED:
387 case IKE_AUTH_REQUESTED:
388 {
389 /* IKE_SA is half open and gets deleted! */
390 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
391 if (status != SUCCESS)
392 {
393 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin and delete checked out IKE_SA!");
394 }
395 break;
396 }
397 default:
398 {
399 /* IKE_SA is established and so is not getting deleted! */
400 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
401 if (status != SUCCESS)
402 {
403 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin a checked out IKE_SA!");
404 }
405 break;
406 }
407 }
408 }
409
410 /**
411 * Implementation of private_thread_pool_t.process_delete_established_ike_sa_job.
412 */
413 static void process_delete_established_ike_sa_job(private_thread_pool_t *this, delete_established_ike_sa_job_t *job)
414 {
415 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
416 ike_sa_t *ike_sa;
417 status_t status;
418 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
419 if ((status != SUCCESS) && (status != CREATED))
420 {
421 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "IKE SA seems to be allready deleted and so doesn't have to be deleted");
422 return;
423 }
424
425 switch (ike_sa->get_state(ike_sa))
426 {
427 case INITIATOR_INIT:
428 case RESPONDER_INIT:
429 case IKE_SA_INIT_REQUESTED:
430 case IKE_SA_INIT_RESPONDED:
431 case IKE_AUTH_REQUESTED:
432 {
433 break;
434 }
435 default:
436 {
437 this->worker_logger->log(this->worker_logger, CONTROL, "Send delete request for IKE_SA.");
438 ike_sa->send_delete_ike_sa_request(ike_sa);
439 break;
440 }
441 }
442 this->worker_logger->log(this->worker_logger, CONTROL, "Delete established IKE_SA.");
443 status = charon->ike_sa_manager->checkin_and_delete(charon->ike_sa_manager, ike_sa);
444 if (status != SUCCESS)
445 {
446 this->worker_logger->log(this->worker_logger, ERROR, "Could not checkin and delete checked out IKE_SA!");
447 }
448 }
449
450
451 /**
452 * Implementation of private_thread_pool_t.process_retransmit_request_job.
453 */
454 static void process_retransmit_request_job(private_thread_pool_t *this, retransmit_request_job_t *job)
455 {
456
457 ike_sa_id_t *ike_sa_id = job->get_ike_sa_id(job);
458 u_int32_t message_id = job->get_message_id(job);
459 bool stop_retransmitting = FALSE;
460 u_int32_t timeout;
461 ike_sa_t *ike_sa;
462 status_t status;
463
464 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checking out IKE SA %lld:%lld, role %s",
465 ike_sa_id->get_initiator_spi(ike_sa_id),
466 ike_sa_id->get_responder_spi(ike_sa_id),
467 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
468
469 status = charon->ike_sa_manager->checkout(charon->ike_sa_manager,ike_sa_id, &ike_sa);
470 if ((status != SUCCESS) && (status != CREATED))
471 {
472 job->destroy(job);
473 this->worker_logger->log(this->worker_logger, ERROR, "IKE SA could not be checked out. Allready deleted?");
474 return;
475 }
476
477 status = ike_sa->retransmit_request(ike_sa, message_id);
478
479 if (status != SUCCESS)
480 {
481 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL3, "Message doesn't have to be retransmitted");
482 stop_retransmitting = TRUE;
483 }
484
485 this->worker_logger->log(this->worker_logger, CONTROL|LEVEL2, "Checkin IKE SA %lld:%lld, role %s",
486 ike_sa_id->get_initiator_spi(ike_sa_id),
487 ike_sa_id->get_responder_spi(ike_sa_id),
488 ike_sa_id->is_initiator(ike_sa_id) ? "initiator" : "responder");
489
490 status = charon->ike_sa_manager->checkin(charon->ike_sa_manager, ike_sa);
491 if (status != SUCCESS)
492 {
493 this->worker_logger->log(this->worker_logger, ERROR, "Checkin of IKE SA failed!");
494 }
495
496 if (stop_retransmitting)
497 {
498 job->destroy(job);
499 return;
500 }
501
502 job->increase_retransmit_count(job);
503 status = charon->configuration->get_retransmit_timeout (charon->configuration,job->get_retransmit_count(job),&timeout);
504 if (status != SUCCESS)
505 {
506 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Message will not be anymore retransmitted");
507 job->destroy(job);
508 /*
509 * TODO delete IKE_SA ?
510 */
511 return;
512 }
513 charon->event_queue->add_relative(charon->event_queue,(job_t *) job,timeout);
514 }
515
516
517
518 /**
519 * Implementation of private_thread_pool_t.create_delete_half_open_ike_sa_job.
520 */
521 static void create_delete_half_open_ike_sa_job(private_thread_pool_t *this,ike_sa_id_t *ike_sa_id, u_int32_t delay)
522 {
523 job_t *delete_job;
524
525 this->worker_logger->log(this->worker_logger, CONTROL | LEVEL2, "Going to create job to delete half open IKE_SA in %d ms", delay);
526
527 delete_job = (job_t *) delete_half_open_ike_sa_job_create(ike_sa_id);
528 charon->event_queue->add_relative(charon->event_queue,delete_job, delay);
529 }
530
531
532 /**
533 * Implementation of thread_pool_t.get_pool_size.
534 */
535 static size_t get_pool_size(private_thread_pool_t *this)
536 {
537 return this->pool_size;
538 }
539
540 /**
541 * Implementation of thread_pool_t.destroy.
542 */
543 static void destroy(private_thread_pool_t *this)
544 {
545 int current;
546 /* flag thread for termination */
547 for (current = 0; current < this->pool_size; current++) {
548 this->pool_logger->log(this->pool_logger, CONTROL, "cancelling worker thread #%d", current+1);
549 pthread_cancel(this->threads[current]);
550 }
551
552 /* wait for all threads */
553 for (current = 0; current < this->pool_size; current++) {
554 if (pthread_join(this->threads[current], NULL) == 0)
555 {
556 this->pool_logger->log(this->pool_logger, CONTROL, "worker thread #%d terminated", current+1);
557 }
558 else
559 {
560 this->pool_logger->log(this->pool_logger, ERROR, "could not terminate worker thread #%d", current+1);
561 }
562 }
563
564 /* free mem */
565 free(this->threads);
566 free(this);
567 }
568
569 /*
570 * Described in header.
571 */
572 thread_pool_t *thread_pool_create(size_t pool_size)
573 {
574 int current;
575
576 private_thread_pool_t *this = malloc_thing(private_thread_pool_t);
577
578 /* fill in public fields */
579 this->public.destroy = (void(*)(thread_pool_t*))destroy;
580 this->public.get_pool_size = (size_t(*)(thread_pool_t*))get_pool_size;
581
582 this->process_jobs = process_jobs;
583 this->process_initiate_ike_sa_job = process_initiate_ike_sa_job;
584 this->process_delete_half_open_ike_sa_job = process_delete_half_open_ike_sa_job;
585 this->process_delete_established_ike_sa_job = process_delete_established_ike_sa_job;
586 this->process_incoming_packet_job = process_incoming_packet_job;
587 this->process_retransmit_request_job = process_retransmit_request_job;
588 this->create_delete_half_open_ike_sa_job = create_delete_half_open_ike_sa_job;
589
590 this->pool_size = pool_size;
591
592 this->threads = malloc(sizeof(pthread_t) * pool_size);
593
594 this->pool_logger = logger_manager->get_logger(logger_manager, THREAD_POOL);
595
596 this->worker_logger = logger_manager->get_logger(logger_manager, WORKER);
597
598 /* try to create as many threads as possible, up tu pool_size */
599 for (current = 0; current < pool_size; current++)
600 {
601 if (pthread_create(&(this->threads[current]), NULL, (void*(*)(void*))this->process_jobs, this) == 0)
602 {
603 this->pool_logger->log(this->pool_logger, CONTROL, "Created worker thread #%d", current+1);
604 }
605 else
606 {
607 /* creation failed, is it the first one? */
608 if (current == 0)
609 {
610 this->pool_logger->log(this->pool_logger, ERROR, "Could not create any thread");
611 free(this->threads);
612 free(this);
613 return NULL;
614 }
615 /* not all threads could be created, but at least one :-/ */
616 this->pool_logger->log(this->pool_logger, ERROR, "Could only create %d from requested %d threads!", current, pool_size);
617
618 this->pool_size = current;
619 return (thread_pool_t*)this;
620 }
621 }
622 return (thread_pool_t*)this;
623 }