struct ast_rtp_instance *instance;
};
+/*! \brief Packet statistics (used for transport-cc) */
+struct rtp_transport_wide_cc_packet_statistics {
+ /*! The transport specific sequence number */
+ unsigned int seqno;
+ /*! The time at which the packet was received */
+ struct timeval received;
+ /*! The delta between this packet and the previous */
+ int delta;
+};
+
+/*! \brief Statistics information (used for transport-cc) */
+struct rtp_transport_wide_cc_statistics {
+ /*! A vector of packet statistics */
+ AST_VECTOR(, struct rtp_transport_wide_cc_packet_statistics) packet_statistics; /*!< Packet statistics, used for transport-cc */
+ /*! The last sequence number received */
+ unsigned int last_seqno;
+ /*! The last extended sequence number */
+ unsigned int last_extended_seqno;
+ /*! How many feedback packets have gone out */
+ unsigned int feedback_count;
+ /*! How many cycles have occurred for the sequence numbers */
+ unsigned int cycles;
+ /*! Scheduler id for periodic feedback transmission */
+ int schedid;
+};
+
/*! \brief RTP session description */
struct ast_rtp {
int s;
struct ast_data_buffer *send_buffer; /*!< Buffer for storing sent packets for retransmission */
struct ast_data_buffer *recv_buffer; /*!< Buffer for storing received packets for retransmission */
+ struct rtp_transport_wide_cc_statistics transport_wide_cc; /*!< Transport-cc statistics information */
+
#ifdef HAVE_PJPROJECT
ast_cond_t cond; /*!< ICE/TURN condition for signaling */
return -1;
}
+ if (AST_VECTOR_INIT(&rtp->transport_wide_cc.packet_statistics, 0)) {
+ return -1;
+ }
+ rtp->transport_wide_cc.schedid = -1;
+
rtp->f.subclass.format = ao2_bump(ast_format_none);
rtp->lastrxformat = ao2_bump(ast_format_none);
rtp->lasttxformat = ao2_bump(ast_format_none);
ast_data_buffer_free(rtp->recv_buffer);
}
+ AST_VECTOR_FREE(&rtp->transport_wide_cc.packet_statistics);
+
ao2_cleanup(rtp->lasttxformat);
ao2_cleanup(rtp->lastrxformat);
ao2_cleanup(rtp->f.subclass.format);
}
}
+static int rtp_transport_wide_cc_packet_statistics_cmp(struct rtp_transport_wide_cc_packet_statistics a,
+ struct rtp_transport_wide_cc_packet_statistics b)
+{
+ return a.seqno - b.seqno;
+}
+
+static void rtp_transport_wide_cc_feedback_status_vector_append(unsigned char *rtcpheader, int *packet_len, int *status_vector_chunk_bits,
+ uint16_t *status_vector_chunk, int status)
+{
+ /* Appending this status will use up 2 bits */
+ *status_vector_chunk_bits -= 2;
+
+ /* We calculate which bits we want to update the status of. Since a status vector
+ * is 16 bits we take away 2 (for the header), and then we take away any that have
+ * already been used.
+ */
+ *status_vector_chunk |= (status << (16 - 2 - (14 - *status_vector_chunk_bits)));
+
+ /* If there are still bits available we can return early */
+ if (*status_vector_chunk_bits) {
+ return;
+ }
+
+ /* Otherwise we have to place this chunk into the packet */
+ put_unaligned_uint16(rtcpheader + *packet_len, htons(*status_vector_chunk));
+ *status_vector_chunk_bits = 14;
+
+ /* The first bit being 1 indicates that this is a status vector chunk and the second
+ * bit being 1 indicates that we are using 2 bits to represent each status for a
+ * packet.
+ */
+ *status_vector_chunk = (1 << 15) | (1 << 14);
+ *packet_len += 2;
+}
+
+static void rtp_transport_wide_cc_feedback_status_append(unsigned char *rtcpheader, int *packet_len, int *status_vector_chunk_bits,
+ uint16_t *status_vector_chunk, int *run_length_chunk_count, int *run_length_chunk_status, int status)
+{
+ if (*run_length_chunk_status != status) {
+ while (*run_length_chunk_count > 0 && *run_length_chunk_count < 8) {
+ /* Realistically it only makes sense to use a run length chunk if there were 8 or more
+ * consecutive packets of the same type, otherwise we could end up making the packet larger
+ * if we have lots of small blocks of the same type. To help with this we backfill the status
+ * vector (since it always represents 7 packets). Best case we end up with only that single
+ * status vector and the rest are run length chunks.
+ */
+ rtp_transport_wide_cc_feedback_status_vector_append(rtcpheader, packet_len, status_vector_chunk_bits,
+ status_vector_chunk, *run_length_chunk_status);
+ *run_length_chunk_count -= 1;
+ }
+
+ if (*run_length_chunk_count) {
+ /* There is a run length chunk which needs to be written out */
+ put_unaligned_uint16(rtcpheader + *packet_len, htons((0 << 15) | (*run_length_chunk_status << 13) | *run_length_chunk_count));
+ *packet_len += 2;
+ }
+
+ /* In all cases the run length chunk has to be reset */
+ *run_length_chunk_count = 0;
+ *run_length_chunk_status = -1;
+
+ if (*status_vector_chunk_bits == 14) {
+ /* We aren't in the middle of a status vector so we can try for a run length chunk */
+ *run_length_chunk_status = status;
+ *run_length_chunk_count = 1;
+ } else {
+ /* We're doing a status vector so populate it accordingly */
+ rtp_transport_wide_cc_feedback_status_vector_append(rtcpheader, packet_len, status_vector_chunk_bits,
+ status_vector_chunk, status);
+ }
+ } else {
+ /* This is easy, the run length chunk count can just get bumped up */
+ *run_length_chunk_count += 1;
+ }
+}
+
+static int rtp_transport_wide_cc_feedback_produce(const void *data)
+{
+ struct ast_rtp_instance *instance = (struct ast_rtp_instance *) data;
+ struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ unsigned char *rtcpheader;
+ char bdata[1024];
+ struct rtp_transport_wide_cc_packet_statistics *first_packet;
+ struct rtp_transport_wide_cc_packet_statistics *previous_packet;
+ int i;
+ int status_vector_chunk_bits = 14;
+ uint16_t status_vector_chunk = (1 << 15) | (1 << 14);
+ int run_length_chunk_count = 0;
+ int run_length_chunk_status = -1;
+ int packet_len = 20;
+ int delta_len = 0;
+ int packet_count = 0;
+ unsigned int received_msw;
+ unsigned int received_lsw;
+ struct ast_sockaddr remote_address = { { 0, } };
+ int res;
+ int ice;
+ unsigned int large_delta_count = 0;
+ unsigned int small_delta_count = 0;
+ unsigned int lost_count = 0;
+
+ if (!rtp || !rtp->rtcp || rtp->transport_wide_cc.schedid == -1) {
+ ao2_ref(instance, -1);
+ return 0;
+ }
+
+ ao2_lock(instance);
+
+ rtcpheader = (unsigned char *)bdata;
+
+ /* The first packet in the vector acts as our base sequence number and reference time */
+ first_packet = AST_VECTOR_GET_ADDR(&rtp->transport_wide_cc.packet_statistics, 0);
+ previous_packet = first_packet;
+
+ /* We go through each packet that we have statistics for, adding it either to a status
+ * vector chunk or a run length chunk. The code tries to be as efficient as possible to
+ * reduce packet size and will favor run length chunks when it makes sense.
+ */
+ for (i = 0; i < AST_VECTOR_SIZE(&rtp->transport_wide_cc.packet_statistics); ++i) {
+ struct rtp_transport_wide_cc_packet_statistics *statistics;
+ int lost = 0;
+ int res = 0;
+
+ statistics = AST_VECTOR_GET_ADDR(&rtp->transport_wide_cc.packet_statistics, i);
+
+ packet_count++;
+
+ if (first_packet != statistics) {
+ /* The vector stores statistics in a sorted fashion based on the sequence
+ * number. This ensures we can detect any packets that have been lost/not
+ * received by comparing the sequence numbers.
+ */
+ lost = statistics->seqno - (previous_packet->seqno + 1);
+ lost_count += lost;
+ }
+
+ while (lost) {
+ /* We append a not received status until all the lost packets have been accounted for */
+ rtp_transport_wide_cc_feedback_status_append(rtcpheader, &packet_len, &status_vector_chunk_bits,
+ &status_vector_chunk, &run_length_chunk_count, &run_length_chunk_status, 0);
+ packet_count++;
+
+ /* If there is no more room left for storing packets stop now, we leave 20
+ * extra bits at the end just in case.
+ */
+ if ((sizeof(bdata) - (packet_len + delta_len + 20)) < 0) {
+ res = -1;
+ break;
+ }
+
+ lost--;
+ }
+
+ /* If the lost packet appending bailed out because we have no more space, then exit here too */
+ if (res) {
+ break;
+ }
+
+ /* Per the spec the delta is in increments of 250 */
+ statistics->delta = ast_tvdiff_us(statistics->received, previous_packet->received) / 250;
+
+ /* Based on the delta determine the status of this packet */
+ if (statistics->delta < 0 || statistics->delta > 127) {
+ /* Large or negative delta */
+ rtp_transport_wide_cc_feedback_status_append(rtcpheader, &packet_len, &status_vector_chunk_bits,
+ &status_vector_chunk, &run_length_chunk_count, &run_length_chunk_status, 2);
+ delta_len += 2;
+ large_delta_count++;
+ } else {
+ /* Small delta */
+ rtp_transport_wide_cc_feedback_status_append(rtcpheader, &packet_len, &status_vector_chunk_bits,
+ &status_vector_chunk, &run_length_chunk_count, &run_length_chunk_status, 1);
+ delta_len += 1;
+ small_delta_count++;
+ }
+
+ previous_packet = statistics;
+
+ /* If there is no more room left in the packet stop handling of any subsequent packets */
+ if ((sizeof(bdata) - (packet_len + delta_len + 20)) < 0) {
+ break;
+ }
+ }
+
+ if (status_vector_chunk_bits != 14) {
+ /* If the status vector chunk has packets in it then place it in the RTCP packet */
+ put_unaligned_uint16(rtcpheader + packet_len, htons(status_vector_chunk));
+ packet_len += 2;
+ } else if (run_length_chunk_count) {
+ /* If there is a run length chunk in progress then place it in the RTCP packet */
+ put_unaligned_uint16(rtcpheader + packet_len, htons((0 << 15) | (run_length_chunk_status << 13) | run_length_chunk_count));
+ packet_len += 2;
+ }
+
+ /* We iterate again to build delta chunks */
+ for (i = 0; i < AST_VECTOR_SIZE(&rtp->transport_wide_cc.packet_statistics); ++i) {
+ struct rtp_transport_wide_cc_packet_statistics *statistics;
+
+ statistics = AST_VECTOR_GET_ADDR(&rtp->transport_wide_cc.packet_statistics, i);
+
+ if (statistics->delta < 0 || statistics->delta > 127) {
+ /* We need 2 bytes to store this delta */
+ put_unaligned_uint16(rtcpheader + packet_len, htons(statistics->delta));
+ packet_len += 2;
+ } else {
+ /* We can store this delta in 1 byte */
+ rtcpheader[packet_len] = statistics->delta;
+ packet_len += 1;
+ }
+
+ /* If this is the last packet handled by the run length chunk or status vector chunk code
+ * then we can go no further.
+ */
+ if (statistics == previous_packet) {
+ break;
+ }
+ }
+
+ /* Zero pad the end of the packet */
+ while (packet_len % 4) {
+ rtcpheader[packet_len++] = 0;
+ }
+
+ /* Add the general RTCP header information */
+ put_unaligned_uint32(rtcpheader, htonl((2 << 30) | (AST_RTP_RTCP_FMT_TRANSPORT_WIDE_CC << 24)
+ | (AST_RTP_RTCP_RTPFB << 16) | ((packet_len / 4) - 1)));
+ put_unaligned_uint32(rtcpheader + 4, htonl(rtp->ssrc));
+ put_unaligned_uint32(rtcpheader + 8, htonl(rtp->themssrc));
+
+ /* Add the transport-cc specific header information */
+ put_unaligned_uint32(rtcpheader + 12, htonl((first_packet->seqno << 16) | packet_count));
+
+ timeval2ntp(first_packet->received, &received_msw, &received_lsw);
+ put_unaligned_time24(rtcpheader + 16, received_msw, received_lsw);
+ rtcpheader[19] = rtp->transport_wide_cc.feedback_count;
+
+ /* The packet is now fully constructed so send it out */
+ ast_sockaddr_copy(&remote_address, &rtp->rtcp->them);
+
+ ast_debug(2, "Sending transport-cc feedback packet of size '%d' on '%s' with packet count of %d (small = %d, large = %d, lost = %d)\n",
+ packet_len, ast_rtp_instance_get_channel_id(instance), packet_count, small_delta_count, large_delta_count, lost_count);
+
+ res = rtcp_sendto(instance, (unsigned int *)rtcpheader, packet_len, 0, &remote_address, &ice);
+ if (res < 0) {
+ ast_log(LOG_ERROR, "RTCP transport-cc feedback error to %s due to %s\n",
+ ast_sockaddr_stringify(&remote_address), strerror(errno));
+ }
+
+ AST_VECTOR_RESET(&rtp->transport_wide_cc.packet_statistics, AST_VECTOR_ELEM_CLEANUP_NOOP);
+
+ rtp->transport_wide_cc.feedback_count++;
+
+ ao2_unlock(instance);
+
+ return 1000;
+}
+
+static void rtp_instance_parse_transport_wide_cc(struct ast_rtp_instance *instance, struct ast_rtp *rtp,
+ unsigned char *data, int len)
+{
+ uint16_t *seqno = (uint16_t *)data;
+ struct rtp_transport_wide_cc_packet_statistics statistics;
+ struct ast_rtp_instance *transport = rtp->bundled ? rtp->bundled : instance;
+ struct ast_rtp *transport_rtp = ast_rtp_instance_get_data(transport);
+
+ /* If the sequence number has cycled over then record it as such */
+ if (((int)transport_rtp->transport_wide_cc.last_seqno - (int)ntohs(*seqno)) > 100) {
+ transport_rtp->transport_wide_cc.cycles += RTP_SEQ_MOD;
+ }
+
+ /* Populate the statistics information for this packet */
+ statistics.seqno = transport_rtp->transport_wide_cc.cycles + ntohs(*seqno);
+ statistics.received = ast_tvnow();
+
+ /* We allow at a maximum 1000 packet statistics in play at a time, if we hit the
+ * limit we give up and start fresh.
+ */
+ if (AST_VECTOR_SIZE(&transport_rtp->transport_wide_cc.packet_statistics) > 1000) {
+ AST_VECTOR_RESET(&rtp->transport_wide_cc.packet_statistics, AST_VECTOR_ELEM_CLEANUP_NOOP);
+ }
+
+ if (!AST_VECTOR_SIZE(&transport_rtp->transport_wide_cc.packet_statistics) ||
+ statistics.seqno > transport_rtp->transport_wide_cc.last_extended_seqno) {
+ /* This is the expected path */
+ if (AST_VECTOR_APPEND(&transport_rtp->transport_wide_cc.packet_statistics, statistics)) {
+ return;
+ }
+
+ transport_rtp->transport_wide_cc.last_extended_seqno = statistics.seqno;
+ transport_rtp->transport_wide_cc.last_seqno = ntohs(*seqno);
+ } else {
+ /* This packet was out of order, so reorder it within the vector accordingly */
+ if (AST_VECTOR_ADD_SORTED(&transport_rtp->transport_wide_cc.packet_statistics, statistics,
+ rtp_transport_wide_cc_packet_statistics_cmp)) {
+ return;
+ }
+ }
+
+ /* If we have not yet scheduled the periodic sending of feedback for this transport then do so */
+ if (transport_rtp->transport_wide_cc.schedid < 0 && transport_rtp->rtcp) {
+ ast_debug(1, "Starting RTCP transport-cc feedback transmission on RTP instance '%p'\n", transport);
+ ao2_ref(transport, +1);
+ ast_log(LOG_NOTICE, "Starting feedback\n");
+ transport_rtp->transport_wide_cc.schedid = ast_sched_add(rtp->sched, 1000,
+ rtp_transport_wide_cc_feedback_produce, transport);
+ if (transport_rtp->transport_wide_cc.schedid < 0) {
+ ao2_ref(transport, -1);
+ ast_log(LOG_WARNING, "Scheduling RTCP transport-cc feedback transmission failed on RTP instance '%p'\n",
+ transport);
+ }
+ }
+}
+
+static void rtp_instance_parse_extmap_extensions(struct ast_rtp_instance *instance, struct ast_rtp *rtp,
+ unsigned char *extension, int len)
+{
+ int transport_wide_cc_id = ast_rtp_instance_extmap_get_id(instance, AST_RTP_EXTENSION_TRANSPORT_WIDE_CC);
+ int pos = 0;
+
+ /* We currently only care about the transport-cc extension, so if that's not negotiated then do nothing */
+ if (transport_wide_cc_id == -1) {
+ return;
+ }
+
+ /* Only while we do not exceed available extension data do we continue */
+ while (pos < len) {
+ int id = extension[pos] >> 4;
+ int extension_len = (extension[pos] & 0xF) + 1;
+
+ /* We've handled the first byte as it contains the extension id and length, so always
+ * skip ahead now
+ */
+ pos += 1;
+
+ if (id == 0) {
+ /* From the RFC:
+ * In both forms, padding bytes have the value of 0 (zero). They may be
+ * placed between extension elements, if desired for alignment, or after
+ * the last extension element, if needed for padding. A padding byte
+ * does not supply the ID of an element, nor the length field. When a
+ * padding byte is found, it is ignored and the parser moves on to
+ * interpreting the next byte.
+ */
+ continue;
+ } else if (id == 15) {
+ /* From the RFC:
+ * The local identifier value 15 is reserved for future extension and
+ * MUST NOT be used as an identifier. If the ID value 15 is
+ * encountered, its length field should be ignored, processing of the
+ * entire extension should terminate at that point, and only the
+ * extension elements present prior to the element with ID 15
+ * considered.
+ */
+ break;
+ } else if ((pos + extension_len) > len) {
+ /* The extension is corrupted and is stating that it contains more data than is
+ * available in the extensions data.
+ */
+ break;
+ }
+
+ /* If this is transport-cc then we need to parse it further */
+ if (id == transport_wide_cc_id) {
+ rtp_instance_parse_transport_wide_cc(instance, rtp, extension + pos, extension_len);
+ }
+
+ /* Skip ahead to the next extension */
+ pos += extension_len;
+ }
+}
+
static struct ast_frame *ast_rtp_interpret(struct ast_rtp_instance *instance, struct ast_srtp *srtp,
const struct ast_sockaddr *remote_address, unsigned char *read_area, int length, int prev_seqno)
{
/* Look for any RTP extensions, currently we do not support any */
if (ext) {
- hdrlen += (ntohl(rtpheader[hdrlen/4]) & 0xffff) << 2;
- hdrlen += 4;
- if (DEBUG_ATLEAST(1)) {
- unsigned int profile;
- profile = (ntohl(rtpheader[3]) & 0xffff0000) >> 16;
+ int extensions_size = (ntohl(rtpheader[hdrlen/4]) & 0xffff) << 2;
+ unsigned int profile;
+ profile = (ntohl(rtpheader[3]) & 0xffff0000) >> 16;
+
+ if (profile == 0xbede) {
+ /* We skip over the first 4 bytes as they are just for the one byte extension header */
+ rtp_instance_parse_extmap_extensions(instance, rtp, read_area + hdrlen + 4, extensions_size);
+ } else if (DEBUG_ATLEAST(1)) {
if (profile == 0x505a) {
ast_log(LOG_DEBUG, "Found Zfone extension in RTP stream - zrtp - not supported.\n");
- } else if (profile != 0xbede) {
+ } else {
/* SDP negotiated RTP extensions can not currently be output in logging */
ast_log(LOG_DEBUG, "Found unknown RTP Extensions %x\n", profile);
}
}
+
+ hdrlen += extensions_size;
+ hdrlen += 4;
}
/* Make sure after we potentially mucked with the header length that it is once again valid */
ao2_lock(instance);
rtp->rtcp->schedid = -1;
}
+ if (rtp->transport_wide_cc.schedid > -1) {
+ ao2_unlock(instance);
+ if (!ast_sched_del(rtp->sched, rtp->transport_wide_cc.schedid)) {
+ ao2_ref(instance, -1);
+ } else {
+ ast_debug(1, "Failed to tear down RTCP transport-cc feedback on RTP instance '%p'\n", instance);
+ ao2_lock(instance);
+ return;
+ }
+ ao2_lock(instance);
+ rtp->transport_wide_cc.schedid = -1;
+ }
if (rtp->rtcp->s > -1 && rtp->rtcp->s != rtp->s) {
close(rtp->rtcp->s);
}
rtp->rtcp->schedid = -1;
}
+ if (rtp->transport_wide_cc.schedid > -1) {
+ ao2_unlock(instance);
+ if (!ast_sched_del(rtp->sched, rtp->transport_wide_cc.schedid)) {
+ ao2_ref(instance, -1);
+ }
+ ao2_lock(instance);
+ rtp->transport_wide_cc.schedid = -1;
+ }
+
if (rtp->red) {
ao2_unlock(instance);
AST_SCHED_DEL(rtp->sched, rtp->red->schedid);
{
switch (extension) {
case AST_RTP_EXTENSION_ABS_SEND_TIME:
+ case AST_RTP_EXTENSION_TRANSPORT_WIDE_CC:
return 1;
default:
return 0;