#include <stdexcept>
#include <memory>
-namespace mod_event_zmq {
-
-SWITCH_MODULE_LOAD_FUNCTION(load);
-SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown);
-SWITCH_MODULE_RUNTIME_FUNCTION(runtime);
+#include "mod_event_zmq.h"
-extern "C" {
-SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime);
-};
+namespace mod_event_zmq {
// Handles publishing events out to clients
class ZmqEventPublisher {
public:
- ZmqEventPublisher() :
- context(1),
- event_publisher(context, ZMQ_PUB)
+ ZmqEventPublisher(zmq::context_t &context) :
+ _publisher(context, ZMQ_PUB)
{
- event_publisher.bind("tcp://*:5556");
+ _publisher.bind("tcp://*:5556");
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening for clients\n");
}
zmq::message_t msg(pjson, strlen(pjson), free_message_data, NULL);
// Send the message
- event_publisher.send(msg);
+ _publisher.send(msg);
}
private:
free (data);
}
- zmq::context_t context;
- zmq::socket_t event_publisher;
+ zmq::socket_t _publisher;
+};
+
+class char_msg : public zmq::message_t {
+public:
+ char_msg() : zmq::message_t(sizeof(char)) { }
+ char_msg(char data) : zmq::message_t(sizeof(char)) {
+ *char_data() = data;
+ }
+
+ char* char_data() {
+ return static_cast<char*>(this->data());
+ }
};
// Handles global inititalization and teardown of the module
class ZmqModule {
public:
ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) :
- _running(false) {
+ _context(1), _term_rep(_context, ZMQ_REP), _term_req(_context, ZMQ_REQ), _publisher(_context) {
+
+ // Set up the term messaging connection
+ _term_rep.bind(TERM_URI);
+ _term_req.connect(TERM_URI);
+
// Subscribe to all switch events of any subclass
// Store a pointer to ourself in the user data
- if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, (void*)this, &_node)
+ if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, static_cast<void*>(&_publisher), &_node)
!= SWITCH_STATUS_SUCCESS) {
throw std::runtime_error("Couldn't bind to switch events.");
}
}
void Listen() {
- if(_running)
- return;
-
- _publisher.reset(new ZmqEventPublisher());
- _running = true;
+ // All we do is sit here and block the run loop thread so it doesn't return
+ // it seems that if you want to keep your module running you can't return from the run loop
+
+ char_msg msg;
+ while(true) {
+ // Listen for term message
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Entered run loop, waiting for term message\n");
+ _term_rep.recv(&msg);
+ if(*msg.char_data() == MODULE_TERM_REQ_MESSAGE) {
+ // Ack term message
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term message, sending ack and leaving run loop\n");
+
+ *msg.char_data() = MODULE_TERM_ACK_MESSAGE;
+ _term_rep.send(msg);
+
+ break;
+ }
+ }
+ }
- while(_running) {
- switch_yield(100000);
+ void Shutdown() {
+ // Send term message
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutdown requested, sending term message to runloop\n");
+ char_msg msg(MODULE_TERM_REQ_MESSAGE);
+ _term_req.send(msg);
+
+ while(true) {
+ // Wait for the term ack message
+ _term_req.recv(&msg);
+ if(*msg.char_data() == MODULE_TERM_ACK_MESSAGE) {
+ // Continue shutdown
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term ack message, continuing shutdown\n");
+ break;
+ }
}
}
~ZmqModule() {
// Unsubscribe from the switch events
- _running = false;
switch_event_unbind(&_node);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module shut down\n");
}
// Dispatches events to the publisher
static void event_handler(switch_event_t *event) {
try {
- ZmqModule *module = (ZmqModule*)event->bind_user_data;
- if(module->_publisher.get())
- module->_publisher->PublishEvent(event);
+ ZmqEventPublisher *publisher = static_cast<ZmqEventPublisher*>(event->bind_user_data);
+ publisher->PublishEvent(event);
} catch(std::exception ex) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what());
} catch(...) { // Exceptions must not propogate to C caller
}
switch_event_node_t *_node;
- std::auto_ptr<ZmqEventPublisher> _publisher;
- bool _running;
+
+ zmq::context_t _context;
+ zmq::socket_t _term_rep;
+ zmq::socket_t _term_req;
+
+ ZmqEventPublisher _publisher;
};
//*****************************//
SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) {
try {
+ // Tell the module to shutdown
+ module->Shutdown();
+
// Free the module object
module.reset();
+ } catch(std::exception &ex) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module: %s\n", ex.what());
} catch(...) { // Exceptions must not propogate to C caller
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module\n");
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error shutting down module\n");
}
}