]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
Fix FS-3904 use a message to shutdown the module
authorJosh Perry <josh@6bit.com>
Thu, 23 Feb 2012 01:19:07 +0000 (18:19 -0700)
committerJeff Lenk <jeff@jefflenk.com>
Thu, 23 Feb 2012 20:40:59 +0000 (14:40 -0600)
For some reason the running flag being set on the thread initiating the
shutdown did not propogate to the run loop thread. I couldn't find any
memory barrier functions in APR or the FS API so I am using 0MQ to send
a termination request message to the run loop thread.

src/mod/event_handlers/mod_event_zmq/mod_event_zmq.cpp
src/mod/event_handlers/mod_event_zmq/mod_event_zmq.h [new file with mode: 0644]

index c8975ebe5db399b34988e77431b23745b6bb7675..b85acbf0e0d848fe0e9f6967ce3051da303b1827 100644 (file)
@@ -4,24 +4,17 @@
 #include <stdexcept>
 #include <memory>
 
-namespace mod_event_zmq {
-
-SWITCH_MODULE_LOAD_FUNCTION(load);
-SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown);
-SWITCH_MODULE_RUNTIME_FUNCTION(runtime);
+#include "mod_event_zmq.h"
 
-extern "C" {
-SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime);
-};
+namespace mod_event_zmq {
 
 // Handles publishing events out to clients
 class ZmqEventPublisher {
 public:
-       ZmqEventPublisher() :
-               context(1),
-               event_publisher(context, ZMQ_PUB)
+       ZmqEventPublisher(zmq::context_t &context) :
+               _publisher(context, ZMQ_PUB)
        {
-               event_publisher.bind("tcp://*:5556");
+               _publisher.bind("tcp://*:5556");
 
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Listening for clients\n");
        }
@@ -35,7 +28,7 @@ public:
                zmq::message_t msg(pjson, strlen(pjson), free_message_data, NULL);
                
                // Send the message
-               event_publisher.send(msg);
+               _publisher.send(msg);
        }
 
 private:
@@ -43,18 +36,34 @@ private:
                free (data);
        }
 
-       zmq::context_t context;
-       zmq::socket_t event_publisher;
+       zmq::socket_t _publisher;
+};
+
+class char_msg : public zmq::message_t {
+public:
+       char_msg() : zmq::message_t(sizeof(char)) { }
+       char_msg(char data) : zmq::message_t(sizeof(char)) {
+               *char_data() = data;
+       }
+
+       char* char_data() {
+               return static_cast<char*>(this->data());
+       }
 };
 
 // Handles global inititalization and teardown of the module
 class ZmqModule {
 public:
        ZmqModule(switch_loadable_module_interface_t **module_interface, switch_memory_pool_t *pool) :
-               _running(false) {
+               _context(1), _term_rep(_context, ZMQ_REP), _term_req(_context, ZMQ_REQ), _publisher(_context) {
+
+               // Set up the term messaging connection
+               _term_rep.bind(TERM_URI);
+               _term_req.connect(TERM_URI);
+
                // Subscribe to all switch events of any subclass
                // Store a pointer to ourself in the user data
-               if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, (void*)this, &_node)
+               if (switch_event_bind_removable(modname, SWITCH_EVENT_ALL, SWITCH_EVENT_SUBCLASS_ANY, event_handler, static_cast<void*>(&_publisher), &_node)
                                != SWITCH_STATUS_SUCCESS) {
                        throw std::runtime_error("Couldn't bind to switch events.");
                }
@@ -67,20 +76,45 @@ public:
        }
 
        void Listen() {
-               if(_running)
-                       return;
-
-               _publisher.reset(new ZmqEventPublisher());
-               _running = true;
+               // All we do is sit here and block the run loop thread so it doesn't return
+               // it seems that if you want to keep your module running you can't return from the run loop
+               
+               char_msg msg;
+               while(true) {
+                       // Listen for term message
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Entered run loop, waiting for term message\n");
+                       _term_rep.recv(&msg);
+                       if(*msg.char_data() == MODULE_TERM_REQ_MESSAGE) {
+                               // Ack term message
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term message, sending ack and leaving run loop\n");
+
+                               *msg.char_data() = MODULE_TERM_ACK_MESSAGE;
+                               _term_rep.send(msg);
+
+                               break;
+                       }
+               }
+       }
 
-               while(_running) {
-                       switch_yield(100000);
+       void Shutdown() {
+               // Send term message
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Shutdown requested, sending term message to runloop\n");
+               char_msg msg(MODULE_TERM_REQ_MESSAGE);
+               _term_req.send(msg);
+
+               while(true) {
+                       // Wait for the term ack message
+                       _term_req.recv(&msg);
+                       if(*msg.char_data() == MODULE_TERM_ACK_MESSAGE) {
+                               // Continue shutdown
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Got term ack message, continuing shutdown\n");
+                               break;
+                       }
                }
        }
 
        ~ZmqModule() {
                // Unsubscribe from the switch events
-               _running = false;
                switch_event_unbind(&_node);
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Module shut down\n");
        }
@@ -89,9 +123,8 @@ private:
        // Dispatches events to the publisher
        static void event_handler(switch_event_t *event) {
                try {
-                       ZmqModule *module = (ZmqModule*)event->bind_user_data;
-                       if(module->_publisher.get())
-                               module->_publisher->PublishEvent(event);
+                       ZmqEventPublisher *publisher = static_cast<ZmqEventPublisher*>(event->bind_user_data);
+                       publisher->PublishEvent(event);
                } catch(std::exception ex) {
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Error publishing event via 0MQ: %s\n", ex.what());
                } catch(...) { // Exceptions must not propogate to C caller
@@ -100,8 +133,12 @@ private:
        }
 
        switch_event_node_t *_node;
-       std::auto_ptr<ZmqEventPublisher> _publisher;
-       bool _running;
+
+       zmq::context_t _context;
+       zmq::socket_t _term_rep;
+       zmq::socket_t _term_req;
+
+       ZmqEventPublisher _publisher;
 };
 
 //*****************************//
@@ -140,10 +177,15 @@ SWITCH_MODULE_RUNTIME_FUNCTION(runtime) {
 
 SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown) {
        try {
+               // Tell the module to shutdown
+               module->Shutdown();
+
                // Free the module object
                module.reset();
+       } catch(std::exception &ex) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module: %s\n", ex.what());
        } catch(...) { // Exceptions must not propogate to C caller
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error shutting down module\n");
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown error shutting down module\n");
        }
 }
 
diff --git a/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.h b/src/mod/event_handlers/mod_event_zmq/mod_event_zmq.h
new file mode 100644 (file)
index 0000000..a60c720
--- /dev/null
@@ -0,0 +1,20 @@
+#ifndef MOD_EVENT_ZMQ_H
+#define MOD_EVENT_ZMQ_H
+
+namespace mod_event_zmq {
+static const char MODULE_TERM_REQ_MESSAGE = 1;
+static const char MODULE_TERM_ACK_MESSAGE = 2;
+
+static const char *TERM_URI = "inproc://mod_event_zmq_term";
+
+SWITCH_MODULE_LOAD_FUNCTION(load);
+SWITCH_MODULE_SHUTDOWN_FUNCTION(shutdown);
+SWITCH_MODULE_RUNTIME_FUNCTION(runtime);
+
+extern "C" {
+SWITCH_MODULE_DEFINITION(mod_event_zmq, load, shutdown, runtime);
+};
+
+}
+
+#endif // MOD_EVENT_ZMQ_H