]> git.ipfire.org Git - thirdparty/strongswan.git/blob - src/libcharon/plugins/vici/vici_socket.c
Update copyright headers after acquisition by secunet
[thirdparty/strongswan.git] / src / libcharon / plugins / vici / vici_socket.c
1 /*
2 * Copyright (C) 2014 Martin Willi
3 *
4 * Copyright (C) secunet Security Networks AG
5 *
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2 of the License, or (at your
9 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * for more details.
15 */
16
17 #include "vici_socket.h"
18
19 #include <threading/mutex.h>
20 #include <threading/condvar.h>
21 #include <threading/thread.h>
22 #include <collections/array.h>
23 #include <collections/linked_list.h>
24 #include <processing/jobs/callback_job.h>
25
26 #include <errno.h>
27 #include <string.h>
28
29 typedef struct private_vici_socket_t private_vici_socket_t;
30
31 /**
32 * Private members of vici_socket_t
33 */
34 struct private_vici_socket_t {
35
36 /**
37 * public functions
38 */
39 vici_socket_t public;
40
41 /**
42 * Inbound message callback
43 */
44 vici_inbound_cb_t inbound;
45
46 /**
47 * Client connect callback
48 */
49 vici_connect_cb_t connect;
50
51 /**
52 * Client disconnect callback
53 */
54 vici_disconnect_cb_t disconnect;
55
56 /**
57 * Next client connection identifier
58 */
59 u_int nextid;
60
61 /**
62 * User data for callbacks
63 */
64 void *user;
65
66 /**
67 * Service accepting vici connections
68 */
69 stream_service_t *service;
70
71 /**
72 * Client connections, as entry_t
73 */
74 linked_list_t *connections;
75
76 /**
77 * mutex for client connections
78 */
79 mutex_t *mutex;
80 };
81
82 /**
83 * Data to securely reference an entry
84 */
85 typedef struct {
86 /* reference to socket instance */
87 private_vici_socket_t *this;
88 /** connection identifier of entry */
89 u_int id;
90 } entry_selector_t;
91
92 /**
93 * Partially processed message
94 */
95 typedef struct {
96 /** bytes of length header sent/received */
97 u_char hdrlen;
98 /** bytes of length header */
99 char hdr[sizeof(uint32_t)];
100 /** send/receive buffer on heap */
101 chunk_t buf;
102 /** bytes sent/received in buffer */
103 uint32_t done;
104 } msg_buf_t;
105
106 /**
107 * Client connection entry
108 */
109 typedef struct {
110 /** reference to socket */
111 private_vici_socket_t *this;
112 /** associated stream */
113 stream_t *stream;
114 /** queued messages to send, as msg_buf_t pointers */
115 array_t *out;
116 /** input message buffer */
117 msg_buf_t in;
118 /** queued input messages to process, as chunk_t */
119 array_t *queue;
120 /** do we have job processing input queue? */
121 bool has_processor;
122 /** is this client disconnecting */
123 bool disconnecting;
124 /** client connection identifier */
125 u_int id;
126 /** any users reading over this connection? */
127 int readers;
128 /** any users writing over this connection? */
129 int writers;
130 /** condvar to wait for usage */
131 condvar_t *cond;
132 } entry_t;
133
134 /**
135 * Destroy an connection entry
136 */
137 CALLBACK(destroy_entry, void,
138 entry_t *entry)
139 {
140 msg_buf_t *out;
141 chunk_t chunk;
142
143 entry->stream->destroy(entry->stream);
144 entry->this->disconnect(entry->this->user, entry->id);
145 entry->cond->destroy(entry->cond);
146
147 while (array_remove(entry->out, ARRAY_TAIL, &out))
148 {
149 chunk_clear(&out->buf);
150 free(out);
151 }
152 array_destroy(entry->out);
153 while (array_remove(entry->queue, ARRAY_TAIL, &chunk))
154 {
155 chunk_clear(&chunk);
156 }
157 array_destroy(entry->queue);
158 chunk_clear(&entry->in.buf);
159 free(entry);
160 }
161
162 /**
163 * Find entry by stream (if given) or id, claim use
164 */
165 static entry_t* find_entry(private_vici_socket_t *this, stream_t *stream,
166 u_int id, bool reader, bool writer)
167 {
168 enumerator_t *enumerator;
169 entry_t *entry, *found = NULL;
170 bool candidate = TRUE;
171
172 this->mutex->lock(this->mutex);
173 while (candidate && !found)
174 {
175 candidate = FALSE;
176 enumerator = this->connections->create_enumerator(this->connections);
177 while (enumerator->enumerate(enumerator, &entry))
178 {
179 if (stream)
180 {
181 if (entry->stream != stream)
182 {
183 continue;
184 }
185 }
186 else
187 {
188 if (entry->id != id)
189 {
190 continue;
191 }
192 }
193 if (entry->disconnecting)
194 {
195 entry->cond->signal(entry->cond);
196 continue;
197 }
198 candidate = TRUE;
199
200 if ((reader && entry->readers) ||
201 (writer && entry->writers))
202 {
203 entry->cond->wait(entry->cond, this->mutex);
204 break;
205 }
206 if (reader)
207 {
208 entry->readers++;
209 }
210 if (writer)
211 {
212 entry->writers++;
213 }
214 found = entry;
215 break;
216 }
217 enumerator->destroy(enumerator);
218 }
219 this->mutex->unlock(this->mutex);
220
221 return found;
222 }
223
224 /**
225 * Remove entry by id, claim use
226 */
227 static entry_t* remove_entry(private_vici_socket_t *this, u_int id)
228 {
229 enumerator_t *enumerator;
230 entry_t *entry, *found = NULL;
231 bool candidate = TRUE;
232
233 this->mutex->lock(this->mutex);
234 while (candidate && !found)
235 {
236 candidate = FALSE;
237 enumerator = this->connections->create_enumerator(this->connections);
238 while (enumerator->enumerate(enumerator, &entry))
239 {
240 if (entry->id == id)
241 {
242 candidate = TRUE;
243 if (entry->readers || entry->writers)
244 {
245 entry->cond->wait(entry->cond, this->mutex);
246 break;
247 }
248 this->connections->remove_at(this->connections, enumerator);
249 entry->cond->broadcast(entry->cond);
250 found = entry;
251 break;
252 }
253 }
254 enumerator->destroy(enumerator);
255 }
256 this->mutex->unlock(this->mutex);
257
258 return found;
259 }
260
261 /**
262 * Release a claimed entry
263 */
264 static void put_entry(private_vici_socket_t *this, entry_t *entry,
265 bool reader, bool writer)
266 {
267 this->mutex->lock(this->mutex);
268 if (reader)
269 {
270 entry->readers--;
271 }
272 if (writer)
273 {
274 entry->writers--;
275 }
276 entry->cond->signal(entry->cond);
277 this->mutex->unlock(this->mutex);
278 }
279
280 /**
281 * Asynchronous callback to disconnect client
282 */
283 CALLBACK(disconnect_async, job_requeue_t,
284 entry_selector_t *sel)
285 {
286 entry_t *entry;
287
288 entry = remove_entry(sel->this, sel->id);
289 if (entry)
290 {
291 destroy_entry(entry);
292 }
293 return JOB_REQUEUE_NONE;
294 }
295
296 /**
297 * Disconnect a connected client
298 */
299 static void disconnect(private_vici_socket_t *this, u_int id)
300 {
301 entry_selector_t *sel;
302
303 INIT(sel,
304 .this = this,
305 .id = id,
306 );
307
308 lib->processor->queue_job(lib->processor,
309 (job_t*)callback_job_create(disconnect_async, sel, free, NULL));
310 }
311
312 /**
313 * Write queued output data
314 */
315 static bool do_write(private_vici_socket_t *this, entry_t *entry,
316 stream_t *stream, char *errmsg, size_t errlen, bool block)
317 {
318 msg_buf_t *out;
319 ssize_t len;
320
321 while (array_get(entry->out, ARRAY_HEAD, &out))
322 {
323 /* write header */
324 while (out->hdrlen < sizeof(out->hdr))
325 {
326 len = stream->write(stream, out->hdr + out->hdrlen,
327 sizeof(out->hdr) - out->hdrlen, block);
328 if (len == 0)
329 {
330 return FALSE;
331 }
332 if (len < 0)
333 {
334 if (errno == EWOULDBLOCK)
335 {
336 return TRUE;
337 }
338 snprintf(errmsg, errlen, "vici header write error: %s",
339 strerror(errno));
340 return FALSE;
341 }
342 out->hdrlen += len;
343 }
344
345 /* write buffer buffer */
346 while (out->buf.len > out->done)
347 {
348 len = stream->write(stream, out->buf.ptr + out->done,
349 out->buf.len - out->done, block);
350 if (len == 0)
351 {
352 snprintf(errmsg, errlen, "premature vici disconnect");
353 return FALSE;
354 }
355 if (len < 0)
356 {
357 if (errno == EWOULDBLOCK)
358 {
359 return TRUE;
360 }
361 snprintf(errmsg, errlen, "vici write error: %s", strerror(errno));
362 return FALSE;
363 }
364 out->done += len;
365 }
366
367 if (array_remove(entry->out, ARRAY_HEAD, &out))
368 {
369 chunk_clear(&out->buf);
370 free(out);
371 }
372 }
373 return TRUE;
374 }
375
376 /**
377 * Send pending messages
378 */
379 CALLBACK(on_write, bool,
380 private_vici_socket_t *this, stream_t *stream)
381 {
382 char errmsg[256] = "";
383 entry_t *entry;
384 bool ret = FALSE;
385
386 entry = find_entry(this, stream, 0, FALSE, TRUE);
387 if (entry)
388 {
389 ret = do_write(this, entry, stream, errmsg, sizeof(errmsg), FALSE);
390 if (ret)
391 {
392 /* unregister if we have no more messages to send */
393 ret = array_count(entry->out) != 0;
394 }
395 else
396 {
397 entry->disconnecting = TRUE;
398 disconnect(entry->this, entry->id);
399 }
400 put_entry(this, entry, FALSE, TRUE);
401
402 if (!ret && errmsg[0])
403 {
404 DBG1(DBG_CFG, errmsg);
405 }
406 }
407
408 return ret;
409 }
410
411 /**
412 * Read in available header with data, non-blocking accumulating to buffer
413 */
414 static bool do_read(private_vici_socket_t *this, entry_t *entry,
415 stream_t *stream, char *errmsg, size_t errlen)
416 {
417 uint32_t msglen;
418 ssize_t len;
419
420 /* assemble the length header first */
421 while (entry->in.hdrlen < sizeof(entry->in.hdr))
422 {
423 len = stream->read(stream, entry->in.hdr + entry->in.hdrlen,
424 sizeof(entry->in.hdr) - entry->in.hdrlen, FALSE);
425 if (len == 0)
426 {
427 return FALSE;
428 }
429 if (len < 0)
430 {
431 if (errno == EWOULDBLOCK)
432 {
433 return TRUE;
434 }
435 snprintf(errmsg, errlen, "vici header read error: %s",
436 strerror(errno));
437 return FALSE;
438 }
439 entry->in.hdrlen += len;
440 if (entry->in.hdrlen == sizeof(entry->in.hdr))
441 {
442 msglen = untoh32(entry->in.hdr);
443 if (msglen > VICI_MESSAGE_SIZE_MAX)
444 {
445 snprintf(errmsg, errlen, "vici message length %u exceeds %u "
446 "bytes limit, ignored", msglen, VICI_MESSAGE_SIZE_MAX);
447 return FALSE;
448 }
449 /* header complete, continue with data */
450 entry->in.buf = chunk_alloc(msglen);
451 }
452 }
453
454 /* assemble buffer */
455 while (entry->in.buf.len > entry->in.done)
456 {
457 len = stream->read(stream, entry->in.buf.ptr + entry->in.done,
458 entry->in.buf.len - entry->in.done, FALSE);
459 if (len == 0)
460 {
461 snprintf(errmsg, errlen, "premature vici disconnect");
462 return FALSE;
463 }
464 if (len < 0)
465 {
466 if (errno == EWOULDBLOCK)
467 {
468 return TRUE;
469 }
470 snprintf(errmsg, errlen, "vici read error: %s", strerror(errno));
471 return FALSE;
472 }
473 entry->in.done += len;
474 }
475
476 return TRUE;
477 }
478
479 /**
480 * Callback processing incoming requests in strict order
481 */
482 CALLBACK(process_queue, job_requeue_t,
483 entry_selector_t *sel)
484 {
485 entry_t *entry;
486 chunk_t chunk;
487 bool found;
488 u_int id;
489
490 while (TRUE)
491 {
492 entry = find_entry(sel->this, NULL, sel->id, TRUE, FALSE);
493 if (!entry)
494 {
495 break;
496 }
497
498 found = array_remove(entry->queue, ARRAY_HEAD, &chunk);
499 if (!found)
500 {
501 entry->has_processor = FALSE;
502 }
503 id = entry->id;
504 put_entry(sel->this, entry, TRUE, FALSE);
505 if (!found)
506 {
507 break;
508 }
509
510 thread_cleanup_push((void*)chunk_clear, &chunk);
511 sel->this->inbound(sel->this->user, id, chunk);
512 thread_cleanup_pop(TRUE);
513 }
514 return JOB_REQUEUE_NONE;
515 }
516
517 /**
518 * Process incoming messages
519 */
520 CALLBACK(on_read, bool,
521 private_vici_socket_t *this, stream_t *stream)
522 {
523 char errmsg[256] = "";
524 entry_selector_t *sel;
525 entry_t *entry;
526 bool ret = FALSE;
527
528 entry = find_entry(this, stream, 0, TRUE, FALSE);
529 if (entry)
530 {
531 ret = do_read(this, entry, stream, errmsg, sizeof(errmsg));
532 if (!ret)
533 {
534 entry->disconnecting = TRUE;
535 disconnect(this, entry->id);
536 }
537 else if (entry->in.hdrlen == sizeof(entry->in.hdr) &&
538 entry->in.buf.len == entry->in.done)
539 {
540 array_insert(entry->queue, ARRAY_TAIL, &entry->in.buf);
541 entry->in.buf = chunk_empty;
542 entry->in.hdrlen = entry->in.done = 0;
543
544 if (!entry->has_processor)
545 {
546 INIT(sel,
547 .this = this,
548 .id = entry->id,
549 );
550 lib->processor->queue_job(lib->processor,
551 (job_t*)callback_job_create(process_queue,
552 sel, free, NULL));
553 entry->has_processor = TRUE;
554 }
555 }
556 put_entry(this, entry, TRUE, FALSE);
557
558 if (!ret && errmsg[0])
559 {
560 DBG1(DBG_CFG, errmsg);
561 }
562 }
563
564 return ret;
565 }
566
567 /**
568 * Process connection request
569 */
570 CALLBACK(on_accept, bool,
571 private_vici_socket_t *this, stream_t *stream)
572 {
573 entry_t *entry;
574 u_int id;
575
576 id = ref_get(&this->nextid);
577
578 INIT(entry,
579 .this = this,
580 .stream = stream,
581 .id = id,
582 .out = array_create(0, 0),
583 .queue = array_create(sizeof(chunk_t), 0),
584 .cond = condvar_create(CONDVAR_TYPE_DEFAULT),
585 .readers = 1,
586 );
587
588 this->mutex->lock(this->mutex);
589 this->connections->insert_last(this->connections, entry);
590 this->mutex->unlock(this->mutex);
591
592 stream->on_read(stream, on_read, this);
593
594 put_entry(this, entry, TRUE, FALSE);
595
596 this->connect(this->user, id);
597
598 return TRUE;
599 }
600
601 /**
602 * Async callback to enable writer
603 */
604 CALLBACK(enable_writer, job_requeue_t,
605 entry_selector_t *sel)
606 {
607 entry_t *entry;
608
609 entry = find_entry(sel->this, NULL, sel->id, FALSE, TRUE);
610 if (entry)
611 {
612 entry->stream->on_write(entry->stream, on_write, sel->this);
613 put_entry(sel->this, entry, FALSE, TRUE);
614 }
615 return JOB_REQUEUE_NONE;
616 }
617
618 METHOD(vici_socket_t, send_, void,
619 private_vici_socket_t *this, u_int id, chunk_t msg)
620 {
621 if (msg.len <= VICI_MESSAGE_SIZE_MAX)
622 {
623 entry_selector_t *sel;
624 msg_buf_t *out;
625 entry_t *entry;
626
627 entry = find_entry(this, NULL, id, FALSE, TRUE);
628 if (entry)
629 {
630 INIT(out,
631 .buf = msg,
632 );
633 htoun32(out->hdr, msg.len);
634
635 array_insert(entry->out, ARRAY_TAIL, out);
636 if (array_count(entry->out) == 1)
637 { /* asynchronously re-enable on_write callback when we get data */
638 INIT(sel,
639 .this = this,
640 .id = entry->id,
641 );
642 lib->processor->queue_job(lib->processor,
643 (job_t*)callback_job_create(enable_writer,
644 sel, free, NULL));
645 }
646 put_entry(this, entry, FALSE, TRUE);
647 }
648 else
649 {
650 DBG1(DBG_CFG, "vici connection %u unknown", id);
651 chunk_clear(&msg);
652 }
653 }
654 else
655 {
656 DBG1(DBG_CFG, "vici message size %zu exceeds maximum size of %u, "
657 "discarded", msg.len, VICI_MESSAGE_SIZE_MAX);
658 chunk_clear(&msg);
659 }
660 }
661
662 CALLBACK(flush_messages, void,
663 entry_t *entry, va_list args)
664 {
665 private_vici_socket_t *this;
666 char errmsg[256] = "";
667 bool ret;
668
669 VA_ARGS_VGET(args, this);
670
671 /* no need for any locking as no other threads are running, the connections
672 * all get disconnected afterwards, so error handling is simple too */
673 ret = do_write(this, entry, entry->stream, errmsg, sizeof(errmsg), TRUE);
674
675 if (!ret && errmsg[0])
676 {
677 DBG1(DBG_CFG, errmsg);
678 }
679 }
680
681 METHOD(vici_socket_t, destroy, void,
682 private_vici_socket_t *this)
683 {
684 DESTROY_IF(this->service);
685 this->connections->invoke_function(this->connections, flush_messages, this);
686 this->connections->destroy_function(this->connections, destroy_entry);
687 this->mutex->destroy(this->mutex);
688 free(this);
689 }
690
691 /*
692 * see header file
693 */
694 vici_socket_t *vici_socket_create(char *uri, vici_inbound_cb_t inbound,
695 vici_connect_cb_t connect,
696 vici_disconnect_cb_t disconnect, void *user)
697 {
698 private_vici_socket_t *this;
699
700 INIT(this,
701 .public = {
702 .send = _send_,
703 .destroy = _destroy,
704 },
705 .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
706 .connections = linked_list_create(),
707 .inbound = inbound,
708 .connect = connect,
709 .disconnect = disconnect,
710 .user = user,
711 );
712
713 this->service = lib->streams->create_service(lib->streams, uri, 3);
714 if (!this->service)
715 {
716 DBG1(DBG_CFG, "creating vici socket failed");
717 destroy(this);
718 return NULL;
719 }
720 this->service->on_accept(this->service, on_accept, this,
721 JOB_PRIO_CRITICAL, 0);
722
723 return &this->public;
724 }