2 * Copyright (C) 2014 Martin Willi
4 * Copyright (C) secunet Security Networks AG
6 * This program is free software; you can redistribute it and/or modify it
7 * under the terms of the GNU General Public License as published by the
8 * Free Software Foundation; either version 2 of the License, or (at your
9 * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
13 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
18 * Copyright (C) 2014 Timo Teräs <timo.teras@iki.fi>
20 * Permission is hereby granted, free of charge, to any person obtaining a copy
21 * of this software and associated documentation files (the "Software"), to deal
22 * in the Software without restriction, including without limitation the rights
23 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
24 * copies of the Software, and to permit persons to whom the Software is
25 * furnished to do so, subject to the following conditions:
27 * The above copyright notice and this permission notice shall be included in
28 * all copies or substantial portions of the Software.
30 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
31 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
32 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
33 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
34 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
35 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
39 #include "vici_dispatcher.h"
40 #include "vici_socket.h"
42 #include <bio/bio_reader.h>
43 #include <bio/bio_writer.h>
44 #include <threading/mutex.h>
45 #include <threading/condvar.h>
46 #include <threading/thread.h>
47 #include <collections/array.h>
48 #include <collections/hashtable.h>
/* Short alias for the private implementation struct declared below. */
50 typedef struct private_vici_dispatcher_t private_vici_dispatcher_t
;
53 * Private data of a vici_dispatcher_t object.
/*
 * Private state behind the public vici_dispatcher_t handle.
 *
 * NOTE(review): only the `public` and `socket` members are visible in this
 * listing. The rest of the file also dereferences this->cmds, this->events,
 * this->mutex and this->cond, so their declarations presumably belong next
 * to the four description lines at the end of this span -- confirm against
 * the complete file.
 */
55 struct private_vici_dispatcher_t
{
58 * Public vici_dispatcher_t interface.
60 vici_dispatcher_t
public;
63 * Socket to send/receive messages
65 vici_socket_t
*socket
;
68 * List of registered commands (char* => command_t*)
73 * List of known events, and registered clients (char* => event_t*)
78 * Mutex to lock hashtables
83 * Condvar to signal command termination
94 /** callback for command */
96 /** user data to pass to callback */
98 /** command currently in use? */
108 /** registered clients, as u_int */
110 /** event currently in use? */
115 * Send an operation code, optionally with name and message
/*
 * Serialise one vici packet and pass it to the socket for client `id`.
 *
 * The buffer length is computed up front from three parts: one uint8_t for
 * the operation code, a uint8_t plus strlen(name) bytes for the name, and the
 * length of the message encoding. The same three parts are then written in
 * that order.
 *
 * NOTE(review): the guards that make the name and message parts optional are
 * not visible in this listing. Callers in this file do pass NULL for both,
 * so such guards presumably exist -- confirm against the complete file.
 */
117 static void send_op(private_vici_dispatcher_t
*this, u_int id
,
118 vici_operation_t op
, char *name
, vici_message_t
*message
)
120 bio_writer_t
*writer
;
/* length of the operation code byte */
123 len
= sizeof(uint8_t);
/* length of the name: one length byte plus the string without terminator */
126 len
+= sizeof(uint8_t) + strlen(name
);
/* length of the encoded message */
130 len
+= message
->get_encoding(message
).len
;
/* the writer is created with the length computed above */
132 writer
= bio_writer_create(len
);
133 writer
->write_uint8(writer
, op
);
136 writer
->write_data8(writer
, chunk_from_str(name
));
140 writer
->write_data(writer
, message
->get_encoding(message
));
/* the extracted buffer is passed to the socket, then the writer is destroyed */
142 this->socket
->send(this->socket
, id
, writer
->extract_buf(writer
));
143 writer
->destroy(writer
);
147 * Register client for event
/*
 * Add client `id` to the subscriber list of the event called `name` and
 * report the outcome to that client.
 *
 * While holding this->mutex, the event is looked up in this->events and `id`
 * is appended at the tail of event->clients; the function can also wait on
 * this->cond (passing this->mutex). After unlocking, the client is sent either
 * VICI_EVENT_CONFIRM (logged at DBG2) or VICI_EVENT_UNKNOWN (logged at DBG1).
 *
 * NOTE(review): the conditions that select between insert and wait, and
 * between confirm and unknown, are missing from this listing.
 */
149 static void register_event(private_vici_dispatcher_t
*this, char *name
,
154 this->mutex
->lock(this->mutex
);
157 event
= this->events
->get(this->events
, name
);
/* the client id is stored by value at the end of the array */
164 array_insert(event
->clients
, ARRAY_TAIL
, &id
);
167 this->cond
->wait(this->cond
, this->mutex
);
169 this->mutex
->unlock(this->mutex
);
/* replies are sent after the mutex has been released */
173 DBG2(DBG_CFG
, "vici client %u registered for: %s", id
, name
);
174 send_op(this, id
, VICI_EVENT_CONFIRM
, NULL
, NULL
);
178 DBG1(DBG_CFG
, "vici client %u invalid registration: %s", id
, name
);
179 send_op(this, id
, VICI_EVENT_UNKNOWN
, NULL
, NULL
);
184 * Unregister client for event
/*
 * Remove a client from the subscriber list of the event called `name` and
 * report the outcome to client `id`.
 *
 * While holding this->mutex, the event is looked up in this->events and its
 * clients array is enumerated, with array_remove_at() called on the
 * enumerator position; the function can also wait on this->cond (passing
 * this->mutex). After unlocking, VICI_EVENT_CONFIRM or VICI_EVENT_UNKNOWN is
 * sent to the client.
 *
 * NOTE(review): the test that decides which array entry is removed, and the
 * conditions choosing between wait/remove and confirm/unknown, are missing
 * from this listing.
 * NOTE(review): `¤t` in the enumerate() call looks like a mis-decoded
 * `&current` (HTML entity `&curren` followed by `t`) -- confirm and repair.
 */
186 static void unregister_event(private_vici_dispatcher_t
*this, char *name
,
189 enumerator_t
*enumerator
;
194 this->mutex
->lock(this->mutex
);
197 event
= this->events
->get(this->events
, name
);
204 enumerator
= array_create_enumerator(event
->clients
);
205 while (enumerator
->enumerate(enumerator
, ¤t
))
/* removes the element the enumerator currently points at */
209 array_remove_at(event
->clients
, enumerator
);
214 enumerator
->destroy(enumerator
);
217 this->cond
->wait(this->cond
, this->mutex
);
219 this->mutex
->unlock(this->mutex
);
/* replies are sent after the mutex has been released */
221 DBG2(DBG_CFG
, "vici client %u unregistered for: %s", id
, name
);
225 send_op(this, id
, VICI_EVENT_CONFIRM
, NULL
, NULL
);
229 send_op(this, id
, VICI_EVENT_UNKNOWN
, NULL
, NULL
);
234 * Data to release on thread cancellation
/*
 * Members of the context that release_command() receives: the dispatcher
 * and the request message to destroy.
 *
 * NOTE(review): the struct/typedef framing and at least one further member
 * are missing from this listing -- release_command() also reads
 * release->cmd.
 */
237 private_vici_dispatcher_t
*this;
239 vici_message_t
*request
;
243 * Release command after execution/cancellation
/*
 * Cleanup handler that process_request() installs with thread_cleanup_push().
 *
 * Destroys release->request, then, while holding the dispatcher mutex,
 * decrements release->cmd->uses and broadcasts on the dispatcher condvar once
 * the counter reaches zero.
 *
 * NOTE(review): whether `release` itself is freed here is not visible in
 * this listing.
 */
245 CALLBACK(release_command
, void,
246 release_data_t
*release
)
248 release
->request
->destroy(release
->request
);
/* the use counter is only modified while holding the dispatcher mutex */
250 release
->this->mutex
->lock(release
->this->mutex
);
251 if (--release
->cmd
->uses
== 0)
/* signals the condvar that other functions in this file wait on */
253 release
->this->cond
->broadcast(release
->this->cond
);
255 release
->this->mutex
->unlock(release
->this->mutex
);
261 * Process a request message
/*
 * Run the command called `name` on behalf of client `id`.
 *
 * The command is looked up in this->cmds while holding this->mutex. The
 * request message is then created from `data`, and the command callback is
 * invoked with the command's user pointer, the command name, the client id
 * and the request. release_command() is pushed as a cleanup handler before
 * that call and popped with TRUE after it. One path sends the response as
 * VICI_CMD_RESPONSE and destroys it; the other logs at DBG1 and sends
 * VICI_CMD_UNKNOWN.
 *
 * NOTE(review): the trailing parameter that supplies `data`, the declaration
 * of `cmd`, the initialisation of `release` and all branch conditions are
 * missing from this listing.
 * NOTE(review): the visible signature has no `static`, unlike the other
 * helpers in this file -- confirm whether external linkage is intended.
 */
263 void process_request(private_vici_dispatcher_t
*this, char *name
, u_int id
,
266 vici_message_t
*response
= NULL
;
267 release_data_t
*release
;
/* the command table is only read while holding the mutex */
270 this->mutex
->lock(this->mutex
);
271 cmd
= this->cmds
->get(this->cmds
, name
);
276 this->mutex
->unlock(this->mutex
);
285 DBG2(DBG_CFG
, "vici client %u requests: %s", id
, name
);
/* release_command() is registered as cleanup handler around the callback */
287 thread_cleanup_push(release_command
, release
);
289 release
->request
= vici_message_create_from_data(data
, FALSE
);
290 response
= release
->cmd
->cb(cmd
->user
, cmd
->name
, id
, release
->request
);
292 thread_cleanup_pop(TRUE
);
/* the response is sent to the requesting client and then destroyed */
296 send_op(this, id
, VICI_CMD_RESPONSE
, NULL
, response
);
297 response
->destroy(response
);
302 DBG1(DBG_CFG
, "vici client %u invalid request: %s", id
, name
);
303 send_op(this, id
, VICI_CMD_UNKNOWN
, NULL
, NULL
);
/*
 * Callback passed to vici_socket_create(); handles a packet `data` received
 * from client `id`.
 *
 * A uint8 type is read from the packet. For VICI_EVENT_REGISTER,
 * VICI_EVENT_UNREGISTER and VICI_CMD_REQUEST a data8 field is read next and
 * converted into `name` with vici_stringify(); register_event(),
 * unregister_event() or process_request() is then called. Each of these
 * three cases has its own "invalid vici ... message" log. The case labels
 * VICI_CMD_RESPONSE, VICI_EVENT_CONFIRM and VICI_EVENT_UNKNOWN are followed
 * by the "unsupported vici operation" log. The reader is destroyed at the
 * end.
 *
 * NOTE(review): the switch statement, its break/default lines, the else
 * branches and the declarations of `type`, `chunk` and `name` are missing
 * from this listing.
 */
307 CALLBACK(inbound
, void,
308 private_vici_dispatcher_t
*this, u_int id
, chunk_t data
)
310 bio_reader_t
*reader
;
315 reader
= bio_reader_create(data
);
316 if (reader
->read_uint8(reader
, &type
))
320 case VICI_EVENT_REGISTER
:
321 if (reader
->read_data8(reader
, &chunk
) &&
322 vici_stringify(chunk
, name
, sizeof(name
)))
324 register_event(this, name
, id
);
328 DBG1(DBG_CFG
, "invalid vici register message");
331 case VICI_EVENT_UNREGISTER
:
332 if (reader
->read_data8(reader
, &chunk
) &&
333 vici_stringify(chunk
, name
, sizeof(name
)))
335 unregister_event(this, name
, id
);
339 DBG1(DBG_CFG
, "invalid vici unregister message");
342 case VICI_CMD_REQUEST
:
343 if (reader
->read_data8(reader
, &chunk
) &&
344 vici_stringify(chunk
, name
, sizeof(name
)))
/* reader->destroy is registered as cleanup handler around the call and
 * popped with FALSE afterwards; the reader is destroyed explicitly below */
346 thread_cleanup_push((void*)reader
->destroy
, reader
);
347 process_request(this, name
, id
, reader
->peek(reader
));
348 thread_cleanup_pop(FALSE
);
352 DBG1(DBG_CFG
, "invalid vici request message");
355 case VICI_CMD_RESPONSE
:
356 case VICI_EVENT_CONFIRM
:
357 case VICI_EVENT_UNKNOWN
:
360 DBG1(DBG_CFG
, "unsupported vici operation: %u", type
);
366 DBG1(DBG_CFG
, "invalid vici message");
368 reader
->destroy(reader
);
/*
 * Callback passed to vici_socket_create(); the only visible statement logs
 * the id of the connecting client at DBG2.
 */
371 CALLBACK(connect_
, void,
372 private_vici_dispatcher_t
*this, u_int id
)
374 DBG2(DBG_CFG
, "vici client %u connected", id
);
/*
 * Callback passed to vici_socket_create(); removes a disconnecting client
 * from the events it is registered for.
 *
 * While holding this->mutex, every entry of this->events is visited. For
 * each event the clients array is enumerated and array_remove_at() is called
 * on the enumerator position; the loop can also wait on this->cond (passing
 * this->mutex). The disconnect is logged at DBG2 after unlocking.
 *
 * NOTE(review): the condition for the wait, the comparison that selects
 * which client entry is removed, and the destruction of the `ids` enumerator
 * are missing from this listing.
 * NOTE(review): `¤t` in the enumerate() call looks like a mis-decoded
 * `&current` (HTML entity `&curren` followed by `t`) -- confirm and repair.
 */
377 CALLBACK(disconnect
, void,
378 private_vici_dispatcher_t
*this, u_int id
)
380 enumerator_t
*events
, *ids
;
384 /* deregister client from all events */
385 this->mutex
->lock(this->mutex
);
386 events
= this->events
->create_enumerator(this->events
);
/* the key is not needed, only the event value is fetched */
387 while (events
->enumerate(events
, NULL
, &event
))
391 this->cond
->wait(this->cond
, this->mutex
);
393 ids
= array_create_enumerator(event
->clients
);
394 while (ids
->enumerate(ids
, ¤t
))
/* removes the element the enumerator currently points at */
398 array_remove_at(event
->clients
, ids
);
403 events
->destroy(events
);
404 this->mutex
->unlock(this->mutex
);
406 DBG2(DBG_CFG
, "vici client %u disconnected", id
);
/*
 * Implementation of vici_dispatcher_t.manage_command: adds or removes the
 * command called `name`.
 *
 * All visible work happens while holding this->mutex. One path builds a
 * command whose name is a strdup() copy of `name` and stores it in
 * this->cmds keyed by that copy; put() returns a value that is assigned back
 * to `cmd`. The other path removes the entry for `name` from this->cmds and
 * can wait on this->cond (passing this->mutex).
 *
 * NOTE(review): the condition selecting between the two paths (presumably
 * whether `cb` is set), the remaining initialiser fields and any freeing of
 * the replaced or removed entry are missing from this listing.
 */
409 METHOD(vici_dispatcher_t
, manage_command
, void,
410 private_vici_dispatcher_t
*this, char *name
,
411 vici_command_cb_t cb
, void *user
)
415 this->mutex
->lock(this->mutex
);
/* the entry owns its own copy of the name */
419 .name
= strdup(name
),
/* the copied name is used as hashtable key */
423 cmd
= this->cmds
->put(this->cmds
, cmd
->name
, cmd
);
427 cmd
= this->cmds
->remove(this->cmds
, name
);
433 this->cond
->wait(this->cond
, this->mutex
);
438 this->mutex
->unlock(this->mutex
);
/*
 * Implementation of vici_dispatcher_t.manage_event: adds or removes the
 * event called `name`.
 *
 * All visible work happens while holding this->mutex. One path builds an
 * event with a strdup() copy of `name` and a new array of u_int client ids,
 * and stores it in this->events keyed by the copied name; put() returns a
 * value that is assigned back to `event`. The other path removes the entry
 * for `name`, can wait on this->cond (passing this->mutex), and destroys the
 * clients array.
 *
 * NOTE(review): the condition selecting between the two paths (presumably
 * `reg`), the wait condition and the freeing of the name and the entry
 * itself are missing from this listing.
 */
441 METHOD(vici_dispatcher_t
, manage_event
, void,
442 private_vici_dispatcher_t
*this, char *name
, bool reg
)
446 this->mutex
->lock(this->mutex
);
/* the entry owns its own copy of the name */
450 .name
= strdup(name
),
/* client ids are stored by value as u_int elements */
451 .clients
= array_create(sizeof(u_int
), 0),
453 event
= this->events
->put(this->events
, event
->name
, event
);
457 event
= this->events
->remove(this->events
, name
);
463 this->cond
->wait(this->cond
, this->mutex
);
465 array_destroy(event
->clients
);
469 this->mutex
->unlock(this->mutex
);
/*
 * Implementation of vici_dispatcher_t.has_event_listeners: reports whether
 * clients are registered for the event called `name`.
 *
 * While holding this->mutex, the event is looked up in this->events and
 * `retval` is set from array_count(event->clients).
 *
 * NOTE(review): the declaration and initial value of `retval`, the check
 * that the event exists and the return statement are missing from this
 * listing.
 */
472 METHOD(vici_dispatcher_t
, has_event_listeners
, bool,
473 private_vici_dispatcher_t
*this, char *name
)
478 this->mutex
->lock(this->mutex
);
479 event
= this->events
->get(this->events
, name
);
482 /* the entry might be getting destroyed, but returning
483 * false positive is not a problem as a later raise_event
484 * will check things again. */
485 retval
= array_count(event
->clients
);
487 this->mutex
->unlock(this->mutex
);
/*
 * Implementation of vici_dispatcher_t.raise_event: sends `message` as a
 * VICI_EVENT packet to clients registered for the event called `name`.
 *
 * The event is looked up while holding this->mutex, which is then released
 * before the clients array is enumerated. A client receives the event when
 * `id` is 0 or equals that client's id. Afterwards the mutex is taken again,
 * event->uses is decremented, and this->cond is broadcast once the counter
 * reaches zero. The last visible statement destroys `message`.
 *
 * NOTE(review): the check that the event exists and the matching increment
 * of event->uses are missing from this listing.
 * NOTE(review): `¤t` in the enumerate() call looks like a mis-decoded
 * `&current` (HTML entity `&curren` followed by `t`); the loop body
 * dereferences `current`, which supports that reading -- confirm and repair.
 */
492 METHOD(vici_dispatcher_t
, raise_event
, void,
493 private_vici_dispatcher_t
*this, char *name
, u_int id
,
494 vici_message_t
*message
)
496 enumerator_t
*enumerator
;
500 this->mutex
->lock(this->mutex
);
501 event
= this->events
->get(this->events
, name
);
/* the mutex is released before the send loop */
505 this->mutex
->unlock(this->mutex
);
507 enumerator
= array_create_enumerator(event
->clients
);
508 while (enumerator
->enumerate(enumerator
, ¤t
))
/* id 0 addresses every registered client, any other id only that client */
510 if (id
== 0 || id
== *current
)
512 send_op(this, *current
, VICI_EVENT
, name
, message
);
515 enumerator
->destroy(enumerator
);
/* the use counter is only modified while holding the mutex */
517 this->mutex
->lock(this->mutex
);
518 if (--event
->uses
== 0)
520 this->cond
->broadcast(this->cond
);
523 this->mutex
->unlock(this->mutex
);
525 message
->destroy(message
);
/*
 * Implementation of vici_dispatcher_t.destroy.
 *
 * Destroys the socket through DESTROY_IF(), then the mutex, the condvar and
 * both hashtables, in that order.
 *
 * NOTE(review): freeing of `this` and of any entries still stored in the
 * hashtables is not visible in this listing.
 */
528 METHOD(vici_dispatcher_t
, destroy
, void,
529 private_vici_dispatcher_t
*this)
531 DESTROY_IF(this->socket
);
532 this->mutex
->destroy(this->mutex
);
533 this->cond
->destroy(this->cond
);
534 this->cmds
->destroy(this->cmds
);
535 this->events
->destroy(this->events
);
/*
 * Create a dispatcher serving vici clients on `uri`.
 *
 * The visible initialisers install manage_command, manage_event,
 * has_event_listeners and raise_event, create two hashtables using
 * hashtable_hash_str/hashtable_equals_str, and create a default mutex and
 * condvar. The socket is then created for `uri` with inbound(), connect_()
 * and disconnect() as callbacks and `this` as their context argument.
 *
 * Returns a pointer to the public interface embedded in the new object.
 *
 * NOTE(review): the INIT framing, the assignment of the destroy method and
 * any handling of a failed vici_socket_create() (embedded lines 561-566) are
 * missing from this listing.
 */
542 vici_dispatcher_t
*vici_dispatcher_create(char *uri
)
544 private_vici_dispatcher_t
*this;
548 .manage_command
= _manage_command
,
549 .manage_event
= _manage_event
,
550 .has_event_listeners
= _has_event_listeners
,
551 .raise_event
= _raise_event
,
/* both tables use the string hash and equality functions */
554 .cmds
= hashtable_create(hashtable_hash_str
, hashtable_equals_str
, 1),
555 .events
= hashtable_create(hashtable_hash_str
, hashtable_equals_str
, 1),
556 .mutex
= mutex_create(MUTEX_TYPE_DEFAULT
),
557 .cond
= condvar_create(CONDVAR_TYPE_DEFAULT
),
/* the socket is created after the initialisers above and is given `this` */
560 this->socket
= vici_socket_create(uri
, inbound
, connect_
, disconnect
, this);
567 return &this->public;