#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
+#include <sys/epoll.h>
/* this is how it should have been...
#ifdef HAVE_FREETDM_FREETDM_H
//#define MISDN_DEBUG_EVENTS
//#define MISDN_DEBUG_IO
+#define ACCESS_ONCE(x) (*(volatile typeof(x) *)&(x))
typedef enum {
MISDN_CAPS_NONE = 0,
#undef MISDN_CONTROL_TYPE
};
-#if 0 /* unused for now */
+#ifdef MISDN_DEBUG_EVENTS
static const char *misdn_control2str(const int ctrl)
{
int x;
* mISDN <-> FreeTDM data structures
***********************************************************************************/
+typedef enum {
+ MISDN_CMD_NONE = 0,
+ MISDN_CMD_STOP, /*!< Stop the I/O thread */
+} misdn_cmd_t;
+
+struct misdn_command {
+ misdn_cmd_t type;
+/* union { } cmd; */ /*!< Command-specific parameters */
+};
+
enum {
MISDN_SPAN_NONE = 0,
MISDN_SPAN_RUNNING = (1 << 0),
struct misdn_span_private {
int flags;
+ int running;
+
+ int event_pipe_in;
+ int event_pipe_out;
/* event conditional */
pthread_mutex_t event_cond_mutex;
pthread_cond_t event_cond;
+
+ /* start / stop feedback */
+ pthread_mutex_t ctrl_cond_mutex;
+ pthread_cond_t ctrl_cond;
};
struct misdn_event_queue;
/* hw addr of channel */
struct sockaddr_mISDN addr;
- /* audio tx pipe */
- int audio_pipe_in;
- int audio_pipe_out;
+ /* audio tx pipe (= socketpair ends) */
+ int tx_audio_pipe_in;
+ int tx_audio_pipe_out;
+ int rx_audio_pipe_in;
+ int rx_audio_pipe_out;
/* counters */
unsigned long tx_cnt;
unsigned long slip_rx_cnt;
unsigned long slip_tx_cnt;
+ unsigned long tx_pipe_wr_bytes; /*!< Number of bytes written into tx audio pipe */
+ unsigned long tx_pipe_rd_bytes; /*!< Number of bytes read from tx audio pipe */
+ unsigned long tx_miss_bytes; /*!< Number of bytes missing in short reads from tx audio pipe */
+ unsigned long tx_lost_bytes; /*!< Number of bytes lost in short writes to the mISDN B-Channel */
+ unsigned long tx_sent_bytes; /*!< Number of bytes successfully sent to the mISDN B-Channel */
+ unsigned long tx_pipe_under_cnt; /*!< Number of tx audio pipe underflows */
+ unsigned long tx_pipe_over_cnt; /*!< Number of tx audio pipe overflows */
+
struct misdn_event_queue *events;
};
(activate) ? "activation" : "deactivation", strerror(errno));
return FTDM_FAIL;
}
-//#ifdef MISDN_DEBUG_EVENTS
+#ifdef MISDN_DEBUG_EVENTS
ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN got event '%s (%#x)', id %#x, while waiting for %s confirmation on %c-channel\n",
misdn_event2str(hh->prim), hh->prim, hh->id, (activate) ? "activation" : "deactivation",
ftdm_channel_get_type(chan) == FTDM_CHAN_TYPE_B ? 'B' : 'D');
-//#endif
+#endif
switch (hh->prim) {
case PH_ACTIVATE_IND:
case PH_ACTIVATE_CNF:
misdn_event2str(req), strerror(errno));
return FTDM_FAIL;
}
-//#ifdef MISDN_DEBUG_EVENTS
+#ifdef MISDN_DEBUG_EVENTS
ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN got event '%s' while waiting for %s answer\n",
misdn_event2str(hh->prim), misdn_event2str(req));
-//#endif
+#endif
switch (hh->prim) {
case MPH_INFORMATION_IND: /* success */
if (retval < MISDN_HEADER_LEN + sizeof(*info)) {
* mISDN <-> FreeTDM interface functions
***********************************************************************************/
-struct misdn_globals {
+static struct misdn_globals {
int sockfd;
} globals;
ftdm_channel_get_type(ftdmchan) == FTDM_CHAN_TYPE_B ? 'B' : 'D');
}
+ ftdm_log_chan(ftdmchan, FTDM_LOG_NOTICE, "mISDN tx stats: wr: %lu, rd: %lu, tx: %lu, tx-lost: %lu, tx-miss: %lu, tx-under#: %lu, tx-over#: %lu\n",
+ chan_priv->tx_pipe_wr_bytes, chan_priv->tx_pipe_rd_bytes,
+ chan_priv->tx_sent_bytes, chan_priv->tx_lost_bytes, chan_priv->tx_miss_bytes,
+ chan_priv->tx_pipe_over_cnt, chan_priv->tx_pipe_under_cnt);
+
chan_priv->active = 0;
}
switch (ftdm_channel_get_type(ftdmchan)) {
case FTDM_CHAN_TYPE_B:
if (*flags & FTDM_WRITE) {
- pfds[nr_fds].fd = chan_priv->audio_pipe_in;
+ pfds[nr_fds].fd = chan_priv->tx_audio_pipe_in;
pfds[nr_fds].events = POLLOUT;
nr_fds++;
}
- if (*flags & (FTDM_READ | FTDM_EVENTS)) {
+ if (*flags & FTDM_READ) {
+ pfds[nr_fds].fd = chan_priv->rx_audio_pipe_out;
+ pfds[nr_fds].events = POLLIN;
+ nr_fds++;
+ }
+/* if (*flags & (FTDM_READ | FTDM_EVENTS)) {
pfds[nr_fds].fd = ftdmchan->sockfd;
pfds[nr_fds].events |= (*flags & FTDM_READ) ? POLLIN : 0;
pfds[nr_fds].events |= (*flags & FTDM_EVENTS) ? POLLPRI : 0;
nr_fds++;
}
+*/
break;
default:
if (*flags & FTDM_READ)
switch (ftdm_channel_get_type(ftdmchan)) {
case FTDM_CHAN_TYPE_B:
- if (pfds[0].revents & POLLOUT)
+ if ((pfds[0].revents & POLLOUT) || (pfds[1].revents & POLLOUT))
*flags |= FTDM_WRITE;
if ((pfds[0].revents & POLLIN) || (pfds[1].revents & POLLIN))
*flags |= FTDM_READ;
/**
* Handle incoming mISDN message on d-channel
- * @param[in] ftdmchan
- * @param[in] msg_buf
- * @param[in] msg_len
- * @internal
+ * \param[in] ftdmchan
+ * \param[in] msg_buf
+ * \param[in] msg_len
+ * \internal
*/
static ftdm_status_t misdn_handle_incoming(ftdm_channel_t *ftdmchan, const char *msg_buf, const int msg_len)
{
int retval;
int maxretry = 10;
- if (!priv->active) {
- ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN ignoring read on closed channel\n");
- /* ignore */
- *datalen = 0;
- return FTDM_SUCCESS;
- }
-
/* nothing read yet */
*datalen = 0;
* we'll get a lot of "mISDN_send: error -12" message in dmesg otherwise
* (= b-channel receive queue overflowing)
*/
- while (maxretry--) {
- struct sockaddr_mISDN addr;
- socklen_t addrlen = sizeof(addr);
-
- if ((retval = recvfrom(ftdmchan->sockfd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&addr, &addrlen)) < 0) {
- if (errno == EWOULDBLOCK || errno == EAGAIN) break;
- ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to receive incoming message: %s\n",
- strerror(errno));
- return FTDM_FAIL;
- }
-
- if (retval < MISDN_HEADER_LEN) {
- ftdm_log_chan_msg(ftdmchan, FTDM_LOG_ERROR, "mISDN received message too small\n");
- return FTDM_FAIL;
- }
-
- if (hh->prim == PH_DATA_IND) {
- *datalen = ftdm_clamp(retval - MISDN_HEADER_LEN, 0, bytes);
-#ifdef MISDN_DEBUG_IO
- ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "misdn_read() received '%s', id: %#x, with %d bytes from channel socket %d [dev.ch: %d.%d]\n",
- misdn_event2str(hh->prim), hh->id, retval - MISDN_HEADER_LEN, ftdmchan->sockfd, addr.dev, addr.channel);
-
- if (*datalen > 0) {
- char hbuf[MAX_DATA_MEM] = { 0 };
- print_hex_bytes(data, *datalen, hbuf, sizeof(hbuf));
- ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "mISDN read data: %s\n", hbuf);
+ switch (ftdm_channel_get_type(ftdmchan)) {
+ case FTDM_CHAN_TYPE_DQ921: {
+ while (maxretry--) {
+ struct sockaddr_mISDN addr;
+ socklen_t addrlen = sizeof(addr);
+
+ if ((retval = recvfrom(ftdmchan->sockfd, rbuf, sizeof(rbuf), 0, (struct sockaddr *)&addr, &addrlen)) < 0) {
+ if (errno == EWOULDBLOCK || errno == EAGAIN) break;
+ ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to receive incoming message: %s\n",
+ strerror(errno));
+ return FTDM_FAIL;
}
-#endif
- if (*datalen <= 0)
- continue;
- /*
- * Copy data into ouput buffer (excluding the mISDN message header)
- * NOTE: audio data needs to be converted to a-law / u-law!
- */
- memcpy(data, rbuf + MISDN_HEADER_LEN, *datalen);
+ if (retval < MISDN_HEADER_LEN) {
+ ftdm_log_chan_msg(ftdmchan, FTDM_LOG_ERROR, "mISDN received message too small\n");
+ return FTDM_FAIL;
+ }
- switch (ftdm_channel_get_type(ftdmchan)) {
- case FTDM_CHAN_TYPE_B:
- hh->prim = PH_DATA_REQ;
- hh->id = MISDN_ID_ANY;
- bytes = *datalen;
+ if (hh->prim == PH_DATA_IND) {
+ *datalen = ftdm_clamp(retval - MISDN_HEADER_LEN, 0, bytes);
- /* Convert incoming audio data to *-law */
- misdn_convert_audio_bits(data, *datalen);
+ if (*datalen <= 0)
+ continue;
/*
- * Fetch required amount of audio from tx pipe, using the amount
- * of received bytes as an indicator for how much free space the
- * b-channel tx buffer has available.
- *
- * (see misdn_write() for the part that fills the tx pipe)
- *
- * NOTE: can't use blocking I/O here since both parts are serviced
- * from the same thread
+ * Copy data into ouput buffer (excluding the mISDN message header)
+ * NOTE: audio data needs to be converted to a-law / u-law!
*/
- if ((retval = read(priv->audio_pipe_out, rbuf + MISDN_HEADER_LEN, bytes)) < 0) {
- if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
- ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to read %d bytes of audio data: %s\n",
- bytes, strerror(errno));
- break;
- }
- /* Tx pipe is empty, completely fill buffer up to "bytes" with silence value */
- retval = 0;
- }
-
- /*
- * Use a-law / u-law silence to fill missing bytes,
- * in case there was not enough audio data available in the
- * tx pipe to satisfy the request.
- */
- if (retval < bytes) {
- memset(&rbuf[MISDN_HEADER_LEN + retval],
- (ftdm_channel_get_codec(ftdmchan) == FTDM_CODEC_ALAW) ? 0x2a : 0xff,
- bytes - retval);
- }
-
- /* Convert outgoing audio data to wire format */
- misdn_convert_audio_bits(rbuf + MISDN_HEADER_LEN, bytes);
- bytes += MISDN_HEADER_LEN;
-
- /* Send converted audio to b-channel */
- if ((retval = sendto(ftdmchan->sockfd, rbuf, bytes, 0, (struct sockaddr *)&priv->addr, sizeof(priv->addr))) < bytes) {
- ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to send %d bytes of audio data: (%d) %s\n",
- bytes, retval, strerror(errno));
- }
- break;
- default:
- break;
+ memcpy(data, rbuf + MISDN_HEADER_LEN, *datalen);
+ return FTDM_SUCCESS;
+ } else {
+ *datalen = 0;
+ /* event */
+ misdn_handle_incoming(ftdmchan, rbuf, retval);
}
+ }
+ break;
+ }
+ case FTDM_CHAN_TYPE_B: {
+ if (!priv->active) {
+ ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN ignoring read on closed b-channel\n");
return FTDM_SUCCESS;
- } else {
- *datalen = 0;
-#ifdef MISDN_DEBUG_IO
- ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "misdn_read() received '%s', id: %#x, with %d bytes from channel socket %d [dev.ch: %d.%d]\n",
- misdn_event2str(hh->prim), hh->id, retval - MISDN_HEADER_LEN, ftdmchan->sockfd, addr.dev, addr.channel);
-#endif
- /* event */
- misdn_handle_incoming(ftdmchan, rbuf, retval);
}
+
+ if ((retval = read(priv->rx_audio_pipe_out, data, bytes)) < 0) {
+ ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN failed to read audio data from rx pipe: %s\n",
+ strerror(errno));
+ return FTDM_FAIL;
+ }
+ *datalen = retval;
+ break;
+ }
+ default:
+ break;
}
-#ifdef MISDN_DEBUG_IO
- ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "mISDN nothing received on %c-channel\n",
- ftdm_channel_get_type(ftdmchan) == FTDM_CHAN_TYPE_B ? 'B' : 'D');
-#endif
return FTDM_SUCCESS;
}
* NOTE: can't use blocking I/O here since both parts are serviced
* from the same thread
*/
- if ((retval = write(priv->audio_pipe_in, data, size)) < size) {
- ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN channel audio pipe write error: %s\n",
- strerror(errno));
+ if ((retval = write(priv->tx_audio_pipe_in, data, size)) < 0) {
+ ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "mISDN channel audio pipe write error [wr: %lu, rd: %lu: tx: %lu, tx-under#: %lu, tx-over#: %lu]: %s\n",
+ priv->tx_pipe_wr_bytes, priv->tx_pipe_rd_bytes, priv->tx_sent_bytes,
+ priv->tx_pipe_under_cnt, priv->tx_pipe_over_cnt, strerror(errno));
return FTDM_FAIL;
+ } else if (retval < size) {
+ priv->tx_pipe_over_cnt++;
+ ftdm_log_chan(ftdmchan, FTDM_LOG_WARNING, "mISDN channel audio pipe short write [wr: %lu, rd: %lu: tx: %lu, tx-under#: %lu, tx-over#: %lu], expected: %d, written: %d\n",
+ priv->tx_pipe_wr_bytes, priv->tx_pipe_rd_bytes, priv->tx_sent_bytes,
+ priv->tx_pipe_under_cnt, priv->tx_pipe_over_cnt, size, retval);
}
+ ACCESS_ONCE(priv->tx_pipe_wr_bytes) += retval;
*datalen = retval;
break;
default:
return FTDM_SUCCESS;
}
+/**
+ * Carefully choosen size for socket send/recv buffers
+ * larger values will add more latency, while lower values will cause deadlocks
+ * (see misdn_span_run() comments below for an explanation)
+ */
+#define SOCKETPAIR_BUFFER_SIZE 3072
static ftdm_status_t misdn_open_range(ftdm_span_t *span, ftdm_chan_type_t type, struct mISDN_devinfo *devinfo, int start, int end)
{
ftdm_log(FTDM_LOG_DEBUG, "mISDN opened socket (on chan:dev => %d:%d): %d\n",
addr.dev, addr.channel, sockfd);
- /* set non-blocking */
+ /* Set mISDN channel socket non-blocking */
if (fcntl(sockfd, F_SETFL, O_NONBLOCK) < 0) {
- ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket fd to non-blocking: %s\n", strerror(errno));
+ ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket fd to non-blocking: %s\n",
+ strerror(errno));
+ close(sockfd);
return FTDM_FAIL;
}
if (ftdmchan->type == FTDM_CHAN_TYPE_B) {
int pipefd[2] = { -1, -1 };
- ftdmchan->packet_len = 10 /* ms */ * (ftdmchan->rate / 1000);
+ ftdmchan->packet_len = 30 /* ms */ * (ftdmchan->rate / 1000);
ftdmchan->effective_interval = ftdmchan->native_interval = ftdmchan->packet_len / 8;
ftdmchan->native_codec = ftdmchan->effective_codec = FTDM_CODEC_ALAW;
- ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_INTERVAL);
+// ftdm_channel_set_feature(ftdmchan, FTDM_CHANNEL_FEATURE_INTERVAL);
+#ifdef USE_PIPE
/*
* Create audio tx pipe, use non-blocking I/O to avoid deadlock since both ends
* are used from the same thread
*/
- if (pipe2(pipefd, O_NONBLOCK) < 0) {
- ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio write pipe [%d:%d]: %s\n",
+ if (pipe2(pipefd, 0 | O_NONBLOCK) < 0) {
+ ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio tx pipe [%d:%d]: %s\n",
+ addr.dev, x, strerror(errno));
+ close(sockfd);
+ return FTDM_FAIL;
+ }
+ priv->tx_audio_pipe_in = pipefd[1];
+ priv->tx_audio_pipe_out = pipefd[0];
+
+#if 1 || defined(HAVE_F_SETPIPE_SZ)
+ if (fcntl(priv->tx_audio_pipe_in, F_SETPIPE_SZ, 4096) < 0) {
+ ftdm_log(FTDM_LOG_WARNING, "Failed to set mISDN audio tx pipe size [%d:%d]: %s\n",
+ addr.dev, x, strerror(errno));
+ }
+#endif
+ /*
+ * Create audio rx pipe, use non-blocking I/O to avoid deadlock since both ends
+ * are used from the same thread
+ */
+ if (pipe2(pipefd, 0 | O_NONBLOCK) < 0) {
+ ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio rx pipe [%d:%d]: %s\n",
addr.dev, x, strerror(errno));
close(sockfd);
return FTDM_FAIL;
}
- priv->audio_pipe_in = pipefd[1];
- priv->audio_pipe_out = pipefd[0];
+ priv->rx_audio_pipe_in = pipefd[1];
+ priv->rx_audio_pipe_out = pipefd[0];
+
+#if 1 || defined(HAVE_F_SETPIPE_SZ)
+ if (fcntl(priv->rx_audio_pipe_in, F_SETPIPE_SZ, 4096) < 0) {
+ ftdm_log(FTDM_LOG_WARNING, "Failed to set mISDN audio rx pipe size [%d:%d]: %s\n",
+ addr.dev, x, strerror(errno));
+ }
+#endif
+#else /* !USE_PIPE */
+ /*
+ * Use a socket pair for audio rx/tx, allows for more fine-grained control
+ * of latency (= amounts of data in buffers)
+ */
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, pipefd) < 0) {
+ ftdm_log(FTDM_LOG_ERROR, "Failed to create mISDN audio socket pair [%d:%d]: %s\n",
+ addr.dev, x, strerror(errno));
+ close(sockfd);
+ return FTDM_FAIL;
+ } else {
+ int opt = SOCKETPAIR_BUFFER_SIZE;
+ socklen_t optlen = sizeof(opt);
+
+ if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket pair fd[0] to non-blocking: %s\n",
+ strerror(errno));
+ close(sockfd);
+ close(pipefd[0]);
+ close(pipefd[1]);
+ return FTDM_FAIL;
+ }
+ if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN Failed to set socket pair fd[1] to non-blocking: %s\n",
+ strerror(errno));
+ close(sockfd);
+ close(pipefd[0]);
+ close(pipefd[1]);
+ return FTDM_FAIL;
+ }
+ /*
+ * Set RX/TX buffer sizes on each end of the socket pair
+ */
+ if (setsockopt(pipefd[0], SOL_SOCKET, SO_RCVBUF, &opt, optlen) < 0) {
+ ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[0] RCVBUF: %s\n",
+ strerror(errno));
+ }
+ if (setsockopt(pipefd[0], SOL_SOCKET, SO_SNDBUF, &opt, optlen) < 0) {
+ ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[0] SNDBUF: %s\n",
+ strerror(errno));
+ }
+ if (setsockopt(pipefd[1], SOL_SOCKET, SO_RCVBUF, &opt, optlen) < 0) {
+ ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[1] RCVBUF: %s\n",
+ strerror(errno));
+ }
+ if (setsockopt(pipefd[1], SOL_SOCKET, SO_SNDBUF, &opt, optlen) < 0) {
+ ftdm_log(FTDM_LOG_WARNING, "mISDN Failed to set socket pair fd[1] SNDBUF: %s\n",
+ strerror(errno));
+ }
+
+ priv->rx_audio_pipe_in = pipefd[1];
+ priv->rx_audio_pipe_out = pipefd[0];
+
+ priv->tx_audio_pipe_in = pipefd[0];
+ priv->tx_audio_pipe_out = pipefd[1];
+ }
+#endif
} else {
/* early activate D-Channel */
misdn_activate_channel(ftdmchan);
/* allocate span private */
if (!span_priv) {
+ int pipe[2] = { -1, -1 };
+
/*
* Not perfect, there should be something like span_create too
*/
/* init event condition */
pthread_cond_init(&span_priv->event_cond, NULL);
pthread_mutex_init(&span_priv->event_cond_mutex, NULL);
+
+ /* init control condition */
+ pthread_cond_init(&span_priv->ctrl_cond, NULL);
+ pthread_mutex_init(&span_priv->ctrl_cond_mutex, NULL);
+
+ /* create event pipe */
+ if (pipe2(pipe, O_CLOEXEC) < 0) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN failed to create event pipe: %s\n",
+ strerror(errno));
+ return FTDM_FAIL;
+ }
+ span_priv->event_pipe_in = pipe[0];
+ span_priv->event_pipe_out = pipe[1];
}
/* split channel list by ',' */
add event queues and data fifos, so we can sift all the
messages we get to forward them to the right receiver
*/
- ftdm_span_t *span = ftdm_channel_get_span(ftdmchan);
- struct misdn_span_private *span_priv = ftdm_span_io_private(span);
char buf[MAX_DATA_MEM] = { 0 };
struct sockaddr_mISDN addr;
struct mISDNhead *hh;
int retval = 0, nr_events = 0;
int i;
- clock_gettime(CLOCK_REALTIME, &ts);
- ts_add_msec(&ts, ms);
-
for (i = 1; i <= ftdm_span_get_chan_count(span); i++) {
ftdm_channel_t *chan = ftdm_span_get_channel(span, i);
struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan);
+ /* Skip channels that have event processing pending (Avoids event storms) */
+ if (ftdm_test_io_flag(chan, FTDM_CHANNEL_IO_EVENT))
+ continue;
+
if (misdn_event_queue_has_data(chan_priv->events)) {
#ifdef MISDN_DEBUG_EVENTS
ftdm_log(FTDM_LOG_DEBUG, "mISDN channel %d:%d has event(s)\n",
ftdm_channel_get_span_id(chan), ftdm_channel_get_id(chan));
#endif
- ftdm_set_flag(chan, FTDM_CHANNEL_IO_EVENT);
+ ftdm_set_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
chan->last_event_time = ftdm_current_time_in_ms();
nr_events++;
}
}
- if (nr_events)
+ if (nr_events) {
+#ifdef MISDN_DEBUG_EVENTS
+ ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d has %d new events pending (pre poll)\n",
+ ftdm_span_get_id(span), nr_events);
+#endif
return FTDM_SUCCESS;
+ }
+
+
+#ifdef MISDN_DEBUG_EVENTS
+ ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d has no events pending, polling for new events with %d ms timeout\n",
+ ftdm_span_get_id(span), ms);
+#endif
+ /* Wait at least 1 ms, max 1 s */
+ ms = ftdm_clamp(ms, 1, 1000);
+
+ clock_gettime(CLOCK_REALTIME, &ts);
+ ts_add_msec(&ts, ms);
if ((retval = pthread_cond_timedwait(&span_priv->event_cond, &span_priv->event_cond_mutex, &ts))) {
switch (retval) {
case ETIMEDOUT:
-// ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d: No events within %d ms\n",
-// ftdm_span_get_id(span), ms);
+#ifdef MISDN_DEBUG_EVENTS
+ ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d: No events within %d ms\n",
+ ftdm_span_get_id(span), ms);
+#endif
return FTDM_TIMEOUT;
default:
ftdm_log(FTDM_LOG_DEBUG, "mISDN failed to poll for events on span %d: %s\n",
}
}
+#ifdef MISDN_DEBUG_EVENTS
+ ftdm_log(FTDM_LOG_DEBUG, "mISDN span %d received new event notification, checking channel event queues\n",
+ ftdm_span_get_id(span));
+#endif
for (i = 1; i <= ftdm_span_get_chan_count(span); i++) {
ftdm_channel_t *chan = ftdm_span_get_channel(span, i);
struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan);
+ /* Skip channels that have event processing pending (Avoids event storms) */
+ if (ftdm_test_io_flag(chan, FTDM_CHANNEL_IO_EVENT))
+ continue;
+
if (misdn_event_queue_has_data(chan_priv->events)) {
- ftdm_set_flag(chan, FTDM_CHANNEL_IO_EVENT);
+ ftdm_set_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
chan->last_event_time = ftdm_current_time_in_ms();
nr_events++;
}
return (nr_events) ? FTDM_SUCCESS : FTDM_TIMEOUT; /* no events? => timeout */
}
+/**
+ * Retrieve event from channel
+ * \param ftdmchan Channel to retrieve event from
+ * \param event FreeTDM event to return
+ * \return Success or failure
+ */
+static FIO_CHANNEL_NEXT_EVENT_FUNCTION(misdn_channel_next_event)
+{
+ struct misdn_chan_private *chan_priv = ftdm_chan_io_private(ftdmchan);
+ struct misdn_event *evt = NULL;
+ ftdm_span_t *span = ftdm_channel_get_span(ftdmchan);
+ uint32_t event_id = FTDM_OOB_INVALID;
+
+ ftdm_assert(span, "span == NULL");
+
+ ftdm_clear_io_flag(ftdmchan, FTDM_CHANNEL_IO_EVENT);
+
+ if (!(evt = misdn_event_queue_pop(chan_priv->events))) {
+#ifdef MISDN_DEBUG_EVENTS
+ ftdm_log_chan_msg(ftdmchan, FTDM_LOG_DEBUG, "mISDN channel event queue has no events\n");
+#endif
+ return FTDM_FAIL;
+ }
+
+#ifdef MISDN_DEBUG_EVENTS
+ ftdm_log_chan(ftdmchan, FTDM_LOG_DEBUG, "Got event '%s' from channel event queue\n",
+ misdn_event2str(evt->id));
+#endif
+ /* Convert from misdn event to ftdm */
+ switch (evt->id) {
+ case PH_DEACTIVATE_IND:
+ event_id = FTDM_OOB_ALARM_TRAP;
+ ftdmchan->alarm_flags |= FTDM_ALARM_RED;
+ break;
+ case PH_ACTIVATE_IND:
+ event_id = FTDM_OOB_ALARM_CLEAR;
+ ftdmchan->alarm_flags &= ~FTDM_ALARM_RED;
+ break;
+ default:
+ ftdm_log_chan(ftdmchan, FTDM_LOG_ERROR, "Unhandled event id %d (0x%x) %s\n",
+ evt->id, evt->id, misdn_event2str(evt->id));
+ }
+
+ ftdmchan->last_event_time = 0;
+ span->event_header.e_type = FTDM_EVENT_OOB;
+ span->event_header.enum_id = event_id;
+ span->event_header.channel = ftdmchan;
+ *event = &span->event_header;
+ return FTDM_SUCCESS;
+}
+
/**
* \brief Retrieve event
* \param span FreeTDM span
struct misdn_chan_private *chan_priv = ftdm_chan_io_private(chan);
struct misdn_event *evt = NULL;
+ ftdm_clear_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
+
if (!(evt = misdn_event_queue_pop(chan_priv->events))) {
#ifdef MISDN_DEBUG_EVENTS
ftdm_log_chan_msg(chan, FTDM_LOG_DEBUG, "mISDN channel event queue has no events\n");
#endif
- ftdm_clear_io_flag(chan, FTDM_CHANNEL_IO_EVENT);
continue;
}
{
struct misdn_span_private *span_priv = ftdm_span_io_private(span);
+ /* free resources */
ftdm_span_io_private(span) = NULL;
ftdm_safe_free(span_priv);
}
+/**
+ * Called by misdn_span_run() to handle incoming b-channel events
+ * \param[in] chan FreeTDM channel object
+ * \return FTDM_SUCCESS on success, FTDM_* on error
+ */
+static ftdm_status_t handle_b_channel_event(ftdm_channel_t *chan)
+{
+ struct misdn_chan_private *priv = ftdm_chan_io_private(chan);
+ char buf[MAX_DATA_MEM] = { 0 };
+ struct mISDNhead *mh = (void *)buf;
+ int retval;
+
+ if ((retval = recvfrom(chan->sockfd, buf, sizeof(buf), 0, NULL, NULL)) <= 0) {
+ ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to receive message: %s\n",
+ strerror(errno));
+ return FTDM_FAIL;
+ }
+
+ if (retval < MISDN_HEADER_LEN) {
+ ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN message too short, min.: %d, read: %d\n",
+ (int)MISDN_HEADER_LEN, retval);
+ return FTDM_FAIL;
+ }
+
+ switch (mh->prim) {
+ case PH_DATA_IND: {
+ int datalen = retval - MISDN_HEADER_LEN;
+ char *data = buf + MISDN_HEADER_LEN;
+
+ /* Convert audio data */
+ misdn_convert_audio_bits(data, datalen);
+
+ /* Write audio into receive pipe */
+ if ((retval = write(priv->rx_audio_pipe_in, data, datalen)) < 0) {
+ ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to write audio data into rx pipe: %s\n",
+ strerror(errno));
+ return FTDM_FAIL;
+ } else if (retval < datalen) {
+ ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN short write into rx pipe, written: %d, expected: %d\n",
+ retval, datalen);
+ return FTDM_FAIL;
+ }
+
+ /* Get receive buffer usage */
+ if (ioctl(priv->tx_audio_pipe_out, FIONREAD, &retval) < 0) {
+ ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to get tx audio buffer usage: %s\n",
+ strerror(errno));
+ return FTDM_FAIL;
+ } else if (retval < datalen) {
+// ftdm_log_chan(chan, FTDM_LOG_DEBUG, "mISDN has not enough bytes in tx audio pipe, available: %d, requested: %d\n",
+// retval, datalen);
+ priv->tx_pipe_under_cnt++;
+ return FTDM_SUCCESS;
+ }
+
+#ifdef MISDN_DEBUG_IO
+ ftdm_log_chan(chan, FTDM_LOG_INFO, "mISDN tx audio buffer usage: %d\n",
+ retval);
+#endif
+
+ /* Get audio from tx pipe */
+ if ((retval = read(priv->tx_audio_pipe_out, data, datalen)) < 0) {
+ ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to read audio data from tx pipe: %s\n",
+ strerror(errno));
+ return FTDM_FAIL;
+ } else if (retval == 0) {
+ ftdm_log_chan_msg(chan, FTDM_LOG_NOTICE, "mISDN tx pipe is empty\n");
+ priv->tx_pipe_under_cnt++;
+ return FTDM_SUCCESS;
+ } else if (retval < datalen) {
+ ftdm_log_chan(chan, FTDM_LOG_NOTICE, "mISDN short read from tx pipe, read: %d, expected: %d\n",
+ retval, datalen);
+ priv->tx_pipe_under_cnt++;
+ priv->tx_miss_bytes += ftdm_max(0, datalen - retval);
+ datalen = retval;
+ }
+ priv->tx_pipe_rd_bytes += retval;
+
+ if (!priv->active) {
+ /* discard */
+ return FTDM_SUCCESS;
+ }
+
+ /* Convert audio data */
+ misdn_convert_audio_bits(data, datalen);
+
+ /* Write to channel */
+ mh->prim = PH_DATA_REQ;
+ mh->id = 0;
+ datalen += MISDN_HEADER_LEN;
+
+ if ((retval = write(chan->sockfd, buf, datalen)) < 0) {
+ ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN failed to write audio data into b-channel: %s\n",
+ strerror(errno));
+ return FTDM_FAIL;
+ } else if (retval < datalen) {
+ ftdm_log_chan(chan, FTDM_LOG_WARNING, "mISDN short write into b-channel, written: %d, expected: %d\n",
+ retval, datalen);
+ priv->tx_lost_bytes += ftdm_max(0, datalen - retval - MISDN_HEADER_LEN);
+ }
+ priv->tx_sent_bytes += ftdm_max(0, retval - MISDN_HEADER_LEN);
+ break;
+ }
+ case PH_DATA_CNF:
+ priv->tx_ack_cnt++;
+ break;
+ case PH_DEACTIVATE_IND:
+ priv->active = 0;
+ break;
+ case PH_ACTIVATE_IND:
+ priv->active = 1;
+ break;
+ default:
+ ftdm_log_chan(chan, FTDM_LOG_ERROR, "mISDN received unknown/unhandled event primitive: (%d) %s\n",
+ mh->prim, misdn_event2str(mh->prim));
+ break;
+ }
+ return FTDM_SUCCESS;
+}
+
+
+/**
+ * Timeout (miliseconds) for epoll_wait()
+ */
+#define MISDN_EPOLL_WAIT_MAX_MSEC 1000
+
+/**
+ * mISDN I/O thread
+ * This thread handles all of the B-Channel I/O, this avoids all of the hazzles with
+ * intermixed data + control frames on mISDN sockets and the missing write poll support on B-Channels.
+ *
+ * Each channel uses a unix stream socketpair as a two-way, pipe replacement for incoming and outgoing
+ * data. Socketpairs allow a more fine grained tuning of the buffer sizes (pipe are restricted to multiples of
+ * the native page size (with the smallest possible size (4k) being already 500ms worth of audio).
+ *
+ * The socketpair buffer sizes and the send algorithm have been carefully tuned to:
+ *
+ * - Minimize the risk of sending too much data and making the mISDN drivers unhappy, by
+ * sending PH_DATA_REQ only when there is as much data available as we have received in
+ * the PH_DATA_IND.
+ *
+ * - Avoid deadlocks between ftdm_write() trying to fill an almust full socket buffer and
+ * the I/O thread not having enough data to send a PH_DATA_REQ message.
+ * (The write() call will return EAGAIN since there is not ehough space free to send all audio data.)
+ *
+ * \param thread FreeTDM thread handle
+ * \param data Private data pointer passed to ftdm_thread_create_detached() (the span object)
+ * \return Always returns NULL (unused)
+ *
+ * \note
+ * ftdm_span_start/_stop() locks the span mutex,
+ * use direct access to span members to avoid deadlocking
+ *
+ * \todo
+ * Move D-Channel handling into the I/O thread too.
+ * Use custom ring buffer structures instead of socketpairs
+ * (for even more fine grained size control).
+ */
+static void *misdn_span_run(ftdm_thread_t *thread, void *data)
+{
+ ftdm_span_t *span = data;
+ struct misdn_span_private *priv = ftdm_span_io_private(span);
+ struct epoll_event evh;
+ int epfd = -1;
+ int ret;
+ int i;
+
+ ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread initializing\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+ /* Use epoll for event handling */
+ epfd = epoll_create(1);
+ if (epfd < 0) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to create epoll context: %s\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
+ goto error;
+ }
+
+ ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding event pipe to epoll context\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+ /* Add event pipe */
+ evh.events = EPOLLIN | EPOLLPRI | EPOLLERR;
+ evh.data.fd = priv->event_pipe_out;
+
+ ret = epoll_ctl(epfd, EPOLL_CTL_ADD, priv->event_pipe_out, &evh);
+ if (ret < 0) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to add event pipe to epoll context: %s\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
+ goto error;
+ }
+
+ ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding b-channels to epoll context\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+ /* Add b-channels */
+ for (i = 1; i <= span->chan_count; i++) {
+ ftdm_channel_t *chan = span->channels[i];
+ ftdm_assert(chan, "channel == NULL");
+
+ if (ftdm_channel_get_type(chan) != FTDM_CHAN_TYPE_B)
+ continue;
+
+ ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] adding b-channel [%d:%d] to epoll context\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span),
+ ftdm_channel_get_id(chan), ftdm_channel_get_ph_id(chan));
+
+ evh.events = EPOLLIN | EPOLLPRI | EPOLLERR;
+ evh.data.ptr = chan;
+
+ ret = epoll_ctl(epfd, EPOLL_CTL_ADD, chan->sockfd, &evh);
+ if (ret < 0) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to add b-channel [%d] socket to epoll context: %s\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span), ftdm_channel_get_id(chan), strerror(errno));
+ goto error;
+ }
+ }
+
+ ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread started\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+ /* Notify world we're running */
+ priv->running = 1;
+ pthread_cond_signal(&priv->ctrl_cond);
+
+ while (priv->running > 0) {
+ struct epoll_event ev[10];
+ int timeout_ms = MISDN_EPOLL_WAIT_MAX_MSEC;
+
+ ret = epoll_wait(epfd, ev, ftdm_array_len(ev), timeout_ms);
+ if (ret < 0) {
+ switch (errno) {
+ case EAGAIN:
+ case EINTR:
+ continue;
+ default:
+ ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] epoll_wait() failed: %s\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
+ goto error;
+ }
+ }
+ /* Check events */
+ for (i = 0; i < ret; i++) {
+ /* */
+ if (ev[i].data.fd == priv->event_pipe_out) {
+ struct misdn_command cmd;
+ /* event pipe */
+ ftdm_log(FTDM_LOG_DEBUG, "mISDN[%d:%s] event pipe notification\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+ ret = read(priv->event_pipe_out, &cmd, sizeof(cmd));
+ if (ret < sizeof(cmd)) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to read span thread command\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+ continue;
+ }
+
+ switch (cmd.type) {
+ case MISDN_CMD_STOP:
+ ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] got STOP command\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+ priv->running = -1;
+ break;
+ default:
+ ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] got unknown command: %d\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span), cmd.type);
+ }
+
+ } else {
+ ftdm_channel_t *chan = ev[i].data.ptr;
+ handle_b_channel_event(chan);
+ }
+ }
+ }
+error:
+ ftdm_log(FTDM_LOG_NOTICE, "mISDN[%d:%s] span thread stopped\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+ /* Remove epoll event sources */
+ for (i = 1; i <= span->chan_count; i++) {
+ ftdm_channel_t *chan = span->channels[i];
+ ftdm_assert(chan, "channel == NULL");
+
+ if (ftdm_channel_get_type(chan) != FTDM_CHAN_TYPE_B)
+ continue;
+
+ ret = epoll_ctl(epfd, EPOLL_CTL_DEL, chan->sockfd, NULL);
+ if (ret < 0) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN[%d:%s] failed to remove b-channel [%d] socket from epoll context: %s\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span), ftdm_channel_get_id(chan), strerror(errno));
+ }
+ }
+
+ /* Close epoll context */
+ if (epfd >= 0) close(epfd);
+
+ /* Notify world we stopped running */
+ priv->running = 0;
+ pthread_cond_signal(&priv->ctrl_cond);
+ return NULL;
+}
+
+/**
+ * Timeout (miliseconds) for span start/stop completion
+ */
+#define SPAN_DEFAULT_TIMEOUT_MSEC 10000
+
+static FIO_SPAN_START_FUNCTION(misdn_span_start)
+{
+ struct misdn_span_private *span_priv = ftdm_span_io_private(span);
+ struct timespec timeout;
+ int retval;
+
+ ftdm_log(FTDM_LOG_NOTICE, "mISDN starting span %d (%s)\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+ span_priv->running = 0;
+
+ if (ftdm_thread_create_detached(misdn_span_run, span) != FTDM_SUCCESS) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s)\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+ return FTDM_FAIL;
+ }
+
+ /*
+ * Wait SPAN_DEFAULT_TIMEOUT_MSEC miliseconds for I/O thread to start up
+ */
+ clock_gettime(CLOCK_REALTIME, &timeout);
+ ts_add_msec(&timeout, SPAN_DEFAULT_TIMEOUT_MSEC);
+
+ pthread_mutex_lock(&span_priv->ctrl_cond_mutex);
+
+ retval = pthread_cond_timedwait(&span_priv->ctrl_cond, &span_priv->ctrl_cond_mutex, &timeout);
+ if (retval == ETIMEDOUT) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s) in 10 seconds\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+ return FTDM_FAIL;
+ } else if (retval) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN failed to start span %d (%s): %s\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span), strerror(errno));
+ return FTDM_FAIL;
+ }
+
+ pthread_mutex_unlock(&span_priv->ctrl_cond_mutex);
+ return FTDM_SUCCESS;
+}
+
+static FIO_SPAN_STOP_FUNCTION(misdn_span_stop)
+{
+ struct misdn_span_private *span_priv = ftdm_span_io_private(span);
+ struct timespec timeout;
+ struct misdn_command cmd;
+ int retval;
+
+ ftdm_log(FTDM_LOG_NOTICE, "mISDN stopping span %d (%s)\n",
+ ftdm_span_get_id(span), ftdm_span_get_name(span));
+
+ span_priv->running = -1;
+
+ /* Wake up thread */
+ cmd.type = MISDN_CMD_STOP;
+ retval = write(span_priv->event_pipe_in, &cmd, sizeof(cmd));
+ if (retval < sizeof(cmd)) {
+ ftdm_log(FTDM_LOG_WARNING, "mISDN failed to send STOP command to span thread\n");
+ }
+
+ /*
+ * Wait SPAN_DEFAULT_TIMEOUT_MSEC miliseconds for I/O thread to shut down
+ */
+ clock_gettime(CLOCK_REALTIME, &timeout);
+ ts_add_msec(&timeout, SPAN_DEFAULT_TIMEOUT_MSEC);
+
+ pthread_mutex_lock(&span_priv->ctrl_cond_mutex);
+
+ retval = pthread_cond_timedwait(&span_priv->ctrl_cond, &span_priv->ctrl_cond_mutex, &timeout);
+ if (retval == ETIMEDOUT) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN failed to stop thread in 10 seconds\n");
+ return FTDM_FAIL;
+ } else if (retval) {
+ ftdm_log(FTDM_LOG_ERROR, "mISDN failed to stop thread: %s\n",
+ strerror(errno));
+ return FTDM_FAIL;
+ }
+
+ pthread_mutex_unlock(&span_priv->ctrl_cond_mutex);
+ return FTDM_SUCCESS;
+}
+
+
/**
* \brief ftmod_misdn interface
*/
-//static const ftdm_io_interface_t misdn_interface = {
static const ftdm_io_interface_t misdn_interface = {
.name = "misdn",
.get_alarms = misdn_get_alarms,
.configure = misdn_configure, /* configure global parameters */
.configure_span = misdn_configure_span, /* assign channels to span */
+ .channel_next_event = misdn_channel_next_event,
.channel_destroy = misdn_channel_destroy, /* clean up channel */
.span_destroy = misdn_span_destroy, /* clean up span */
+
+ .span_start = misdn_span_start,
+ .span_stop = misdn_span_stop,
};