]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
ftmod_misdn: Use a per-span I/O thread to handle B-channel data.
authorStefan Knoblich <stkn@openisdn.net>
Fri, 24 Aug 2012 14:35:55 +0000 (16:35 +0200)
committerStefan Knoblich <stkn@openisdn.net>
Fri, 24 Aug 2012 16:03:31 +0000 (18:03 +0200)
Move the B-channel message handling into a per-span I/O thread,
to solve most of the problems caused by the intermixed data + control
socket interface of mISDN, missing write poll() support on
mISDN B-channels and the FreeTDM I/O model. This eliminates most of
the audio problems (except for a few minor glitches).

A unix stream socket pair is used as a bi-directional pipe replacement
(the pipe code is still included in this commit, but will be removed later),
with the RX and TX buffer sizes carefully tuned to avoid excessive buffering
(= latency) and a deadlock situation between the write() call in ftdm_write()
and the code in misdn_span_run() that needs a minimum amount of data in the
TX buffer, before sending out a PH_DATA_REQ to the mISDN socket
(see misdn_span_run() comments for more details).

The minimum size for pipes is PAGE_SIZE (4k), which is ~500 ms worth of
audio. A socket pair RX/TX buffer size of 3k, seems to hold a maximum
amount of around 500 bytes data in practice, giving us a much lower
maximum latency than a unix pipe. (The socket pair might be replaced by a
custom ring buffer / fifo data structure to get even more fine grained
control of the maximum latency.)

The newly introduced span_start / span_stop callbacks in
ftdm_io_interface_t are used to start / stop the I/O thread. The callback
functions will wait up to 10 seconds for the thread to successfully
start up or shut down (using a mutex + condition var).

NOTE: Using any of the locking ftdm_span_() functions in the I/O will cause
      a deadlock between the I/O thread (trying to lock span->mutex) and the
      thread calling ftdm_start()/_stop() (holding the span->mutex).
      (The I/O thread currently uses direct span member access to avoid this.)

The I/O thread uses the epoll(7) family of functions for event handling.
An epoll context is created on startup and all B-channel sockets are
registered (READ, PRI and ERR). Before entering the event loop,
the I/O thread will send a signal on the condition variable, to
indicate it has completed the startup procedure.

Incoming b-channel and command pipe events are handled by the event loop.
Payload of incoming PH_DATA_IND frames (= audio data) is sent to the
rx_audio_pipe_in end of the b-channel's socket pair and, if enough data is
available, a PH_DATA_REQ of the same size is sent to the b-channel mISDN socket
to transmit audio.

A MISDN_CMD_STOP command on the event pipe will wake up the I/O thread and
cause it to shut down. All b-channels will be unregistered from the epoll context
and the epoll fd closed. The I/O thread terminates itself after signalling the
successfull shutdown on the condition variable.

TODOs:
    - Move D-Channel into I/O thread too

    - Custom FIFO/ring buffer for data (even lower latency)

    - Improve epoll() code (per-channel struct w/ callback, for epfd.data.ptr)

    - Use mISDN DSP for audio (e.g. tone generator, dtmf detector, echo cancel)

    - Use a per-port / span control socket to execute channel commands
      synchronously, or add misdn_commands (queue?) that can be used that way

    - Name I/O threads 'mISDN-%SPAN_NAME%', e.g. 'mISDN-M_BRI1'
      (= add ftdm_thread_set_namef(thread, fmt, ...) / ftdm_thread_set_name(thread, name))

TL;DR: "tweak", solves "booboo" with audio

Signed-off-by: Stefan Knoblich <stkn@openisdn.net>
libs/freetdm/src/ftmod/ftmod_misdn/ftmod_misdn.c

index 462cb9a6c4eb14f3950cd7e56369b867d9d982b9..84133601f3b13b928409dc1168d6dd653d683441 100644 (file)
@@ -44,6 +44,7 @@
 #include <fcntl.h>
 #include <poll.h>
 #include <pthread.h>
+#include <sys/epoll.h>
 
 /* this is how it should have been...
 #ifdef HAVE_FREETDM_FREETDM_H
@@ -70,6 +71,7 @@
 //#define MISDN_DEBUG_EVENTS
 //#define MISDN_DEBUG_IO
 
+#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))
 
 typedef enum {
        MISDN_CAPS_NONE = 0,
@@ -145,7 +147,7 @@ const static struct {
 #undef MISDN_CONTROL_TYPE
 };
 
-#if 0 /* unused for now */
+#ifdef MISDN_DEBUG_EVENTS
 static const char *misdn_control2str(const int ctrl)
 {
        int x;
@@ -219,6 +221,16 @@ static inline void misdn_convert_audio_bits(char *buf, int buflen)
  * mISDN <-> FreeTDM data structures
  ***********************************************************************************/
 
+typedef enum {
+       MISDN_CMD_NONE = 0,
+       MISDN_CMD_STOP,         /*!< Stop the I/O thread */
+} misdn_cmd_t;
+
+struct misdn_command {
+       misdn_cmd_t type;
+/*     union { } cmd; */       /*!< Command-specific parameters */
+};
+
 enum {
        MISDN_SPAN_NONE    = 0,
        MISDN_SPAN_RUNNING = (1 << 0),
@@ -227,10 +239,18 @@ enum {
 
 struct misdn_span_private {
        int flags;
+       int running;
+
+       int event_pipe_in;
+       int event_pipe_out;
 
        /* event conditional */
        pthread_mutex_t event_cond_mutex;
        pthread_cond_t  event_cond;
+
+       /* start / stop feedback */
+       pthread_mutex_t ctrl_cond_mutex;
+       pthread_cond_t  ctrl_cond;
 };
 
 struct misdn_event_queue;
@@ -243,9 +263,11 @@ struct misdn_chan_private {
        /* hw addr of channel */
        struct sockaddr_mISDN addr;
 
-       /* audio tx pipe */
-       int audio_pipe_in;
-       int audio_pipe_out;
+       /* audio tx pipe (= socketpair ends) */
+       int tx_audio_pipe_in;
+       int tx_audio_pipe_out;
+       int rx_audio_pipe_in;
+       int rx_audio_pipe_out;
 
        /* counters */
        unsigned long tx_cnt;
@@ -254,6 +276,14 @@ struct misdn_chan_private {
        unsigned long slip_rx_cnt;
        unsigned long slip_tx_cnt;
 
+       unsigned long tx_pipe_wr_bytes;         /*!< Number of bytes written into tx audio pipe */
+       unsigned long tx_pipe_rd_bytes;         /*!< Number of bytes read from tx audio pipe */
+       unsigned long tx_miss_bytes;            /*!< Number of bytes missing in short reads from tx audio pipe */
+       unsigned long tx_lost_bytes;            /*!< Number of bytes lost in short writes to the mISDN B-Channel */
+       unsigned long tx_sent_bytes;            /*!< Number of bytes successfully sent to the mISDN B-Channel */
+       unsigned long tx_pipe_under_cnt;        /*!< Number of tx audio pipe underflows */
+       unsigned long tx_pipe_over_cnt;         /*!< Number of tx audio pipe overflows  */
+
        struct misdn_event_queue *events;
 };
 
@@ -557,11 +587,11 @@ static ftdm_status_t _misdn_toggle_channel(ftdm_channel_t *chan, int activate)
                                        (activate) ? "activation" : "deactivation", strerror(errno));
                                return FTDM_FAIL;
                        }
-//#ifdef MISDN_DEBUG_EVENTS
+#ifdef MISDN_DEBUG_EVENTS
                        ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN got event '%s (%#x)', id %#x, while waiting for %s confirmation on %c-channel\n",
                                misdn_event2str(hh->prim), hh->prim, hh->id, (activate) ? "activation" : "deactivation",
                                ftdm_channel_get_type(chan) == FTDM_CHAN_TYPE_B ? 'B' : 'D');
-//#endif
+#endif
                        switch (hh->prim) {
                        case PH_ACTIVATE_IND:
                        case PH_ACTIVATE_CNF:
@@ -685,10 +715,10 @@ static ftdm_status_t misdn_get_ph_info(ftdm_channel_t *chan, struct ph_info *inf
                                        misdn_event2str(req), strerror(errno));
                                return FTDM_FAIL;
                        }
-//#ifdef MISDN_DEBUG_EVENTS
+#ifdef MISDN_DEBUG_EVENTS
                        ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN got event '%s' while waiting for %s answer\n",
                                misdn_event2str(hh->prim), misdn_event2str(req));
-//#endif
+#endif
                        switch (hh->prim) {
                        case MPH_INFORMATION_IND:       /* success */
                                if (retval < MISDN_HEADER_LEN + sizeof(*info)) {
@@ -936,7 +966,7 @@ static int misdn_handle_mph_information_ind(ftdm_channel_t *chan, const struct m
  * mISDN <-> FreeTDM interface functions
  ***********************************************************************************/
 
-struct misdn_globals {
+static struct misdn_globals {
        int sockfd;
 } globals;
 
@@ -1022,6 +1052,11 @@ static FIO_CLOSE_FUNCTION(misdn_close)
                                ftdm_channel_get_type(ftdmchan) == FTDM_CHAN_TYPE_B ? 'B' : 'D');
                }
 
+               ftdm_log_chan(ftdmchan, FTDM_LOG_NOTICE, "mISDN tx stats: wr: %lu, rd: %lu, tx: %lu, tx-lost: %lu, tx-miss: %lu, tx-under#: %lu, tx-over#: %lu\n",
+                       chan_priv->tx_pipe_wr_bytes, chan_priv->tx_pipe_rd_bytes,
+                       chan_priv->tx_sent_bytes, chan_priv->tx_lost_bytes, chan_priv->tx_miss_bytes,
+                       chan_priv->tx_pipe_over_cnt, chan_priv->tx_pipe_under_cnt);
+
                chan_priv->active = 0;
        }
 
@@ -1109,16 +1144,22 @@ static FIO_WAIT_FUNCTION(misdn_wait)
        switch (ftdm_channel_get_type(ftdmchan)) {
        case FTDM_CHAN_TYPE_B:
                if (*flags & FTDM_WRITE) {
-                       pfds[nr_fds].fd = chan_priv->audio_pipe_in;
+                       pfds[nr_fds].fd = chan_priv->tx_audio_pipe_in;
                        pfds[nr_fds].events = POLLOUT;
                        nr_fds++;
                }
-               if (*flags & (FTDM_READ | FTDM_EVENTS)) {
+               if (*flags & FTDM_READ) {
+                       pfds[nr_fds].fd = chan_priv->rx_audio_pipe_out;
+                       pfds[nr_fds].events = POLLIN;
+                       nr_fds++;
+               }
+/*             if (*flags & (FTDM_READ | FTDM_EVENTS)) {
                        pfds[nr_fds].fd = ftdmchan->sockfd;
                        pfds[nr_fds].events |= (*flags & FTDM_READ)   ? POLLIN  : 0;
                        pfds[nr_fds].events |= (*flags & FTDM_EVENTS) ? POLLPRI : 0;
                        nr_fds++;
                }
+*/
                break;
        default:
                if (*flags & FTDM_READ)
@@ -1149,7 +1190,7 @@ static FIO_WAIT_FUNCTION(misdn_wait)
 
        switch (ftdm_channel_get_type(ftdmchan)) {
        case FTDM_CHAN_TYPE_B:
-               if (pfds[0].revents & POLLOUT)
+               if ((pfds[0].revents & POLLOUT) || (pfds[1].revents & POLLOUT))
                        *flags |= FTDM_WRITE;
                if ((pfds[0].revents & POLLIN)  || (pfds[1].revents & POLLIN))
                        *flags |= FTDM_READ;
@@ -1171,10 +1212,10 @@ static FIO_WAIT_FUNCTION(misdn_wait)
 
 /**
  * Handle incoming mISDN message on d-channel
- * @param[in]  ftdmchan
- * @param[in]  msg_buf
- * @param[in]  msg_len
- * @internal
+ * \param[in]  ftdmchan
+ * \param[in]  msg_buf
+ * \param[in]  msg_len
+ * \internal
  */
 static ftdm_status_t misdn_handle_incoming(ftdm_channel_t *ftdmchan, const char *msg_buf, const int msg_len)
 {
@@ -1258,13 +1299,6 @@ static FIO_READ_FUNCTION(misdn_read)
        int retval;
        int maxretry = 10;
 
-       if (!priv->active) {
-               ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN ignoring read on closed channel\n");
-               /* ignore */
-               *datalen = 0;
-               return FTDM_SUCCESS;
-       }
-
        /* nothing read yet */
        *datalen = 0;
 
@@ -1273,111 +1307,61 @@ static FIO_READ_FUNCTION(misdn_read)
         * we'll get a lot of "mISDN_send: error -12" message in dmesg otherwise
         * (= b-channel receive queue overflowing)
         */
-       while (maxretry--) {
-               struct sockaddr_mISDN addr;
-               socklen_t addrlen = sizeof(addr);
-
-               if ((retval = recvfrom(ftdmchan->sockfd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&addr, &addrlen)) < 0) {
-                       if (errno == EWOULDBLOCK || errno == EAGAIN) break;
-                       ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to receive incoming message: %s\n",
-                               strerror(errno));
-                       return FTDM_FAIL;
-               }
-
-               if (retval < MISDN_HEADER_LEN) {
-                       ftdm_log_chan_msg(ftdmchan, FTDM_LOG_ERROR, "mISDN received message too small\n");
-                       return FTDM_FAIL;
-               }
-
-               if (hh->prim == PH_DATA_IND) {
-                       *datalen = ftdm_clamp(retval - MISDN_HEADER_LEN, 0, bytes);
-#ifdef MISDN_DEBUG_IO
-                       ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "misdn_read() received '%s', id: %#x, with %d bytes from channel socket %d [dev.ch: %d.%d]\n",
-                               misdn_event2str(hh->prim), hh->id, retval - MISDN_HEADER_LEN, ftdmchan->sockfd, addr.dev, addr.channel);
-
-                       if (*datalen > 0) {
-                               char hbuf[MAX_DATA_MEM] = { 0 };
-                               print_hex_bytes(data, *datalen, hbuf, sizeof(hbuf));
-                               ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "mISDN read data: %s\n", hbuf);
+       switch (ftdm_channel_get_type(ftdmchan)) {
+       case FTDM_CHAN_TYPE_DQ921: {
+               while (maxretry--) {
+                       struct sockaddr_mISDN addr;
+                       socklen_t addrlen = sizeof(addr);
+
+                       if ((retval = recvfrom(ftdmchan->sockfd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&addr, &addrlen)) < 0) {
+                               if (errno == EWOULDBLOCK || errno == EAGAIN) break;
+                               ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to receive incoming message: %s\n",
+                                       strerror(errno));
+                               return FTDM_FAIL;
                        }
-#endif
-                       if (*datalen <= 0)
-                               continue;
 
-                       /*
-                        * Copy data into ouput buffer (excluding the mISDN message header)
-                        * NOTE: audio data needs to be converted to a-law / u-law!
-                        */
-                       memcpy(data, rbuf + MISDN_HEADER_LEN, *datalen);
+                       if (retval < MISDN_HEADER_LEN) {
+                               ftdm_log_chan_msg(ftdmchan, FTDM_LOG_ERROR, "mISDN received message too small\n");
+                               return FTDM_FAIL;
+                       }
 
-                       switch (ftdm_channel_get_type(ftdmchan)) {
-                       case FTDM_CHAN_TYPE_B:
-                               hh->prim = PH_DATA_REQ;
-                               hh->id   = MISDN_ID_ANY;
-                               bytes    = *datalen;
+                       if (hh->prim == PH_DATA_IND) {
+                               *datalen = ftdm_clamp(retval - MISDN_HEADER_LEN, 0, bytes);
 
-                               /* Convert incoming audio data to *-law */
-                               misdn_convert_audio_bits(data, *datalen);
+                               if (*datalen <= 0)
+                                       continue;
 
                                /*
-                                * Fetch required amount of audio from tx pipe, using the amount
-                                * of received bytes as an indicator for how much free space the
-                                * b-channel tx buffer has available.
-                                *
-                                * (see misdn_write() for the part that fills the tx pipe)
-                                *
-                                * NOTE: can't use blocking I/O here since both parts are serviced
-                                *       from the same thread
+                                * Copy data into ouput buffer (excluding the mISDN message header)
+                                * NOTE: audio data needs to be converted to a-law / u-law!
                                 */
-                               if ((retval = read(priv->audio_pipe_out, rbuf + MISDN_HEADER_LEN, bytes)) < 0) {
-                                       if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
-                                               ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to read %d bytes of audio data: %s\n",
-                                                       bytes, strerror(errno));
-                                               break;
-                                       }
-                                       /* Tx pipe is empty, completely fill buffer up to "bytes" with silence value */
-                                       retval = 0;
-                               }
-
-                               /*
-                                * Use a-law / u-law silence to fill missing bytes,
-                                * in case there was not enough audio data available in the
-                                * tx pipe to satisfy the request.
-                                */
-                               if (retval < bytes) {
-                                       memset(&rbuf[MISDN_HEADER_LEN + retval],
-                                               (ftdm_channel_get_codec(ftdmchan) == FTDM_CODEC_ALAW) ? 0x2a : 0xff,
-                                               bytes - retval);
-                               }
-
-                               /* Convert outgoing audio data to wire format */
-                               misdn_convert_audio_bits(rbuf + MISDN_HEADER_LEN, bytes);
-                               bytes += MISDN_HEADER_LEN;
-
-                               /* Send converted audio to b-channel */
-                               if ((retval = sendto(ftdmchan->sockfd, rbuf, bytes, 0, (struct sockaddr *)&priv->addr, sizeof(priv->addr))) < bytes) {
-                                       ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to send %d bytes of audio data: (%d) %s\n",
-                                               bytes, retval, strerror(errno));
-                               }
-                               break;
-                       default:
-                               break;
+                               memcpy(data, rbuf + MISDN_HEADER_LEN, *datalen);
+                               return FTDM_SUCCESS;
+                       } else {
+                               *datalen = 0;
+                               /* event */
+                               misdn_handle_incoming(ftdmchan, rbuf, retval);
                        }
+               }
+               break;
+       }
+       case FTDM_CHAN_TYPE_B: {
+               if (!priv->active) {
+                       ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN ignoring read on closed b-channel\n");
                        return FTDM_SUCCESS;
-               } else {
-                       *datalen = 0;
-#ifdef MISDN_DEBUG_IO
-                       ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "misdn_read() received '%s', id: %#x, with %d bytes from channel socket %d [dev.ch: %d.%d]\n",
-                               misdn_event2str(hh->prim), hh->id, retval - MISDN_HEADER_LEN, ftdmchan->sockfd, addr.dev, addr.channel);
-#endif
-                       /* event */
-                       misdn_handle_incoming(ftdmchan, rbuf, retval);
                }
+
+               if ((retval = read(priv->rx_audio_pipe_out, data, bytes)) < 0) {
+                       ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to read audio data from rx pipe: %s\n",
+                               strerror(errno));
+                       return FTDM_FAIL;
+               }
+               *datalen = retval;
+               break;
+       }
+       default:
+               break;
        }
-#ifdef MISDN_DEBUG_IO
-       ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "mISDN nothing received on %c-channel\n",
-               ftdm_channel_get_type(ftdmchan) == FTDM_CHAN_TYPE_B ? 'B' : 'D');
-#endif
        return FTDM_SUCCESS;
 }
 
@@ -1427,11 +1411,18 @@ static FIO_WRITE_FUNCTION(misdn_write)
                 * NOTE: can't use blocking I/O here since both parts are serviced
                 *       from the same thread
                 */
-               if ((retval = write(priv->audio_pipe_in, data, size)) < size) {
-                       ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN channel audio pipe write error: %s\n",
-                               strerror(errno));
+               if ((retval = write(priv->tx_audio_pipe_in, data, size)) < 0) {
+                       ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN channel audio pipe write error [wr: %lu, rd: %lu: tx: %lu, tx-under#: %lu, tx-over#: %lu]: %s\n",
+                               priv->tx_pipe_wr_bytes, priv->tx_pipe_rd_bytes, priv->tx_sent_bytes,
+                               priv->tx_pipe_under_cnt, priv->tx_pipe_over_cnt, strerror(errno));
                        return FTDM_FAIL;
+               } else if (retval < size) {
+                       priv->tx_pipe_over_cnt++;
+                       ftdm_log_chan(ftdmchan, FTDM_LOG_WARNING, "mISDN channel audio pipe short write [wr: %lu, rd: %lu: tx: %lu, tx-under#: %lu, tx-over#: %lu], expected: %d, written: %d\n",
+                               priv->tx_pipe_wr_bytes, priv->tx_pipe_rd_bytes, priv->tx_sent_bytes,
+                               priv->tx_pipe_under_cnt, priv->tx_pipe_over_cnt, size, retval);
                }
+               ACCESS_ONCE(priv->tx_pipe_wr_bytes) += retval;
                *datalen = retval;
                break;
        default:
@@ -1482,6 +1473,12 @@ static FIO_WRITE_FUNCTION(misdn_write)
        return FTDM_SUCCESS;
 }
 
+/**
+ * Carefully choosen size for socket send/recv buffers
+ * larger values will add more latency, while lower values will cause deadlocks
+ * (see misdn_span_run() comments below for an explanation)
+ */
+#define SOCKETPAIR_BUFFER_SIZE 3072
 
 static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type, struct mISDN_devinfo *devinfo, int start, int end)
 {
@@ -1556,9 +1553,11 @@ static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type,
                ftdm_log(FTDM_LOG_DEBUG, "mISDN opened socket (on chan:dev => %d:%d): %d\n",
                        addr.dev, addr.channel, sockfd);
 
-               /* set non-blocking */
+               /* Set mISDN channel socket non-blocking */
                if (fcntl(sockfd, F_SETFL, O_NONBLOCK) < 0) {
-                       ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket fd to non-blocking: %s\n", strerror(errno));
+                       ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket fd to non-blocking: %s\n",
+                               strerror(errno));
+                       close(sockfd);
                        return FTDM_FAIL;
                }
 
@@ -1604,25 +1603,109 @@ static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type,
                if (ftdmchan->type == FTDM_CHAN_TYPE_B) {
                        int pipefd[2] = { -1, -1 };
 
-                       ftdmchan->packet_len         = 10 /* ms */ * (ftdmchan->rate / 1000);
+                       ftdmchan->packet_len         = 30 /* ms */ * (ftdmchan->rate / 1000);
                        ftdmchan->effective_interval = ftdmchan->native_interval = ftdmchan->packet_len / 8;
                        ftdmchan->native_codec       = ftdmchan->effective_codec = FTDM_CODEC_ALAW;
 
-                       ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_INTERVAL);
+//                     ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_INTERVAL);
 
+#ifdef USE_PIPE
                        /*
                         * Create audio tx pipe, use non-blocking I/O to avoid deadlock since both ends
                         * are used from the same thread
                         */
-                       if (pipe2(pipefd, O_NONBLOCK) < 0) {
-                               ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio write pipe [%d:%d]: %s\n",
+                       if (pipe2(pipefd, 0 | O_NONBLOCK) < 0) {
+                               ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio tx pipe [%d:%d]: %s\n",
+                                       addr.dev, x, strerror(errno));
+                               close(sockfd);
+                               return FTDM_FAIL;
+                       }
+                       priv->tx_audio_pipe_in  = pipefd[1];
+                       priv->tx_audio_pipe_out = pipefd[0];
+
+#if 1 || defined(HAVE_F_SETPIPE_SZ)
+                       if (fcntl(priv->tx_audio_pipe_in, F_SETPIPE_SZ, 4096) < 0) {
+                               ftdm_log(FTDM_LOG_WARNING, "Failed to set mISDN audio tx pipe size [%d:%d]: %s\n",
+                                       addr.dev, x, strerror(errno));
+                       }
+#endif
+                       /*
+                        * Create audio rx pipe, use non-blocking I/O to avoid deadlock since both ends
+                        * are used from the same thread
+                        */
+                       if (pipe2(pipefd, 0 | O_NONBLOCK) < 0) {
+                               ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio rx pipe [%d:%d]: %s\n",
                                        addr.dev, x, strerror(errno));
                                close(sockfd);
                                return FTDM_FAIL;
                        }
-                       priv->audio_pipe_in  = pipefd[1];
-                       priv->audio_pipe_out = pipefd[0];
+                       priv->rx_audio_pipe_in  = pipefd[1];
+                       priv->rx_audio_pipe_out = pipefd[0];
+
+#if 1 || defined(HAVE_F_SETPIPE_SZ)
+                       if (fcntl(priv->rx_audio_pipe_in, F_SETPIPE_SZ, 4096) < 0) {
+                               ftdm_log(FTDM_LOG_WARNING, "Failed to set mISDN audio rx pipe size [%d:%d]: %s\n",
+                                       addr.dev, x, strerror(errno));
+                       }
+#endif
+#else  /* !USE_PIPE */
+                       /*
+                        * Use a socket pair for audio rx/tx, allows for more fine-grained control
+                        * of latency (= amounts of data in buffers)
+                        */
+                       if (socketpair(AF_UNIX, SOCK_STREAM, 0, pipefd) < 0) {
+                               ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio socket pair [%d:%d]: %s\n",
+                                       addr.dev, x, strerror(errno));
+                               close(sockfd);
+                               return FTDM_FAIL;
+                       } else {
+                               int opt = SOCKETPAIR_BUFFER_SIZE;
+                               socklen_t optlen = sizeof(opt);
+
+                               if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0) {
+                                       ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket pair fd[0] to non-blocking: %s\n",
+                                               strerror(errno));
+                                       close(sockfd);
+                                       close(pipefd[0]);
+                                       close(pipefd[1]);
+                                       return FTDM_FAIL;
+                               }
+                               if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0) {
+                                       ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket pair fd[1] to non-blocking: %s\n",
+                                       strerror(errno));
+                                       close(sockfd);
+                                       close(pipefd[0]);
+                                       close(pipefd[1]);
+                                       return FTDM_FAIL;
+                               }
 
+                               /*
+                                * Set RX/TX buffer sizes on each end of the socket pair
+                                */
+                               if (setsockopt(pipefd[0], SOL_SOCKET, SO_RCVBUF, &opt, optlen) < 0) {
+                                       ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[0] RCVBUF: %s\n",
+                                               strerror(errno));
+                               }
+                               if (setsockopt(pipefd[0], SOL_SOCKET, SO_SNDBUF, &opt, optlen) < 0) {
+                                       ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[0] SNDBUF: %s\n",
+                                               strerror(errno));
+                               }
+                               if (setsockopt(pipefd[1], SOL_SOCKET, SO_RCVBUF, &opt, optlen) < 0) {
+                                       ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[1] RCVBUF: %s\n",
+                                               strerror(errno));
+                               }
+                               if (setsockopt(pipefd[1], SOL_SOCKET, SO_SNDBUF, &opt, optlen) < 0) {
+                                       ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[1] SNDBUF: %s\n",
+                                               strerror(errno));
+                               }
+
+                               priv->rx_audio_pipe_in  = pipefd[1];
+                               priv->rx_audio_pipe_out = pipefd[0];
+
+                               priv->tx_audio_pipe_in  = pipefd[0];
+                               priv->tx_audio_pipe_out = pipefd[1];
+                       }
+#endif
                } else {
                        /* early activate D-Channel */
                        misdn_activate_channel(ftdmchan);
@@ -1738,6 +1821,8 @@ static FIO_CONFIGURE_SPAN_FUNCTION(misdn_configure_span)
 
        /* allocate span private */
        if (!span_priv) {
+               int pipe[2] = { -1, -1 };
+
                /*
                 * Not perfect, there should be something like span_create too
                 */
@@ -1751,6 +1836,19 @@ static FIO_CONFIGURE_SPAN_FUNCTION(misdn_configure_span)
                /* init event condition */
                pthread_cond_init(&span_priv->event_cond, NULL);
                pthread_mutex_init(&span_priv->event_cond_mutex, NULL);
+
+               /* init control condition */
+               pthread_cond_init(&span_priv->ctrl_cond, NULL);
+               pthread_mutex_init(&span_priv->ctrl_cond_mutex, NULL);
+
+               /* create event pipe */
+               if (pipe2(pipe, O_CLOEXEC) < 0) {
+                       ftdm_log(FTDM_LOG_ERROR, "mISDN failed to create event pipe: %s\n",
+                               strerror(errno));
+                       return FTDM_FAIL;
+               }
+               span_priv->event_pipe_in  = pipe[0];
+               span_priv->event_pipe_out = pipe[1];
        }
 
        /* split channel list by ',' */
@@ -1819,8 +1917,6 @@ static FIO_GET_ALARMS_FUNCTION(misdn_get_alarms)
        add event queues and data fifos, so we can sift all the
        messages we get to forward them to the right receiver
 */
-       ftdm_span_t *span = ftdm_channel_get_span(ftdmchan);
-       struct misdn_span_private *span_priv = ftdm_span_io_private(span);
        char buf[MAX_DATA_MEM] = { 0 };
        struct sockaddr_mISDN addr;
        struct mISDNhead *hh;
@@ -1910,31 +2006,50 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event)
        int retval = 0, nr_events = 0;
        int i;
 
-       clock_gettime(CLOCK_REALTIME, &ts);
-       ts_add_msec(&ts, ms);
-
        for (i = 1; i <= ftdm_span_get_chan_count(span); i++) {
                ftdm_channel_t *chan = ftdm_span_get_channel(span, i);
                struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan);
 
+               /* Skip channels that have event processing pending (Avoids event storms) */
+               if (ftdm_test_io_flag(chan, FTDM_CHANNEL_IO_EVENT))
+                       continue;
+
                if (misdn_event_queue_has_data(chan_priv->events)) {
 #ifdef MISDN_DEBUG_EVENTS
                        ftdm_log(FTDM_LOG_DEBUG, "mISDN channel %d:%d has event(s)\n",
                                ftdm_channel_get_span_id(chan), ftdm_channel_get_id(chan));
 #endif
-                       ftdm_set_flag(chan, FTDM_CHANNEL_IO_EVENT);
+                       ftdm_set_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
                        chan->last_event_time = ftdm_current_time_in_ms();
                        nr_events++;
                }
        }
-       if (nr_events)
+       if (nr_events) {
+#ifdef MISDN_DEBUG_EVENTS
+               ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d has %d new events pending (pre poll)\n",
+                       ftdm_span_get_id(span), nr_events);
+#endif
                return FTDM_SUCCESS;
+       }
+
+
+#ifdef MISDN_DEBUG_EVENTS
+       ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d has no events pending, polling for new events with %d ms timeout\n",
+               ftdm_span_get_id(span), ms);
+#endif
+       /* Wait at least 1 ms, max 1 s */
+       ms = ftdm_clamp(ms, 1, 1000);
+
+       clock_gettime(CLOCK_REALTIME, &ts);
+       ts_add_msec(&ts, ms);
 
        if ((retval = pthread_cond_timedwait(&span_priv->event_cond, &span_priv->event_cond_mutex, &ts))) {
                switch (retval) {
                case ETIMEDOUT:
-//                     ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d: No events within %d ms\n",
-//                             ftdm_span_get_id(span), ms);
+#ifdef MISDN_DEBUG_EVENTS
+                       ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d: No events within %d ms\n",
+                               ftdm_span_get_id(span), ms);
+#endif
                        return FTDM_TIMEOUT;
                default:
                        ftdm_log(FTDM_LOG_DEBUG, "mISDN failed to poll for events on span %d: %s\n",
@@ -1943,12 +2058,20 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event)
                }
        }
 
+#ifdef MISDN_DEBUG_EVENTS
+       ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d received new event notification, checking channel event queues\n",
+               ftdm_span_get_id(span));
+#endif
        for (i = 1; i <= ftdm_span_get_chan_count(span); i++) {
                ftdm_channel_t *chan = ftdm_span_get_channel(span, i);
                struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan);
 
+               /* Skip channels that have event processing pending (Avoids event storms) */
+               if (ftdm_test_io_flag(chan, FTDM_CHANNEL_IO_EVENT))
+                       continue;
+
                if (misdn_event_queue_has_data(chan_priv->events)) {
-                       ftdm_set_flag(chan, FTDM_CHANNEL_IO_EVENT);
+                       ftdm_set_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
                        chan->last_event_time = ftdm_current_time_in_ms();
                        nr_events++;
                }
@@ -1956,6 +2079,57 @@ static FIO_SPAN_POLL_EVENT_FUNCTION(misdn_poll_event)
        return (nr_events) ? FTDM_SUCCESS : FTDM_TIMEOUT;       /* no events? => timeout */
 }
 
+/**
+ * Retrieve event from channel
+ * \param      ftdmchan        Channel to retrieve event from
+ * \param      event           FreeTDM event to return
+ * \return Success or failure
+ */
+static FIO_CHANNEL_NEXT_EVENT_FUNCTION(misdn_channel_next_event)
+{
+       struct misdn_chan_private *chan_priv = ftdm_chan_io_private(ftdmchan);
+       struct misdn_event *evt = NULL;
+       ftdm_span_t *span = ftdm_channel_get_span(ftdmchan);
+       uint32_t event_id = FTDM_OOB_INVALID;
+
+       ftdm_assert(span, "span == NULL");
+
+       ftdm_clear_io_flag(ftdmchan, FTDM_CHANNEL_IO_EVENT);
+
+       if (!(evt = misdn_event_queue_pop(chan_priv->events))) {
+#ifdef MISDN_DEBUG_EVENTS
+               ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN channel event queue has no events\n");
+#endif
+               return FTDM_FAIL;
+       }
+
+#ifdef MISDN_DEBUG_EVENTS
+       ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "Got event '%s' from channel event queue\n",
+               misdn_event2str(evt->id));
+#endif
+       /* Convert from misdn event to ftdm */
+       switch (evt->id) {
+       case PH_DEACTIVATE_IND:
+               event_id = FTDM_OOB_ALARM_TRAP;
+               ftdmchan->alarm_flags |= FTDM_ALARM_RED;
+               break;
+       case PH_ACTIVATE_IND:
+               event_id = FTDM_OOB_ALARM_CLEAR;
+               ftdmchan->alarm_flags &= ~FTDM_ALARM_RED;
+               break;
+       default:
+               ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "Unhandled event id %d (0x%x) %s\n",
+                       evt->id, evt->id, misdn_event2str(evt->id));
+       }
+
+       ftdmchan->last_event_time = 0;
+       span->event_header.e_type = FTDM_EVENT_OOB;
+       span->event_header.enum_id = event_id;
+       span->event_header.channel = ftdmchan;
+       *event = &span->event_header;
+       return FTDM_SUCCESS;
+}
+
 /**
  * \brief      Retrieve event
  * \param      span    FreeTDM span
@@ -1974,11 +2148,12 @@ static FIO_SPAN_NEXT_EVENT_FUNCTION(misdn_next_event)
                struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan);
                struct misdn_event *evt = NULL;
 
+               ftdm_clear_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
+
                if (!(evt = misdn_event_queue_pop(chan_priv->events))) {
 #ifdef MISDN_DEBUG_EVENTS
                        ftdm_log_chan_msg(chan, FTDM_LOG_DEBUG, "mISDN channel event queue has no events\n");
 #endif
-                        ftdm_clear_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
                        continue;
                }
 
@@ -2051,6 +2226,7 @@ static FIO_SPAN_DESTROY_FUNCTION(misdn_span_destroy)
 {
        struct misdn_span_private *span_priv = ftdm_span_io_private(span);
 
+       /* free resources */
        ftdm_span_io_private(span) = NULL;
        ftdm_safe_free(span_priv);
 
@@ -2060,10 +2236,397 @@ static FIO_SPAN_DESTROY_FUNCTION(misdn_span_destroy)
 }
 
 
+/**
+ * Called by misdn_span_run() to handle incoming b-channel events
+ * \param[in]  chan    FreeTDM channel object
+ * \return     FTDM_SUCCESS on success, FTDM_* on error
+ */
+static ftdm_status_t handle_b_channel_event(ftdm_channel_t *chan)
+{
+       struct misdn_chan_private *priv = ftdm_chan_io_private(chan);
+       char buf[MAX_DATA_MEM] = { 0 };
+       struct mISDNhead *mh = (void *)buf;
+       int retval;
+
+       if ((retval = recvfrom(chan->sockfd, buf, sizeof(buf), 0, NULL, NULL)) <= 0) {
+               ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to receive message: %s\n",
+                       strerror(errno));
+               return FTDM_FAIL;
+       }
+
+       if (retval < MISDN_HEADER_LEN) {
+               ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN message too short, min.: %d, read: %d\n",
+                       (int)MISDN_HEADER_LEN, retval);
+               return FTDM_FAIL;
+       }
+
+       switch (mh->prim) {
+       case PH_DATA_IND: {
+               int datalen = retval - MISDN_HEADER_LEN;
+               char *data  = buf    + MISDN_HEADER_LEN;
+
+               /* Convert audio data */
+               misdn_convert_audio_bits(data, datalen);
+
+               /* Write audio into receive pipe */
+               if ((retval = write(priv->rx_audio_pipe_in, data, datalen)) < 0) {
+                       ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to write audio data into rx pipe: %s\n",
+                               strerror(errno));
+                       return FTDM_FAIL;
+               } else if (retval < datalen) {
+                       ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN short write into rx pipe, written: %d, expected: %d\n",
+                               retval, datalen);
+                       return FTDM_FAIL;
+               }
+
+               /* Get receive buffer usage */
+               if (ioctl(priv->tx_audio_pipe_out, FIONREAD, &retval) < 0) {
+                       ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to get tx audio buffer usage: %s\n",
+                               strerror(errno));
+                       return FTDM_FAIL;
+               } else if (retval < datalen) {
+//                      ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN has not enough bytes in tx audio pipe, available: %d, requested: %d\n",
+//                             retval, datalen);
+                       priv->tx_pipe_under_cnt++;
+                       return FTDM_SUCCESS;
+               }
+
+#ifdef MISDN_DEBUG_IO
+               ftdm_log_chan(chan, FTDM_LOG_INFO, "mISDN tx audio buffer usage: %d\n",
+                       retval);
+#endif
+
+               /* Get audio from tx pipe */
+               if ((retval = read(priv->tx_audio_pipe_out, data, datalen)) < 0) {
+                       ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to read audio data from tx pipe: %s\n",
+                               strerror(errno));
+                       return FTDM_FAIL;
+               } else if (retval == 0) {
+                       ftdm_log_chan_msg(chan, FTDM_LOG_NOTICE, "mISDN tx pipe is empty\n");
+                       priv->tx_pipe_under_cnt++;
+                       return FTDM_SUCCESS;
+               } else if (retval < datalen) {
+                       ftdm_log_chan(chan, FTDM_LOG_NOTICE, "mISDN short read from tx pipe, read: %d, expected: %d\n",
+                               retval, datalen);
+                       priv->tx_pipe_under_cnt++;
+                       priv->tx_miss_bytes += ftdm_max(0, datalen - retval);
+                       datalen = retval;
+               }
+               priv->tx_pipe_rd_bytes += retval;
+
+               if (!priv->active) {
+                       /* discard */
+                       return FTDM_SUCCESS;
+               }
+
+               /* Convert audio data */
+               misdn_convert_audio_bits(data, datalen);
+
+               /* Write to channel */
+               mh->prim = PH_DATA_REQ;
+               mh->id   = 0;
+               datalen += MISDN_HEADER_LEN;
+
+               if ((retval = write(chan->sockfd, buf, datalen)) < 0) {
+                       ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to write audio data into b-channel: %s\n",
+                               strerror(errno));
+                       return FTDM_FAIL;
+               } else if (retval < datalen) {
+                       ftdm_log_chan(chan, FTDM_LOG_WARNING, "mISDN short write into b-channel, written: %d, expected: %d\n",
+                               retval, datalen);
+                       priv->tx_lost_bytes += ftdm_max(0, datalen - retval - MISDN_HEADER_LEN);
+               }
+               priv->tx_sent_bytes += ftdm_max(0, retval - MISDN_HEADER_LEN);
+               break;
+       }
+       case PH_DATA_CNF:
+               priv->tx_ack_cnt++;
+               break;
+       case PH_DEACTIVATE_IND:
+               priv->active = 0;
+               break;
+       case PH_ACTIVATE_IND:
+               priv->active = 1;
+               break;
+       default:
+               ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN received unknown/unhandled event primitive: (%d) %s\n",
+                       mh->prim, misdn_event2str(mh->prim));
+               break;
+       }
+       return FTDM_SUCCESS;
+}
+
+
+/**
+ * Timeout (miliseconds) for epoll_wait()
+ */
+#define MISDN_EPOLL_WAIT_MAX_MSEC      1000
+
+/**
+ * mISDN I/O thread
+ * This thread handles all of the B-Channel I/O, this avoids all of the hazzles with
+ * intermixed data + control frames on mISDN sockets and the missing write poll support on B-Channels.
+ *
+ * Each channel uses a unix stream socketpair as a two-way, pipe replacement for incoming and outgoing
+ * data. Socketpairs allow a more fine grained tuning of the buffer sizes (pipe are restricted to multiples of
+ * the native page size (with the smallest possible size (4k) being already 500ms worth of audio).
+ *
+ * The socketpair buffer sizes and the send algorithm have been carefully tuned to:
+ *
+ *    - Minimize the risk of sending too much data and making the mISDN drivers unhappy, by
+ *      sending PH_DATA_REQ only when there is as much data available as we have received in
+ *      the PH_DATA_IND.
+ *
+ *    - Avoid deadlocks between ftdm_write() trying to fill an almust full socket buffer and
+ *      the I/O thread not having enough data to send a PH_DATA_REQ message.
+ *      (The write() call will return EAGAIN since there is not ehough space free to send all audio data.)
+ *
+ * \param      thread  FreeTDM thread handle
+ * \param      data    Private data pointer passed to ftdm_thread_create_detached() (the span object)
+ * \return     Always returns NULL (unused)
+ *
+ * \note
+ *     ftdm_span_start/_stop() locks the span mutex,
+ *     use direct access to span members to avoid deadlocking
+ *
+ * \todo
+ *     Move D-Channel handling into the I/O thread too.
+ *     Use custom ring buffer structures instead of socketpairs
+ *     (for even more fine grained size control).
+ */
+static void *misdn_span_run(ftdm_thread_t *thread, void *data)
+{
+       ftdm_span_t *span = data;
+       struct misdn_span_private *priv = ftdm_span_io_private(span);
+       struct epoll_event evh;
+       int epfd = -1;
+       int ret;
+       int i;
+
+       ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread initializing\n",
+               ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+       /* Use epoll for event handling */
+       epfd = epoll_create(1);
+       if (epfd < 0) {
+               ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to create epoll context: %s\n",
+                       ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
+               goto error;
+       }
+
+       ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding event pipe to epoll context\n",
+               ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+       /* Add event pipe */
+       evh.events   = EPOLLIN | EPOLLPRI | EPOLLERR;
+       evh.data.fd  = priv->event_pipe_out;
+
+       ret = epoll_ctl(epfd, EPOLL_CTL_ADD, priv->event_pipe_out, &evh);
+       if (ret < 0) {
+               ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to add event pipe to epoll context: %s\n",
+                       ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
+               goto error;
+       }
+
+       ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding b-channels to epoll context\n",
+               ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+       /* Add b-channels */
+       for (i = 1; i <= span->chan_count; i++) {
+               ftdm_channel_t *chan = span->channels[i];
+               ftdm_assert(chan, "channel == NULL");
+
+               if (ftdm_channel_get_type(chan) != FTDM_CHAN_TYPE_B)
+                       continue;
+
+               ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding b-channel [%d:%d] to epoll context\n",
+                       ftdm_span_get_id(span), ftdm_span_get_name(span),
+                       ftdm_channel_get_id(chan), ftdm_channel_get_ph_id(chan));
+
+               evh.events   = EPOLLIN | EPOLLPRI | EPOLLERR;
+               evh.data.ptr = chan;
+
+               ret = epoll_ctl(epfd, EPOLL_CTL_ADD, chan->sockfd, &evh);
+               if (ret < 0) {
+                       ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to add b-channel [%d] socket to epoll context: %s\n",
+                               ftdm_span_get_id(span), ftdm_span_get_name(span), ftdm_channel_get_id(chan), strerror(errno));
+                       goto error;
+               }
+       }
+
+       ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread started\n",
+               ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+       /* Notify world we're running */
+       priv->running = 1;
+       pthread_cond_signal(&priv->ctrl_cond);
+
+       while (priv->running > 0) {
+               struct epoll_event ev[10];
+               int timeout_ms = MISDN_EPOLL_WAIT_MAX_MSEC;
+
+               ret = epoll_wait(epfd, ev, ftdm_array_len(ev), timeout_ms);
+               if (ret < 0) {
+                       switch (errno) {
+                       case EAGAIN:
+                       case EINTR:
+                               continue;
+                       default:
+                               ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] epoll_wait() failed: %s\n",
+                                       ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
+                               goto error;
+                       }
+               }
+               /* Check events */
+               for (i = 0; i < ret; i++) {
+                       /* */
+                       if (ev[i].data.fd == priv->event_pipe_out) {
+                               struct misdn_command cmd;
+                               /* event pipe */
+                               ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] event pipe notification\n",
+                                       ftdm_span_get_id(span), ftdm_span_get_name(span));
+                               ret = read(priv->event_pipe_out, &cmd, sizeof(cmd));
+                               if (ret < sizeof(cmd)) {
+                                       ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to read span thread command\n",
+                                               ftdm_span_get_id(span), ftdm_span_get_name(span));
+                                       continue;
+                               }
+
+                               switch (cmd.type) {
+                               case MISDN_CMD_STOP:
+                                       ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] got STOP command\n",
+                                               ftdm_span_get_id(span), ftdm_span_get_name(span));
+                                       priv->running = -1;
+                                       break;
+                               default:
+                                       ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] got unknown command: %d\n",
+                                               ftdm_span_get_id(span), ftdm_span_get_name(span), cmd.type);
+                               }
+
+                       } else {
+                               ftdm_channel_t *chan = ev[i].data.ptr;
+                               handle_b_channel_event(chan);
+                       }
+               }
+       }
+error:
+       ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread stopped\n",
+               ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+       /* Remove epoll event sources */
+       for (i = 1; i <= span->chan_count; i++) {
+               ftdm_channel_t *chan = span->channels[i];
+               ftdm_assert(chan, "channel == NULL");
+
+               if (ftdm_channel_get_type(chan) != FTDM_CHAN_TYPE_B)
+                       continue;
+
+               ret = epoll_ctl(epfd, EPOLL_CTL_DEL, chan->sockfd, NULL);
+               if (ret < 0) {
+                       ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to remove b-channel [%d] socket from epoll context: %s\n",
+                               ftdm_span_get_id(span), ftdm_span_get_name(span), ftdm_channel_get_id(chan), strerror(errno));
+               }
+       }
+
+       /* Close epoll context */
+       if (epfd >= 0) close(epfd);
+
+       /* Notify world we stopped running */
+       priv->running = 0;
+       pthread_cond_signal(&priv->ctrl_cond);
+       return NULL;
+}
+
+/**
+ * Timeout (miliseconds) for span start/stop completion
+ */
+#define SPAN_DEFAULT_TIMEOUT_MSEC      10000
+
+static FIO_SPAN_START_FUNCTION(misdn_span_start)
+{
+       struct misdn_span_private *span_priv = ftdm_span_io_private(span);
+       struct timespec timeout;
+       int retval;
+
+       ftdm_log(FTDM_LOG_NOTICE, "mISDN starting span %d (%s)\n",
+               ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+       span_priv->running = 0;
+
+       if (ftdm_thread_create_detached(misdn_span_run, span) != FTDM_SUCCESS) {
+               ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s)\n",
+                       ftdm_span_get_id(span), ftdm_span_get_name(span));
+               return FTDM_FAIL;
+       }
+
+       /*
+        * Wait SPAN_DEFAULT_TIMEOUT_MSEC miliseconds for I/O thread to start up
+        */
+       clock_gettime(CLOCK_REALTIME, &timeout);
+       ts_add_msec(&timeout, SPAN_DEFAULT_TIMEOUT_MSEC);
+
+       pthread_mutex_lock(&span_priv->ctrl_cond_mutex);
+
+       retval = pthread_cond_timedwait(&span_priv->ctrl_cond, &span_priv->ctrl_cond_mutex, &timeout);
+       if (retval == ETIMEDOUT) {
+               ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s) in 10 seconds\n",
+                       ftdm_span_get_id(span), ftdm_span_get_name(span));
+               return FTDM_FAIL;
+       } else if (retval) {
+               ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s): %s\n",
+                       ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
+               return FTDM_FAIL;
+       }
+
+       pthread_mutex_unlock(&span_priv->ctrl_cond_mutex);
+       return FTDM_SUCCESS;
+}
+
+static FIO_SPAN_STOP_FUNCTION(misdn_span_stop)
+{
+       struct misdn_span_private *span_priv = ftdm_span_io_private(span);
+       struct timespec timeout;
+       struct misdn_command cmd;
+       int retval;
+
+       ftdm_log(FTDM_LOG_NOTICE, "mISDN stopping span %d (%s)\n",
+               ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+       span_priv->running = -1;
+
+       /* Wake up thread */
+       cmd.type = MISDN_CMD_STOP;
+       retval = write(span_priv->event_pipe_in, &cmd, sizeof(cmd));
+       if (retval < sizeof(cmd)) {
+               ftdm_log(FTDM_LOG_WARNING, "mISDN failed to send STOP command to span thread\n");
+       }
+
+       /*
+        * Wait SPAN_DEFAULT_TIMEOUT_MSEC miliseconds for I/O thread to shut down
+        */
+       clock_gettime(CLOCK_REALTIME, &timeout);
+       ts_add_msec(&timeout, SPAN_DEFAULT_TIMEOUT_MSEC);
+
+       pthread_mutex_lock(&span_priv->ctrl_cond_mutex);
+
+       retval = pthread_cond_timedwait(&span_priv->ctrl_cond, &span_priv->ctrl_cond_mutex, &timeout);
+       if (retval == ETIMEDOUT) {
+               ftdm_log(FTDM_LOG_ERROR, "mISDN failed to stop thread in 10 seconds\n");
+               return FTDM_FAIL;
+       } else if (retval) {
+               ftdm_log(FTDM_LOG_ERROR, "mISDN failed to stop thread: %s\n",
+                       strerror(errno));
+               return FTDM_FAIL;
+       }
+
+       pthread_mutex_unlock(&span_priv->ctrl_cond_mutex);
+       return FTDM_SUCCESS;
+}
+
+
 /**
  * \brief      ftmod_misdn interface
  */
-//static const ftdm_io_interface_t misdn_interface = {
 static const ftdm_io_interface_t misdn_interface = {
        .name       = "misdn",
 
@@ -2080,8 +2643,12 @@ static const ftdm_io_interface_t misdn_interface = {
        .get_alarms      = misdn_get_alarms,
        .configure       = misdn_configure,             /* configure global parameters */
        .configure_span  = misdn_configure_span,        /* assign channels to span */
+       .channel_next_event = misdn_channel_next_event,
        .channel_destroy = misdn_channel_destroy,       /* clean up channel */
        .span_destroy    = misdn_span_destroy,          /* clean up span */
+
+       .span_start      = misdn_span_start,
+       .span_stop       = misdn_span_stop,
 };