2 * Copyright (C) 2014 Martin Willi
4 * Copyright (C) secunet Security Networks AG
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2 of the License, or (at your
9 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
17 #include "vici_socket.h"
19 #include <threading/mutex.h>
20 #include <threading/condvar.h>
21 #include <threading/thread.h>
22 #include <collections/array.h>
23 #include <collections/linked_list.h>
24 #include <processing/jobs/callback_job.h>
29 typedef struct private_vici_socket_t private_vici_socket_t
;
32 * Private members of vici_socket_t
34 struct private_vici_socket_t
{
42 * Inbound message callback
44 vici_inbound_cb_t inbound
;
47 * Client connect callback
49 vici_connect_cb_t connect
;
52 * Client disconnect callback
54 vici_disconnect_cb_t disconnect
;
57 * Next client connection identifier
62 * User data for callbacks
67 * Service accepting vici connections
69 stream_service_t
*service
;
72 * Client connections, as entry_t
74 linked_list_t
*connections
;
77 * mutex for client connections
83 * Data to securely reference an entry
86 /* reference to socket instance */
87 private_vici_socket_t
*this;
88 /** connection identifier of entry */
93 * Partially processed message
96 /** bytes of length header sent/received */
98 /** bytes of length header */
99 char hdr
[sizeof(uint32_t)];
100 /** send/receive buffer on heap */
102 /** bytes sent/received in buffer */
107 * Client connection entry
110 /** reference to socket */
111 private_vici_socket_t
*this;
112 /** associated stream */
114 /** queued messages to send, as msg_buf_t pointers */
116 /** input message buffer */
118 /** queued input messages to process, as chunk_t */
120 /** do we have job processing input queue? */
122 /** is this client disconnecting */
124 /** client connection identifier */
126 /** any users reading over this connection? */
128 /** any users writing over this connection? */
130 /** condvar to wait for usage */
135 * Destroy an connection entry
137 CALLBACK(destroy_entry
, void,
143 entry
->stream
->destroy(entry
->stream
);
144 entry
->this->disconnect(entry
->this->user
, entry
->id
);
145 entry
->cond
->destroy(entry
->cond
);
147 while (array_remove(entry
->out
, ARRAY_TAIL
, &out
))
149 chunk_clear(&out
->buf
);
152 array_destroy(entry
->out
);
153 while (array_remove(entry
->queue
, ARRAY_TAIL
, &chunk
))
157 array_destroy(entry
->queue
);
158 chunk_clear(&entry
->in
.buf
);
163 * Find entry by stream (if given) or id, claim use
165 static entry_t
* find_entry(private_vici_socket_t
*this, stream_t
*stream
,
166 u_int id
, bool reader
, bool writer
)
168 enumerator_t
*enumerator
;
169 entry_t
*entry
, *found
= NULL
;
170 bool candidate
= TRUE
;
172 this->mutex
->lock(this->mutex
);
173 while (candidate
&& !found
)
176 enumerator
= this->connections
->create_enumerator(this->connections
);
177 while (enumerator
->enumerate(enumerator
, &entry
))
181 if (entry
->stream
!= stream
)
193 if (entry
->disconnecting
)
195 entry
->cond
->signal(entry
->cond
);
200 if ((reader
&& entry
->readers
) ||
201 (writer
&& entry
->writers
))
203 entry
->cond
->wait(entry
->cond
, this->mutex
);
217 enumerator
->destroy(enumerator
);
219 this->mutex
->unlock(this->mutex
);
225 * Remove entry by id, claim use
227 static entry_t
* remove_entry(private_vici_socket_t
*this, u_int id
)
229 enumerator_t
*enumerator
;
230 entry_t
*entry
, *found
= NULL
;
231 bool candidate
= TRUE
;
233 this->mutex
->lock(this->mutex
);
234 while (candidate
&& !found
)
237 enumerator
= this->connections
->create_enumerator(this->connections
);
238 while (enumerator
->enumerate(enumerator
, &entry
))
243 if (entry
->readers
|| entry
->writers
)
245 entry
->cond
->wait(entry
->cond
, this->mutex
);
248 this->connections
->remove_at(this->connections
, enumerator
);
249 entry
->cond
->broadcast(entry
->cond
);
254 enumerator
->destroy(enumerator
);
256 this->mutex
->unlock(this->mutex
);
262 * Release a claimed entry
264 static void put_entry(private_vici_socket_t
*this, entry_t
*entry
,
265 bool reader
, bool writer
)
267 this->mutex
->lock(this->mutex
);
276 entry
->cond
->signal(entry
->cond
);
277 this->mutex
->unlock(this->mutex
);
281 * Asynchronous callback to disconnect client
283 CALLBACK(disconnect_async
, job_requeue_t
,
284 entry_selector_t
*sel
)
288 entry
= remove_entry(sel
->this, sel
->id
);
291 destroy_entry(entry
);
293 return JOB_REQUEUE_NONE
;
297 * Disconnect a connected client
299 static void disconnect(private_vici_socket_t
*this, u_int id
)
301 entry_selector_t
*sel
;
308 lib
->processor
->queue_job(lib
->processor
,
309 (job_t
*)callback_job_create(disconnect_async
, sel
, free
, NULL
));
313 * Write queued output data
315 static bool do_write(private_vici_socket_t
*this, entry_t
*entry
,
316 stream_t
*stream
, char *errmsg
, size_t errlen
, bool block
)
321 while (array_get(entry
->out
, ARRAY_HEAD
, &out
))
324 while (out
->hdrlen
< sizeof(out
->hdr
))
326 len
= stream
->write(stream
, out
->hdr
+ out
->hdrlen
,
327 sizeof(out
->hdr
) - out
->hdrlen
, block
);
334 if (errno
== EWOULDBLOCK
)
338 snprintf(errmsg
, errlen
, "vici header write error: %s",
345 /* write buffer buffer */
346 while (out
->buf
.len
> out
->done
)
348 len
= stream
->write(stream
, out
->buf
.ptr
+ out
->done
,
349 out
->buf
.len
- out
->done
, block
);
352 snprintf(errmsg
, errlen
, "premature vici disconnect");
357 if (errno
== EWOULDBLOCK
)
361 snprintf(errmsg
, errlen
, "vici write error: %s", strerror(errno
));
367 if (array_remove(entry
->out
, ARRAY_HEAD
, &out
))
369 chunk_clear(&out
->buf
);
377 * Send pending messages
379 CALLBACK(on_write
, bool,
380 private_vici_socket_t
*this, stream_t
*stream
)
382 char errmsg
[256] = "";
386 entry
= find_entry(this, stream
, 0, FALSE
, TRUE
);
389 ret
= do_write(this, entry
, stream
, errmsg
, sizeof(errmsg
), FALSE
);
392 /* unregister if we have no more messages to send */
393 ret
= array_count(entry
->out
) != 0;
397 entry
->disconnecting
= TRUE
;
398 disconnect(entry
->this, entry
->id
);
400 put_entry(this, entry
, FALSE
, TRUE
);
402 if (!ret
&& errmsg
[0])
404 DBG1(DBG_CFG
, errmsg
);
412 * Read in available header with data, non-blocking accumulating to buffer
414 static bool do_read(private_vici_socket_t
*this, entry_t
*entry
,
415 stream_t
*stream
, char *errmsg
, size_t errlen
)
420 /* assemble the length header first */
421 while (entry
->in
.hdrlen
< sizeof(entry
->in
.hdr
))
423 len
= stream
->read(stream
, entry
->in
.hdr
+ entry
->in
.hdrlen
,
424 sizeof(entry
->in
.hdr
) - entry
->in
.hdrlen
, FALSE
);
431 if (errno
== EWOULDBLOCK
)
435 snprintf(errmsg
, errlen
, "vici header read error: %s",
439 entry
->in
.hdrlen
+= len
;
440 if (entry
->in
.hdrlen
== sizeof(entry
->in
.hdr
))
442 msglen
= untoh32(entry
->in
.hdr
);
443 if (msglen
> VICI_MESSAGE_SIZE_MAX
)
445 snprintf(errmsg
, errlen
, "vici message length %u exceeds %u "
446 "bytes limit, ignored", msglen
, VICI_MESSAGE_SIZE_MAX
);
449 /* header complete, continue with data */
450 entry
->in
.buf
= chunk_alloc(msglen
);
454 /* assemble buffer */
455 while (entry
->in
.buf
.len
> entry
->in
.done
)
457 len
= stream
->read(stream
, entry
->in
.buf
.ptr
+ entry
->in
.done
,
458 entry
->in
.buf
.len
- entry
->in
.done
, FALSE
);
461 snprintf(errmsg
, errlen
, "premature vici disconnect");
466 if (errno
== EWOULDBLOCK
)
470 snprintf(errmsg
, errlen
, "vici read error: %s", strerror(errno
));
473 entry
->in
.done
+= len
;
480 * Callback processing incoming requests in strict order
482 CALLBACK(process_queue
, job_requeue_t
,
483 entry_selector_t
*sel
)
492 entry
= find_entry(sel
->this, NULL
, sel
->id
, TRUE
, FALSE
);
498 found
= array_remove(entry
->queue
, ARRAY_HEAD
, &chunk
);
501 entry
->has_processor
= FALSE
;
504 put_entry(sel
->this, entry
, TRUE
, FALSE
);
510 thread_cleanup_push((void*)chunk_clear
, &chunk
);
511 sel
->this->inbound(sel
->this->user
, id
, chunk
);
512 thread_cleanup_pop(TRUE
);
514 return JOB_REQUEUE_NONE
;
518 * Process incoming messages
520 CALLBACK(on_read
, bool,
521 private_vici_socket_t
*this, stream_t
*stream
)
523 char errmsg
[256] = "";
524 entry_selector_t
*sel
;
528 entry
= find_entry(this, stream
, 0, TRUE
, FALSE
);
531 ret
= do_read(this, entry
, stream
, errmsg
, sizeof(errmsg
));
534 entry
->disconnecting
= TRUE
;
535 disconnect(this, entry
->id
);
537 else if (entry
->in
.hdrlen
== sizeof(entry
->in
.hdr
) &&
538 entry
->in
.buf
.len
== entry
->in
.done
)
540 array_insert(entry
->queue
, ARRAY_TAIL
, &entry
->in
.buf
);
541 entry
->in
.buf
= chunk_empty
;
542 entry
->in
.hdrlen
= entry
->in
.done
= 0;
544 if (!entry
->has_processor
)
550 lib
->processor
->queue_job(lib
->processor
,
551 (job_t
*)callback_job_create(process_queue
,
553 entry
->has_processor
= TRUE
;
556 put_entry(this, entry
, TRUE
, FALSE
);
558 if (!ret
&& errmsg
[0])
560 DBG1(DBG_CFG
, errmsg
);
568 * Process connection request
570 CALLBACK(on_accept
, bool,
571 private_vici_socket_t
*this, stream_t
*stream
)
576 id
= ref_get(&this->nextid
);
582 .out
= array_create(0, 0),
583 .queue
= array_create(sizeof(chunk_t
), 0),
584 .cond
= condvar_create(CONDVAR_TYPE_DEFAULT
),
588 this->mutex
->lock(this->mutex
);
589 this->connections
->insert_last(this->connections
, entry
);
590 this->mutex
->unlock(this->mutex
);
592 stream
->on_read(stream
, on_read
, this);
594 put_entry(this, entry
, TRUE
, FALSE
);
596 this->connect(this->user
, id
);
602 * Async callback to enable writer
604 CALLBACK(enable_writer
, job_requeue_t
,
605 entry_selector_t
*sel
)
609 entry
= find_entry(sel
->this, NULL
, sel
->id
, FALSE
, TRUE
);
612 entry
->stream
->on_write(entry
->stream
, on_write
, sel
->this);
613 put_entry(sel
->this, entry
, FALSE
, TRUE
);
615 return JOB_REQUEUE_NONE
;
618 METHOD(vici_socket_t
, send_
, void,
619 private_vici_socket_t
*this, u_int id
, chunk_t msg
)
621 if (msg
.len
<= VICI_MESSAGE_SIZE_MAX
)
623 entry_selector_t
*sel
;
627 entry
= find_entry(this, NULL
, id
, FALSE
, TRUE
);
633 htoun32(out
->hdr
, msg
.len
);
635 array_insert(entry
->out
, ARRAY_TAIL
, out
);
636 if (array_count(entry
->out
) == 1)
637 { /* asynchronously re-enable on_write callback when we get data */
642 lib
->processor
->queue_job(lib
->processor
,
643 (job_t
*)callback_job_create(enable_writer
,
646 put_entry(this, entry
, FALSE
, TRUE
);
650 DBG1(DBG_CFG
, "vici connection %u unknown", id
);
656 DBG1(DBG_CFG
, "vici message size %zu exceeds maximum size of %u, "
657 "discarded", msg
.len
, VICI_MESSAGE_SIZE_MAX
);
662 CALLBACK(flush_messages
, void,
663 entry_t
*entry
, va_list args
)
665 private_vici_socket_t
*this;
666 char errmsg
[256] = "";
669 VA_ARGS_VGET(args
, this);
671 /* no need for any locking as no other threads are running, the connections
672 * all get disconnected afterwards, so error handling is simple too */
673 ret
= do_write(this, entry
, entry
->stream
, errmsg
, sizeof(errmsg
), TRUE
);
675 if (!ret
&& errmsg
[0])
677 DBG1(DBG_CFG
, errmsg
);
681 METHOD(vici_socket_t
, destroy
, void,
682 private_vici_socket_t
*this)
684 DESTROY_IF(this->service
);
685 this->connections
->invoke_function(this->connections
, flush_messages
, this);
686 this->connections
->destroy_function(this->connections
, destroy_entry
);
687 this->mutex
->destroy(this->mutex
);
694 vici_socket_t
*vici_socket_create(char *uri
, vici_inbound_cb_t inbound
,
695 vici_connect_cb_t connect
,
696 vici_disconnect_cb_t disconnect
, void *user
)
698 private_vici_socket_t
*this;
705 .mutex
= mutex_create(MUTEX_TYPE_DEFAULT
),
706 .connections
= linked_list_create(),
709 .disconnect
= disconnect
,
713 this->service
= lib
->streams
->create_service(lib
->streams
, uri
, 3);
716 DBG1(DBG_CFG
, "creating vici socket failed");
720 this->service
->on_accept(this->service
, on_accept
, this,
721 JOB_PRIO_CRITICAL
, 0);
723 return &this->public;