/**
 * @brief Implementation of thread_pool_t.
 */

/*
 * Copyright (C) 2005 Jan Hutter, Martin Willi
 * Hochschule fuer Technik Rapperswil
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 */
#include <stdlib.h>
#include <pthread.h>
#include <sys/time.h>

#include "thread_pool.h"

#include <queues/job_queue.h>
#include <queues/jobs/delete_half_open_ike_sa_job.h>
#include <queues/jobs/delete_established_ike_sa_job.h>
#include <queues/jobs/incoming_packet_job.h>
#include <queues/jobs/initiate_ike_sa_job.h>
#include <queues/jobs/retransmit_request_job.h>
#include <encoding/payloads/notify_payload.h>
#include <utils/logger.h>
41 typedef struct private_thread_pool_t private_thread_pool_t
;
44 * @brief Private data of thread_pool_t class.
46 struct private_thread_pool_t
{
48 * Public thread_pool_t interface.
53 * @brief Main processing function for worker threads.
55 * Gets a job from the job queue and calls corresponding
56 * function for processing.
58 * @param this calling object
60 void (*process_jobs
) (private_thread_pool_t
*this);
63 * @brief Process a INCOMING_PACKET job.
65 * @param this calling object
66 * @param job incoming_packet_job_t object
68 void (*process_incoming_packet_job
) (private_thread_pool_t
*this, incoming_packet_job_t
*job
);
71 * @brief Process a INITIATE_IKE_SA job.
73 * @param this calling object
74 * @param job initiate_ike_sa_job_t object
76 void (*process_initiate_ike_sa_job
) (private_thread_pool_t
*this, initiate_ike_sa_job_t
*job
);
79 * @brief Process a DELETE_HALF_OPEN_IKE_SA job.
81 * @param this calling object
82 * @param job delete__half_open_ike_sa_job_t object
84 void (*process_delete_half_open_ike_sa_job
) (private_thread_pool_t
*this, delete_half_open_ike_sa_job_t
*job
);
87 * @brief Process a DELETE_ESTABLISHED_IKE_SA job.
89 * @param this calling object
90 * @param job delete_established_ike_sa_job_t object
92 void (*process_delete_established_ike_sa_job
) (private_thread_pool_t
*this, delete_established_ike_sa_job_t
*job
);
95 * @brief Process a RETRANSMIT_REQUEST job.
97 * @param this calling object
98 * @param job retransmit_request_job_t object
100 void (*process_retransmit_request_job
) (private_thread_pool_t
*this, retransmit_request_job_t
*job
);
103 * Creates a job of type DELETE_HALF_OPEN_IKE_SA.
105 * This job is used to delete IKE_SA's which are still in state INITIATOR_INIT,
106 * RESPONDER_INIT, IKE_AUTH_REQUESTED, IKE_INIT_REQUESTED or IKE_INIT_RESPONDED.
108 * @param ike_sa_id ID of IKE_SA to delete
109 * @param delay Delay in ms after a half open IKE_SA gets deleted!
111 void (*create_delete_half_open_ike_sa_job
) (private_thread_pool_t
*this,ike_sa_id_t
*ike_sa_id
, u_int32_t delay
);
114 * Number of running threads.
119 * Array of thread ids.
124 * Logger of the thread pool.
126 logger_t
*pool_logger
;
129 * Logger of the worker threads.
131 logger_t
*worker_logger
;
135 * Implementation of private_thread_pool_t.process_jobs.
137 static void process_jobs(private_thread_pool_t
*this)
141 timeval_t start_time
;
144 /* cancellation disabled by default */
145 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
, NULL
);
147 this->worker_logger
->log(this->worker_logger
, CONTROL
, "Worker thread running, thread_id: %u", (int)pthread_self());
151 job
= charon
->job_queue
->get(charon
->job_queue
);
152 job_type
= job
->get_type(job
);
153 this->worker_logger
->log(this->worker_logger
, CONTROL
|LEVEL2
, "Process job of type %s",
154 mapping_find(job_type_m
,job_type
));
155 gettimeofday(&start_time
,NULL
);
158 case INCOMING_PACKET
:
160 this->process_incoming_packet_job(this, (incoming_packet_job_t
*)job
);
164 case INITIATE_IKE_SA
:
166 this->process_initiate_ike_sa_job(this, (initiate_ike_sa_job_t
*)job
);
170 case DELETE_HALF_OPEN_IKE_SA
:
172 this->process_delete_half_open_ike_sa_job(this, (delete_half_open_ike_sa_job_t
*)job
);
176 case DELETE_ESTABLISHED_IKE_SA
:
178 this->process_delete_established_ike_sa_job(this, (delete_established_ike_sa_job_t
*)job
);
182 case RETRANSMIT_REQUEST
:
184 this->process_retransmit_request_job(this, (retransmit_request_job_t
*)job
);
189 this->worker_logger
->log(this->worker_logger
, ERROR
, "Job of type %s not supported!",
190 mapping_find(job_type_m
,job_type
));
195 gettimeofday(&end_time
,NULL
);
197 this->worker_logger
->log(this->worker_logger
, CONTROL
| LEVEL2
, "Processed job of type %s in %d us",
198 mapping_find(job_type_m
,job_type
),
199 (((end_time
.tv_sec
- start_time
.tv_sec
) * 1000000) + (end_time
.tv_usec
- start_time
.tv_usec
)));
206 * Implementation of private_thread_pool_t.process_incoming_packet_job.
208 static void process_incoming_packet_job(private_thread_pool_t
*this, incoming_packet_job_t
*job
)
213 ike_sa_id_t
*ike_sa_id
;
217 packet
= job
->get_packet(job
);
219 message
= message_create_from_packet(packet
);
221 status
= message
->parse_header(message
);
222 if (status
!= SUCCESS
)
224 this->worker_logger
->log(this->worker_logger
, ERROR
, "Message header could not be verified!");
225 message
->destroy(message
);
229 this->worker_logger
->log(this->worker_logger
, CONTROL
|LEVEL2
, "Message is a %s %s",
230 mapping_find(exchange_type_m
, message
->get_exchange_type(message
)),
231 message
->get_request(message
) ? "request" : "reply");
233 if ((message
->get_major_version(message
) != IKE_MAJOR_VERSION
) ||
234 (message
->get_minor_version(message
) != IKE_MINOR_VERSION
))
236 this->worker_logger
->log(this->worker_logger
, ERROR
| LEVEL2
, "IKE version %d.%d not supported",
237 message
->get_major_version(message
),
238 message
->get_minor_version(message
));
240 * This check is not handled in state_t object of IKE_SA to increase speed.
242 if ((message
->get_exchange_type(message
) == IKE_SA_INIT
) && (message
->get_request(message
)))
245 message
->get_ike_sa_id(message
, &ike_sa_id
);
246 ike_sa_id
->switch_initiator(ike_sa_id
);
247 response
= message_create_notify_reply(message
->get_destination(message
),
248 message
->get_source(message
),
250 FALSE
,ike_sa_id
,INVALID_MAJOR_VERSION
);
251 message
->destroy(message
);
252 ike_sa_id
->destroy(ike_sa_id
);
253 status
= response
->generate(response
, NULL
, NULL
, &packet
);
254 if (status
!= SUCCESS
)
256 this->worker_logger
->log(this->worker_logger
, ERROR
, "Could not generate packet from message");
257 response
->destroy(response
);
260 this->worker_logger
->log(this->worker_logger
, ERROR
, "Send notify reply of type INVALID_MAJOR_VERSION");
261 charon
->send_queue
->add(charon
->send_queue
, packet
);
262 response
->destroy(response
);
265 message
->destroy(message
);
269 message
->get_ike_sa_id(message
, &ike_sa_id
);
271 ike_sa_id
->switch_initiator(ike_sa_id
);
273 this->worker_logger
->log(this->worker_logger
, CONTROL
|LEVEL3
, "Checking out IKE SA %lld:%lld, role %s",
274 ike_sa_id
->get_initiator_spi(ike_sa_id
),
275 ike_sa_id
->get_responder_spi(ike_sa_id
),
276 ike_sa_id
->is_initiator(ike_sa_id
) ? "initiator" : "responder");
278 status
= charon
->ike_sa_manager
->checkout(charon
->ike_sa_manager
,ike_sa_id
, &ike_sa
);
279 if ((status
!= SUCCESS
) && (status
!= CREATED
))
281 this->worker_logger
->log(this->worker_logger
, ERROR
, "IKE SA could not be checked out");
282 ike_sa_id
->destroy(ike_sa_id
);
283 message
->destroy(message
);
286 * TODO send notify reply of type INVALID_IKE_SPI if SPI could not be found ?
292 if (status
== CREATED
)
294 this->worker_logger
->log(this->worker_logger
, CONTROL
|LEVEL3
,
295 "Create Job to delete half open IKE_SA.");
296 this->create_delete_half_open_ike_sa_job(this,ike_sa_id
,
297 charon
->configuration
->get_half_open_ike_sa_timeout(charon
->configuration
));
300 status
= ike_sa
->process_message(ike_sa
, message
);
302 this->worker_logger
->log(this->worker_logger
, CONTROL
|LEVEL3
, "%s IKE SA %lld:%lld, role %s",
303 (status
== DELETE_ME
) ? "Checkin and delete" : "Checkin",
304 ike_sa_id
->get_initiator_spi(ike_sa_id
),
305 ike_sa_id
->get_responder_spi(ike_sa_id
),
306 ike_sa_id
->is_initiator(ike_sa_id
) ? "initiator" : "responder");
307 ike_sa_id
->destroy(ike_sa_id
);
309 if (status
== DELETE_ME
)
311 status
= charon
->ike_sa_manager
->checkin_and_delete(charon
->ike_sa_manager
, ike_sa
);
315 status
= charon
->ike_sa_manager
->checkin(charon
->ike_sa_manager
, ike_sa
);
318 if (status
!= SUCCESS
)
320 this->worker_logger
->log(this->worker_logger
, ERROR
, "Checkin of IKE SA failed!");
322 message
->destroy(message
);
326 * Implementation of private_thread_pool_t.process_initiate_ike_sa_job.
328 static void process_initiate_ike_sa_job(private_thread_pool_t
*this, initiate_ike_sa_job_t
*job
)
331 * Initiatie an IKE_SA:
332 * - is defined by a name of a configuration
333 * - create an empty IKE_SA via manager
334 * - call initiate_connection on this sa
340 this->worker_logger
->log(this->worker_logger
, CONTROL
|LEVEL2
, "Creating and checking out IKE SA");
341 charon
->ike_sa_manager
->create_and_checkout(charon
->ike_sa_manager
, &ike_sa
);
343 status
= ike_sa
->initiate_connection(ike_sa
, job
->get_connection(job
));
344 if (status
!= SUCCESS
)
346 this->worker_logger
->log(this->worker_logger
, ERROR
, "Initiation returned %s, going to delete IKE_SA.",
347 mapping_find(status_m
, status
));
348 charon
->ike_sa_manager
->checkin_and_delete(charon
->ike_sa_manager
, ike_sa
);
352 this->worker_logger
->log(this->worker_logger
, CONTROL
|LEVEL3
, "Create Job to delete half open IKE_SA.");
353 this->create_delete_half_open_ike_sa_job(this,ike_sa
->get_id(ike_sa
),
354 charon
->configuration
->get_half_open_ike_sa_timeout(charon
->configuration
));
356 this->worker_logger
->log(this->worker_logger
, CONTROL
|LEVEL2
, "Checking in IKE SA");
357 status
= charon
->ike_sa_manager
->checkin(charon
->ike_sa_manager
, ike_sa
);
358 if (status
!= SUCCESS
)
360 this->worker_logger
->log(this->worker_logger
, ERROR
, "Could not checkin IKE_SA (%s)",
361 mapping_find(status_m
, status
));
366 * Implementation of private_thread_pool_t.process_delete_ike_sa_job.
368 static void process_delete_half_open_ike_sa_job(private_thread_pool_t
*this, delete_half_open_ike_sa_job_t
*job
)
370 ike_sa_id_t
*ike_sa_id
= job
->get_ike_sa_id(job
);
373 status
= charon
->ike_sa_manager
->checkout(charon
->ike_sa_manager
,ike_sa_id
, &ike_sa
);
374 if ((status
!= SUCCESS
) && (status
!= CREATED
))
376 this->worker_logger
->log(this->worker_logger
, CONTROL
| LEVEL3
, "IKE SA seems to be allready deleted and so doesn't have to be deleted");
381 switch (ike_sa
->get_state(ike_sa
))
385 case IKE_SA_INIT_REQUESTED
:
386 case IKE_SA_INIT_RESPONDED
:
387 case IKE_AUTH_REQUESTED
:
389 /* IKE_SA is half open and gets deleted! */
390 status
= charon
->ike_sa_manager
->checkin_and_delete(charon
->ike_sa_manager
, ike_sa
);
391 if (status
!= SUCCESS
)
393 this->worker_logger
->log(this->worker_logger
, ERROR
, "Could not checkin and delete checked out IKE_SA!");
399 /* IKE_SA is established and so is not getting deleted! */
400 status
= charon
->ike_sa_manager
->checkin(charon
->ike_sa_manager
, ike_sa
);
401 if (status
!= SUCCESS
)
403 this->worker_logger
->log(this->worker_logger
, ERROR
, "Could not checkin a checked out IKE_SA!");
411 * Implementation of private_thread_pool_t.process_delete_established_ike_sa_job.
413 static void process_delete_established_ike_sa_job(private_thread_pool_t
*this, delete_established_ike_sa_job_t
*job
)
415 ike_sa_id_t
*ike_sa_id
= job
->get_ike_sa_id(job
);
418 status
= charon
->ike_sa_manager
->checkout(charon
->ike_sa_manager
,ike_sa_id
, &ike_sa
);
419 if ((status
!= SUCCESS
) && (status
!= CREATED
))
421 this->worker_logger
->log(this->worker_logger
, CONTROL
| LEVEL3
, "IKE SA seems to be allready deleted and so doesn't have to be deleted");
425 switch (ike_sa
->get_state(ike_sa
))
429 case IKE_SA_INIT_REQUESTED
:
430 case IKE_SA_INIT_RESPONDED
:
431 case IKE_AUTH_REQUESTED
:
437 this->worker_logger
->log(this->worker_logger
, CONTROL
, "Send delete request for IKE_SA.");
438 ike_sa
->send_delete_ike_sa_request(ike_sa
);
442 this->worker_logger
->log(this->worker_logger
, CONTROL
, "Delete established IKE_SA.");
443 status
= charon
->ike_sa_manager
->checkin_and_delete(charon
->ike_sa_manager
, ike_sa
);
444 if (status
!= SUCCESS
)
446 this->worker_logger
->log(this->worker_logger
, ERROR
, "Could not checkin and delete checked out IKE_SA!");
452 * Implementation of private_thread_pool_t.process_retransmit_request_job.
454 static void process_retransmit_request_job(private_thread_pool_t
*this, retransmit_request_job_t
*job
)
457 ike_sa_id_t
*ike_sa_id
= job
->get_ike_sa_id(job
);
458 u_int32_t message_id
= job
->get_message_id(job
);
459 bool stop_retransmitting
= FALSE
;
464 this->worker_logger
->log(this->worker_logger
, CONTROL
|LEVEL2
, "Checking out IKE SA %lld:%lld, role %s",
465 ike_sa_id
->get_initiator_spi(ike_sa_id
),
466 ike_sa_id
->get_responder_spi(ike_sa_id
),
467 ike_sa_id
->is_initiator(ike_sa_id
) ? "initiator" : "responder");
469 status
= charon
->ike_sa_manager
->checkout(charon
->ike_sa_manager
,ike_sa_id
, &ike_sa
);
470 if ((status
!= SUCCESS
) && (status
!= CREATED
))
473 this->worker_logger
->log(this->worker_logger
, ERROR
, "IKE SA could not be checked out. Allready deleted?");
477 status
= ike_sa
->retransmit_request(ike_sa
, message_id
);
479 if (status
!= SUCCESS
)
481 this->worker_logger
->log(this->worker_logger
, CONTROL
| LEVEL3
, "Message doesn't have to be retransmitted");
482 stop_retransmitting
= TRUE
;
485 this->worker_logger
->log(this->worker_logger
, CONTROL
|LEVEL2
, "Checkin IKE SA %lld:%lld, role %s",
486 ike_sa_id
->get_initiator_spi(ike_sa_id
),
487 ike_sa_id
->get_responder_spi(ike_sa_id
),
488 ike_sa_id
->is_initiator(ike_sa_id
) ? "initiator" : "responder");
490 status
= charon
->ike_sa_manager
->checkin(charon
->ike_sa_manager
, ike_sa
);
491 if (status
!= SUCCESS
)
493 this->worker_logger
->log(this->worker_logger
, ERROR
, "Checkin of IKE SA failed!");
496 if (stop_retransmitting
)
502 job
->increase_retransmit_count(job
);
503 status
= charon
->configuration
->get_retransmit_timeout (charon
->configuration
,job
->get_retransmit_count(job
),&timeout
);
504 if (status
!= SUCCESS
)
506 this->worker_logger
->log(this->worker_logger
, CONTROL
| LEVEL2
, "Message will not be anymore retransmitted");
509 * TODO delete IKE_SA ?
513 charon
->event_queue
->add_relative(charon
->event_queue
,(job_t
*) job
,timeout
);
519 * Implementation of private_thread_pool_t.create_delete_half_open_ike_sa_job.
521 static void create_delete_half_open_ike_sa_job(private_thread_pool_t
*this,ike_sa_id_t
*ike_sa_id
, u_int32_t delay
)
525 this->worker_logger
->log(this->worker_logger
, CONTROL
| LEVEL2
, "Going to create job to delete half open IKE_SA in %d ms", delay
);
527 delete_job
= (job_t
*) delete_half_open_ike_sa_job_create(ike_sa_id
);
528 charon
->event_queue
->add_relative(charon
->event_queue
,delete_job
, delay
);
533 * Implementation of thread_pool_t.get_pool_size.
535 static size_t get_pool_size(private_thread_pool_t
*this)
537 return this->pool_size
;
541 * Implementation of thread_pool_t.destroy.
543 static void destroy(private_thread_pool_t
*this)
546 /* flag thread for termination */
547 for (current
= 0; current
< this->pool_size
; current
++) {
548 this->pool_logger
->log(this->pool_logger
, CONTROL
, "cancelling worker thread #%d", current
+1);
549 pthread_cancel(this->threads
[current
]);
552 /* wait for all threads */
553 for (current
= 0; current
< this->pool_size
; current
++) {
554 if (pthread_join(this->threads
[current
], NULL
) == 0)
556 this->pool_logger
->log(this->pool_logger
, CONTROL
, "worker thread #%d terminated", current
+1);
560 this->pool_logger
->log(this->pool_logger
, ERROR
, "could not terminate worker thread #%d", current
+1);
570 * Described in header.
572 thread_pool_t
*thread_pool_create(size_t pool_size
)
576 private_thread_pool_t
*this = malloc_thing(private_thread_pool_t
);
578 /* fill in public fields */
579 this->public.destroy
= (void(*)(thread_pool_t
*))destroy
;
580 this->public.get_pool_size
= (size_t(*)(thread_pool_t
*))get_pool_size
;
582 this->process_jobs
= process_jobs
;
583 this->process_initiate_ike_sa_job
= process_initiate_ike_sa_job
;
584 this->process_delete_half_open_ike_sa_job
= process_delete_half_open_ike_sa_job
;
585 this->process_delete_established_ike_sa_job
= process_delete_established_ike_sa_job
;
586 this->process_incoming_packet_job
= process_incoming_packet_job
;
587 this->process_retransmit_request_job
= process_retransmit_request_job
;
588 this->create_delete_half_open_ike_sa_job
= create_delete_half_open_ike_sa_job
;
590 this->pool_size
= pool_size
;
592 this->threads
= malloc(sizeof(pthread_t
) * pool_size
);
594 this->pool_logger
= logger_manager
->get_logger(logger_manager
, THREAD_POOL
);
596 this->worker_logger
= logger_manager
->get_logger(logger_manager
, WORKER
);
598 /* try to create as many threads as possible, up tu pool_size */
599 for (current
= 0; current
< pool_size
; current
++)
601 if (pthread_create(&(this->threads
[current
]), NULL
, (void*(*)(void*))this->process_jobs
, this) == 0)
603 this->pool_logger
->log(this->pool_logger
, CONTROL
, "Created worker thread #%d", current
+1);
607 /* creation failed, is it the first one? */
610 this->pool_logger
->log(this->pool_logger
, ERROR
, "Could not create any thread");
615 /* not all threads could be created, but at least one :-/ */
616 this->pool_logger
->log(this->pool_logger
, ERROR
, "Could only create %d from requested %d threads!", current
, pool_size
);
618 this->pool_size
= current
;
619 return (thread_pool_t
*)this;
622 return (thread_pool_t
*)this;