]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
[mod_opusfile] add ogg/opus streams, fix Makefile for encoding.
authorDragos Oancea <dragos@signalwire.com>
Thu, 12 Mar 2020 14:41:31 +0000 (14:41 +0000)
committerAndrey Volk <andywolk@gmail.com>
Fri, 15 Oct 2021 13:52:06 +0000 (16:52 +0300)
[mod_opusfile] add stats, opusctl

[mod_opusfile] add unit-test using teletone

src/mod/formats/mod_opusfile/Makefile.am
src/mod/formats/mod_opusfile/mod_opusfile.c
src/mod/formats/mod_opusfile/test/freeswitch.xml
src/mod/formats/mod_opusfile/test/sounds/audiocheck.net_sin_1000Hz_-3dBFS_6s.opus [new file with mode: 0644]
src/mod/formats/mod_opusfile/test/sounds/audiocheck.net_sin_1000Hz_-3dBFS_6s.wav [new file with mode: 0644]
src/mod/formats/mod_opusfile/test/sounds/opusfile-test-ogg.bitstream [new file with mode: 0644]
src/mod/formats/mod_opusfile/test/test_opusfile.c

index 7c776f9d5630b6cfc098965863456c8d4af28e75..6f4919c5b207d373f4557a37e9afa7352d8caf0e 100644 (file)
@@ -22,6 +22,12 @@ noinst_PROGRAMS = test/test_opusfile
 test_test_opusfile_SOURCES = test/test_opusfile.c
 test_test_opusfile_CFLAGS = $(AM_CFLAGS) -I./ -I../ -DSWITCH_TEST_BASE_DIR_FOR_CONF=\"${abs_builddir}/test\" -DSWITCH_TEST_BASE_DIR_OVERRIDE=\"${abs_builddir}/test\" $(OPUSFILE_DECODE_CFLAGS)
 test_test_opusfile_LDFLAGS = $(AM_LDFLAGS) -avoid-version -no-undefined $(freeswitch_LDFLAGS) $(switch_builddir)/libfreeswitch.la $(CORE_LIBS) $(APR_LIBS) $(OPUSFILE_DECODE_LIBS)
+
+if HAVE_OPUSFILE_ENCODE
+test_test_opusfile_CFLAGS += $(OPUSFILE_ENCODE_CFLAGS) -DHAVE_OPUSFILE_ENCODE
+test_test_opusfile_LDFLAGS += $(OPUSFILE_ENCODE_LIBS)
+endif 
+
 test_test_opusfile_LDADD = libopusfilemod.la $(switch_builddir)/libfreeswitch.la
 
 TESTS = $(noinst_PROGRAMS)
index 25246a7ced21839cb0c116a3d1018426656763b0..ea932ed0ff13148e39b01bd97955ebe23be6b9e1 100644 (file)
 #define DEFAULT_RATE 48000 /* default fullband */
 #define OPUS_MAX_PCM 5760 /* opus recommended max output buf */
 
+#define OPUSSTREAM_MAX 64*1024 
+#define OGG_MIN_PAGE_SIZE 2400 // this much data buffered before trying to open the incoming stream
+#define OGG_MAX_PAGE_SIZE 65307 // a bit less than 64k, standard ogg
+
+#define PAGES_PER_SEC 4
+
+//#define LIMIT_DROP
+
+#ifdef LIMIT_DROP
+#define MIN_OGG_PAYLOAD 40 // drop incoming frames smaller than this (decoder)
+#endif 
+
 //#undef HAVE_OPUSFILE_ENCODE  /*don't encode anything */
 
 SWITCH_MODULE_LOAD_FUNCTION(mod_opusfile_load);
@@ -87,6 +99,53 @@ struct opus_file_context {
 
 typedef struct opus_file_context opus_file_context;
 
+struct opus_stream_context {
+       switch_file_t *fd;
+       OggOpusFile *of;
+       ogg_int64_t duration;
+       int output_seekable;
+       ogg_int64_t pcm_offset;
+       ogg_int64_t pcm_print_offset;
+       ogg_int64_t next_pcm_offset;
+       opus_int64 raw_offset;
+       ogg_int64_t nsamples;
+       opus_int32  bitrate;
+       int li;
+       int prev_li;
+       switch_mutex_t *audio_mutex;
+       switch_buffer_t *audio_buffer;
+       switch_mutex_t *ogg_mutex;
+       switch_buffer_t *ogg_buffer;
+       unsigned char ogg_data[OGG_MAX_PAGE_SIZE * 2];
+       unsigned int ogg_data_len;
+       switch_bool_t read_stream;
+       switch_bool_t dec_page_ready;
+       opus_int16 decode_buf[OPUS_MAX_PCM];
+       switch_bool_t eof;
+       switch_thread_rwlock_t *rwlock;
+       switch_file_handle_t *handle;
+       size_t samplerate;
+       int frame_size;
+       int dec_channels;
+       size_t err;
+       opus_int16 *opusbuf;
+       switch_size_t opusbuflen;
+#ifdef HAVE_OPUSFILE_ENCODE
+       OggOpusEnc *enc;
+       OggOpusComments *comments;
+       unsigned char encode_buf[OPUSSTREAM_MAX];
+       int encoded_buflen;
+       size_t samples_encode;
+       int enc_channels;
+       unsigned int enc_pagecount;
+#endif
+       unsigned int dec_count;
+       switch_thread_t *read_stream_thread;
+       switch_memory_pool_t *pool;
+};
+
+typedef struct opus_stream_context opus_stream_context_t;
+
 static struct {
        int debug;
 } globals;
@@ -169,7 +228,6 @@ static switch_status_t switch_opusfile_open(switch_file_handle_t *handle, const
 {
        opus_file_context *context;
        char *ext;
-       unsigned int flags = 0;
        int ret;
 
        if ((ext = strrchr(path, '.')) == 0) {
@@ -190,22 +248,11 @@ static switch_status_t switch_opusfile_open(switch_file_handle_t *handle, const
 
        switch_mutex_init(&context->audio_mutex, SWITCH_MUTEX_NESTED, context->pool);
 
-       if (switch_test_flag(handle, SWITCH_FILE_FLAG_WRITE)) {
-               flags |= SWITCH_FOPEN_WRITE | SWITCH_FOPEN_CREATE;
-               if (switch_test_flag(handle, SWITCH_FILE_WRITE_APPEND) || switch_test_flag(handle, SWITCH_FILE_WRITE_OVER)) {
-                       flags |= SWITCH_FOPEN_READ;
-               } else {
-                       flags |= SWITCH_FOPEN_TRUNCATE;
-               }
-       }
-
        if (switch_test_flag(handle, SWITCH_FILE_FLAG_READ)) {
                if (switch_buffer_create_dynamic(&context->audio_buffer, TC_BUFFER_SIZE, TC_BUFFER_SIZE * 2, 0) != SWITCH_STATUS_SUCCESS) {
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n");
                        goto err;
                }
-
-               flags |= SWITCH_FOPEN_READ;
        }
 
        handle->samples = 0;
@@ -289,9 +336,9 @@ static switch_status_t switch_opusfile_open(switch_file_handle_t *handle, const
                        ogg_int64_t duration;
                        opus_int64  size;
                        duration = op_pcm_total(context->of, context->li);
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO , "[OGG/OPUS File] Duration (samples): %u", (unsigned int)duration);
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO , "[OGG/OPUS File] Duration (samples): %u\n", (unsigned int)duration);
                        size = op_raw_total(context->of, context->li);
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"[OGG/OPUS File] Size (bytes): %u", (unsigned int)size);
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"[OGG/OPUS File] Size (bytes): %u\n", (unsigned int)size);
                }
                tags = op_tags(context->of, context->li);
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[OGG/OPUS File] Encoded by: %s\n", tags->vendor);
@@ -343,7 +390,7 @@ static switch_status_t switch_opusfile_seek(switch_file_handle_t *handle, unsign
                switch_buffer_zero(context->audio_buffer);
                ret = op_pcm_seek(context->of, samples);
                if (globals.debug) {
-                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"[OGG/OPUS File] seek samples: [%u]", (unsigned int)samples);
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"[OGG/OPUS File] seek samples: [%u]\n", (unsigned int)samples);
                }
                if (ret == 0) {
                        handle->pos = *cur_sample = samples;
@@ -416,7 +463,7 @@ static switch_status_t switch_opusfile_write(switch_file_handle_t *handle, void
        int err;
        int mapping_family = 0;
 
-       opus_file_context *context = handle->private_info;
+       opus_file_context *context;
 
        if (!handle) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Error no handle\n");
@@ -443,17 +490,19 @@ static switch_status_t switch_opusfile_write(switch_file_handle_t *handle, void
        }
 
        if (globals.debug) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"[OGG/OPUS File] write nsamples: [%d]", (int)nsamples);
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,"[OGG/OPUS File] write nsamples: [%d]\n", (int)nsamples);
        }
 
        err = ope_encoder_write(context->enc, (opus_int16 *)data, nsamples);
 
        if (err != OPE_OK) {
-               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[OGG/OPUS File] Can't encode. err: [%d] [%s]", err, ope_strerror(err));
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[OGG/OPUS File] Can't encode. err: [%d] [%s]\n", err, ope_strerror(err));
                return SWITCH_STATUS_FALSE;
        }
 
        handle->sample_count += *len;
+#else
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "[OGG/OPUS File] Encoding support not built-in, build the module with libopusenc!\n");
 #endif 
        return SWITCH_STATUS_SUCCESS;
 }
@@ -479,7 +528,7 @@ SWITCH_STANDARD_API(mod_opusfile_debug)
                        globals.debug = 1;
                        stream->write_function(stream, "OPUSFILE Debug: on\n");
 #ifdef HAVE_OPUSFILE_ENCODE
-                       stream->write_function(stream, "Library version (encoding): %s\n", ope_get_version_string());
+                       stream->write_function(stream, "Library version (encoding): %s ABI: %s\n", ope_get_version_string(), ope_get_abi_version());
 #endif 
                } else if (!strcasecmp(cmd, "off")) {
                        globals.debug = 0;
@@ -491,6 +540,603 @@ SWITCH_STANDARD_API(mod_opusfile_debug)
        return SWITCH_STATUS_SUCCESS;
 }
 
+static switch_status_t switch_opusstream_set_initial(opus_stream_context_t *context) 
+{
+       /* https://www.opus-codec.org/docs/opusfile_api-0.5/group__stream__info.html#ga9272a4a6ac9e01fbc549008f5ff58b4c */
+
+       if (context->of) {
+               int ret;
+               /* docs: "Obtain the PCM offset of the next sample to be read. " */
+               ret = op_pcm_tell(context->of);
+               if (ret != OP_EINVAL) {
+                       context->pcm_offset = ret;
+               }
+               context->pcm_print_offset = context->pcm_offset - context->samplerate;
+
+               /* docs: "Obtain the current value of the position indicator for _of." */
+               ret = op_raw_tell(context->of);
+               if (ret != OP_EINVAL) {
+                       context->raw_offset = ret;
+               }
+
+               /* docs: "Get the channel count of the given link in a (possibly-chained) Ogg Opus stream. " */
+               context->dec_channels = op_channel_count(context->of, -1);
+               if (context->dec_channels == 0) {
+                       context->dec_channels = 1;
+               }
+
+               context->samplerate = DEFAULT_RATE;
+               return SWITCH_STATUS_SUCCESS;
+       }
+
+       return SWITCH_STATUS_FALSE;
+}
+
+static switch_status_t switch_opusstream_stream_info(opus_stream_context_t *context) 
+{
+       const OpusHead *head;
+       const OpusTags *tags;
+       opus_int32 bitrate;
+
+       if (context->of) {
+
+               /* docs: "Get the serial number of the given link in a (possibly-chained) Ogg Opus stream. "*/
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[OGG/OPUS Stream Decode] SerialNO: [%u]\n", op_serialno(context->of, -1));
+               bitrate = op_bitrate_instant(context->of);
+               if (bitrate > 0) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[OGG/OPUS Stream Decode] Bitrate: [%d]\n", bitrate);
+               }
+
+               if(context->pcm_offset!=0){
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Decode] Non-zero starting PCM offset: [%li]\n", 
+                                       (long)context->pcm_offset);
+               }
+
+               /* docs: "Retrieve the index of the current link." */
+               context->li = op_current_link(context->of);
+
+               /* docs: "Get the ID header information for the given link in a (possibly chained) Ogg Opus stream. " */
+               head = op_head(context->of, context->li);
+               if (head) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[OGG/OPUS Stream Decode] Channels: [%i]\n", head->channel_count);
+                       if (head->input_sample_rate) {
+                               context->samplerate = head->input_sample_rate;
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[OGG/OPUS Stream Decode] Original sampling rate: [%lu] Hz\n", 
+                                               (unsigned long)head->input_sample_rate);
+                       }
+               }
+               /*docs: "Returns whether or not the data source being read is seekable."*/
+               if (op_seekable(context->of)) {
+                       opus_int64  size;
+                       context->duration = op_pcm_total(context->of, context->li); // page duration 
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO , "[OGG/OPUS Stream Decode] Duration (samples): [%u]\n", (unsigned int)context->duration);
+                       size = op_raw_total(context->of, context->li);
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"[OGG/OPUS Stream Decode] Size (bytes): [%u]\n", (unsigned int)size);
+               }
+               /* docs: "Get the comment header information for the given link in a (possibly chained) Ogg Opus stream." */
+               tags = op_tags(context->of, context->li);
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[OGG/OPUS Stream Decode] Encoded by: [%s]\n", tags->vendor);
+               return SWITCH_STATUS_FALSE;
+       }
+       return SWITCH_STATUS_SUCCESS;
+}
+
+static switch_status_t switch_opusstream_stream_decode(opus_stream_context_t *context, void *data, int channels)
+{
+       int ret;
+       size_t buf_inuse;
+       switch_status_t status = SWITCH_STATUS_SUCCESS;
+
+       if (!context->of) {
+               return SWITCH_STATUS_FALSE;
+       }
+       memset(context->decode_buf, 0, sizeof(context->decode_buf));
+       switch_mutex_lock(context->audio_mutex);
+       while (!(context->eof)) {
+
+               if (channels == 1) {
+                       ret = op_read(context->of, (opus_int16 *)context->decode_buf, OPUS_MAX_PCM, NULL);
+               } else if (channels > 1) {
+                       ret = op_read_stereo(context->of, (opus_int16 *)context->decode_buf, OPUS_MAX_PCM);
+               } else {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[OGG/OPUS Stream] Invalid number of channels!\n");
+                               switch_goto_status(SWITCH_STATUS_FALSE, end);
+               }
+
+               if (ret < 0) {
+                       switch(ret) {
+                       case OP_HOLE:   /* There was a hole in the data, and some samples may have been skipped. Call this function again to continue decoding past the hole.*/
+                               if (!context->dec_page_ready) {
+                                       if (globals.debug) {
+                                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Decoder]: incomplete ogg page, will retry\n");
+                                       }
+                                       switch_goto_status(SWITCH_STATUS_SUCCESS, end);
+                               }
+                       case OP_EREAD:  /*An underlying read operation failed. This may signal a truncation attack from an <https:> source.*/
+                       
+                       case OP_EFAULT: /*      An internal memory allocation failed. */
+
+                       case OP_EIMPL:  /*An unseekable stream encountered a new link that used a feature that is not implemented, such as an unsupported channel family.*/
+
+                       case OP_EINVAL: /* The stream was only partially open. */
+
+                       case OP_ENOTFORMAT: /*  An unseekable stream encountered a new link that did not have any logical Opus streams in it. */
+
+                       case OP_EBADHEADER:     /*An unseekable stream encountered a new link with a required header packet that was not properly formatted, contained illegal values, or was missing altogether.*/
+
+                       case OP_EVERSION:       /*An unseekable stream encountered a new link with an ID header that contained an unrecognized version number.*/
+
+                       case OP_EBADPACKET: /*Failed to properly decode the next packet.*/
+
+                       case OP_EBADLINK:               /*We failed to find data we had seen before.*/
+
+                       case OP_EBADTIMESTAMP:          /*An unseekable stream encountered a new link with a starting timestamp that failed basic validity checks.*/
+
+                       default:
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[OGG/OPUS Decoder]: error decoding stream: [%d]\n", ret);
+                               switch_goto_status(SWITCH_STATUS_FALSE, end);
+                       }
+               } else if (ret == 0) {
+                       /*The number of samples returned may be 0 if the buffer was too small to store even a single sample for both channels, or if end-of-file was reached*/
+                       if (globals.debug) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Decoder]: EOF reached [%d]\n", ret);
+                       }
+
+                       context->eof = TRUE;
+                       break;
+               } else /* (ret > 0)*/ {
+                       /*The number of samples read per channel on success*/
+                       switch_buffer_write(context->audio_buffer, (opus_int16 *)context->decode_buf, ret * sizeof(opus_int16) * channels);
+                       buf_inuse = switch_buffer_inuse(context->audio_buffer);
+
+                       if (globals.debug) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, 
+                                               "[OGG/OPUS Decoder]: Read samples: %d. Wrote bytes to buffer: [%d] bytes in use: [%u] byte pos stream: [%lu]\n", 
+                                               ret, (int)(ret * sizeof(int16_t) * channels), (unsigned int)buf_inuse, (long unsigned int)op_raw_tell(context->of));
+                       }
+               }
+       }
+
+end:
+       context->eof = FALSE; // for next page 
+
+       switch_mutex_unlock(context->audio_mutex);
+
+       return status;
+}
+
+static switch_status_t switch_opusstream_init(switch_codec_t *codec, switch_codec_flag_t flags, const switch_codec_settings_t *codec_settings)
+{
+       struct opus_stream_context *context = NULL;
+       int encoding, decoding;
+#ifdef HAVE_OPUSFILE_ENCODE
+       int err;
+#endif 
+
+       encoding = (flags & SWITCH_CODEC_FLAG_ENCODE);
+       decoding = (flags & SWITCH_CODEC_FLAG_DECODE);
+
+       if (!(encoding || decoding) || (!(context = switch_core_alloc(codec->memory_pool, sizeof(struct opus_stream_context))))) {
+               return SWITCH_STATUS_FALSE;
+       } else {
+
+               memset(context, 0, sizeof(struct opus_stream_context));
+               codec->private_info = context;
+               context->pool = codec->memory_pool;
+
+               switch_thread_rwlock_create(&(context->rwlock), context->pool);
+
+               switch_thread_rwlock_rdlock(context->rwlock);
+
+               switch_mutex_init(&context->audio_mutex, SWITCH_MUTEX_NESTED, context->pool);
+               switch_mutex_init(&context->ogg_mutex, SWITCH_MUTEX_NESTED, context->pool);
+
+               if (switch_buffer_create_dynamic(&context->audio_buffer, TC_BUFFER_SIZE, TC_BUFFER_SIZE * 2, 0) != SWITCH_STATUS_SUCCESS) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n");
+                       switch_thread_rwlock_unlock(context->rwlock);
+                       return SWITCH_STATUS_MEMERR;
+               }
+
+               if (switch_buffer_create_dynamic(&context->ogg_buffer, TC_BUFFER_SIZE, TC_BUFFER_SIZE * 2, 0) != SWITCH_STATUS_SUCCESS) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Memory Error!\n");
+                       switch_thread_rwlock_unlock(context->rwlock);
+                       return SWITCH_STATUS_MEMERR;
+               }
+
+               context->samplerate = codec->implementation->actual_samples_per_second;
+               context->frame_size = codec->implementation->actual_samples_per_second * (codec->implementation->microseconds_per_packet / 1000) / 1000;
+
+               if (globals.debug) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream] frame_size: [%d]\n", (int)context->frame_size);
+               }
+#ifdef HAVE_OPUSFILE_ENCODE
+               if (encoding) {
+                       if (!context->comments) {
+                               context->comments = ope_comments_create();
+                               ope_comments_add(context->comments, "METADATA", "Freeswitch/mod_opusfile");
+                       }
+                       if (!context->enc) {
+                               int mapping_family = 0;
+                               // opus_multistream_surround_encoder_get_size() in libopus will check these
+                               if ((context->enc_channels > 2) && (context->enc_channels <= 8)) {
+                                       mapping_family = 1;
+                               } else if ((context->enc_channels > 8) && (context->enc_channels <= 255)) {
+                                       // multichannel/multistream mapping family . https://people.xiph.org/~giles/2013/draft-ietf-codec-oggopus.html#rfc.section.5.1.1
+                                       mapping_family = 255;
+                               }
+                               context->enc = ope_encoder_create_pull(context->comments, !context->samplerate?DEFAULT_RATE:context->samplerate, !context->enc_channels?1:context->enc_channels, mapping_family, &err);
+
+                               if (!context->enc) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[OGG/OPUS Stream Encode] Can't create stream. err: [%d] [%s]\n", err, ope_strerror(err));
+                                       switch_thread_rwlock_unlock(context->rwlock);
+                                       return SWITCH_STATUS_FALSE;
+                               } else {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[OGG/OPUS Stream Encode] Stream opened for encoding\n"); 
+                               }
+                               ope_encoder_ctl(context->enc, OPUS_SET_COMPLEXITY_REQUEST, 5);
+                               ope_encoder_ctl(context->enc, OPUS_SET_APPLICATION_REQUEST, OPUS_APPLICATION_VOIP);
+                       }
+               }
+#endif 
+               switch_thread_rwlock_unlock(context->rwlock);
+               return SWITCH_STATUS_SUCCESS;
+       }
+}
+
+static switch_status_t switch_opusstream_destroy(switch_codec_t *codec)
+{
+       struct opus_stream_context *context = codec->private_info;
+       switch_status_t st;
+       
+       switch_thread_rwlock_rdlock(context->rwlock);
+       
+       if (context->read_stream_thread) {
+               switch_thread_join(&st, context->read_stream_thread);
+               if (st == SWITCH_STATUS_SUCCESS) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Encode/Decode] Joined decoding thread\n");
+               } else {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Encode/Decode] Can't join decoding thread\n");
+               }
+       }
+
+       if (context->of) {
+               op_free(context->of);
+       }
+
+#ifdef HAVE_OPUSFILE_ENCODE
+       if (context->enc) {
+               ope_encoder_destroy(context->enc);
+       }
+       if (context->comments) {
+               ope_comments_destroy(context->comments);
+       }
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Encode/Decode] Encoded pages: [%u]\n", context->enc_pagecount);
+#endif 
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Encode/Decode] Decoded chunks: [%u]\n", context->dec_count);
+       if (context->audio_buffer) {
+               switch_buffer_destroy(&context->audio_buffer);
+       }
+       if (context->ogg_buffer) {
+               switch_buffer_destroy(&context->ogg_buffer);
+       }
+       switch_thread_rwlock_unlock(context->rwlock);
+       codec->private_info = NULL;
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Encode/Decode] Stopped processing\n");
+       return SWITCH_STATUS_SUCCESS;
+}
+
+static switch_status_t switch_opusstream_encode(switch_codec_t *codec,
+                                                                               switch_codec_t *other_codec,
+                                                                               void *decoded_data,
+                                                                               uint32_t decoded_data_len,
+                                                                               uint32_t decoded_rate,
+                                                                               void *encoded_data, 
+                                                                               uint32_t *encoded_data_len,
+                                                                               uint32_t *encoded_rate,
+                                                                               unsigned int *flag)
+{
+       switch_status_t status = SWITCH_STATUS_SUCCESS;
+#ifdef HAVE_OPUSFILE_ENCODE
+       struct opus_stream_context *context = codec->private_info;
+       size_t nsamples = (int)decoded_data_len / sizeof(int16_t);
+       int err, ret;
+       int len = 0; int thres;
+       unsigned char *decode_buf = decoded_data;
+
+       if (!context) {
+               return SWITCH_STATUS_FALSE;
+       }
+
+       globals.debug = 0;
+       switch_thread_rwlock_rdlock(context->rwlock);
+
+       if (globals.debug) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
+                               "[OGG/OPUS Stream Encode] : switch_opusfile_stream_encode() decoded_data [%x][%x][%x][%x] nsamples: [%d]\n", 
+                               decode_buf[0], decode_buf[1], decode_buf[2], decode_buf[3], (int)nsamples);
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Encode] stream write nsamples: [%d]\n", (int)nsamples);
+       }
+       if (context->enc_channels == 0) {
+               context->enc_channels = 1;
+       }
+       if (!context->samplerate) {
+               context->samplerate = DEFAULT_RATE;
+       }
+
+       if (context->enc) {
+               // we reach here every 20 ms.
+               // decoded_data - this can be an interleaved buffer, to do multistream. we’ll need the exact number of channels too.
+               err = ope_encoder_write(context->enc, (opus_int16 *)decoded_data, nsamples / context->enc_channels);
+               if (err != OPE_OK) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[OGG/OPUS Stream Encode] can't encode, ret: [%d] [%s]\n", err, ope_strerror(err));
+                       switch_goto_status(SWITCH_STATUS_FALSE, end);
+               }
+               context->samples_encode += nsamples;
+       }
+
+       thres = context->samplerate/PAGES_PER_SEC;
+
+       if (!(context->samples_encode % thres) && context->samples_encode > context->samplerate) {
+               if (context->enc) {
+                       unsigned char *vb = context->encode_buf;
+                       int req_flush = 1; 
+                       /* OPE_EXPORT int ope_encoder_get_page(OggOpusEnc *enc, unsigned char **page, opus_int32 *len, int flush); */
+                       ret = ope_encoder_get_page(context->enc, &vb, &len, req_flush);
+                       if (ret == 0) {
+                               /* ope_encoder_get_page(): ret is 1 if there is a page available, 0 if not. */
+                               if (globals.debug) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Encode] can't retrieve encoded page, page not ready. ret: [%d]\n", ret);
+                               }
+                               switch_goto_status(SWITCH_STATUS_SUCCESS, end);
+                       } else {
+                               if (globals.debug) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Encode] retrieved page from encoder. ret [%d] len: [%d] [%p]\n", 
+                                                       ret, len, context->encode_buf);
+                               }
+                               if (len > OGG_MAX_PAGE_SIZE) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[OGG/OPUS Stream Encode] retrieved page bigger than ogg max size!\n");
+                                       switch_goto_status(SWITCH_STATUS_FALSE, end);
+                               }
+                               memcpy(encoded_data, vb, len);
+                               *encoded_data_len = len;
+                               context->enc_pagecount++;
+                               switch_thread_rwlock_unlock(context->rwlock);
+                               return SWITCH_STATUS_SUCCESS;
+                       }
+               } else {
+                       switch_goto_status(SWITCH_STATUS_FALSE, end);
+               }
+       }
+end: 
+       *encoded_data_len = 0;
+       switch_thread_rwlock_unlock(context->rwlock);
+#else
+       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "[OGG/OPUS Stream Encode] Encoding support not built-in, build the module with libopusenc!\n");
+#endif 
+       return status;
+}
+
+// decode_stream_cb(): nbytes is OP_READ_SIZE (builtin limit - libopusfile).
+// this is being called by op_read() or op_read_stereo() - we’re giving chunks of pages to be decoded. 
+static int decode_stream_cb(void *dcontext, unsigned char *data, int nbytes) 
+{
+       opus_stream_context_t *context = (opus_stream_context_t *)dcontext;
+       unsigned int ret = 0;
+       
+       if (!context) {
+               return 0;
+       }
+
+       if (globals.debug) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Decode] decode CB called: context: %p data: %p packet_len: %d\n", 
+                               (void *)context, data, nbytes);
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Decode] decode_stream_cb(): switch_thread_self(): %lx\n",  switch_thread_self());
+       }
+
+       switch_mutex_lock(context->ogg_mutex);
+       ret = switch_buffer_read(context->ogg_buffer, context->ogg_data, nbytes);
+       if (ret == 0) {
+               data = NULL;
+               switch_mutex_unlock(context->ogg_mutex);
+               if (globals.debug) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Decode] No data. Wanted: [%d] bytes\n", nbytes);
+               }
+               return ret;
+       }
+       context->dec_count++;
+       memcpy(data, context->ogg_data, ret);
+
+       if (switch_buffer_inuse(context->ogg_buffer)) {
+               context->dec_page_ready = 0;
+       } else {
+               context->dec_page_ready = 1;
+               if (globals.debug) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Decode] buffer is empty, all pages passed to the decoder\n");
+               }
+
+       }
+       switch_mutex_unlock(context->ogg_mutex);
+
+       if (globals.debug) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Decode] decode_stream_cb(): ret: %u\n",  ret);
+       }
+       return ret;
+}
+
+const OpusFileCallbacks cb={decode_stream_cb, NULL, NULL, NULL};
+
+static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void *obj)
+{
+       opus_stream_context_t *context = (opus_stream_context_t *) obj;
+       int err = 0;
+       OggOpusFile *temp_of = NULL;
+       int buffered_ogg_bytes;
+
+       if (globals.debug) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Decode] read_stream_thread(): switch_thread_self(): 0x%lx\n",  switch_thread_self());
+       }
+       switch_thread_rwlock_rdlock(context->rwlock);
+
+       if ((buffered_ogg_bytes = switch_buffer_inuse(context->ogg_buffer))) {
+               if (buffered_ogg_bytes <= OGG_MAX_PAGE_SIZE) {
+                       switch_buffer_peek(context->ogg_buffer, context->ogg_data, buffered_ogg_bytes);
+                       context->ogg_data_len = buffered_ogg_bytes;
+               }
+       } 
+
+       /* https://mf4.xiph.org/jenkins/view/opus/job/opusfile-unix/ws/doc/html/group__stream__open__close.html#gad183ecf5fbec5add3a5ccf1e3b1d2593  */
+       /* docs: "Open a stream using the given set of callbacks to access it." */
+       temp_of = op_open_callbacks(context, &cb, (const unsigned char *)context->ogg_data, context->ogg_data_len, &err);
+       if (temp_of && (err == 0)) {
+               context->dec_page_ready = 1; 
+               context->of = temp_of;
+               if (globals.debug) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "[OGG/OPUS Stream Decode] Opened stream, installed decoding callback!\n");
+               }
+               switch_opusstream_set_initial(context);
+               switch_opusstream_stream_info(context);
+       } else if (err != 0) {
+               switch (err) {
+                       case OP_EREAD:
+                          //  An underlying read, seek, or tell operation failed when it should have succeeded, or we failed to find data in the stream we had seen before. 
+                       case OP_EFAULT:
+                               //    There was a memory allocation failure, or an internal library error. 
+                       case OP_EIMPL:
+                               // The stream used a feature that is not implemented, such as an unsupported channel family. 
+                       case OP_EINVAL:
+                               // seek() was implemented and succeeded on this source, but tell() did not, or the starting position indicator was not equal to _initial_bytes. 
+                       case OP_ENOTFORMAT:
+                               // The stream contained a link that did not have any logical Opus streams in it. 
+                       case OP_EBADHEADER:
+                               // A required header packet was not properly formatted, contained illegal values, or was missing altogether. 
+                       case OP_EVERSION:
+                               // An ID header contained an unrecognized version number. 
+                       case OP_EBADLINK:
+                               // We failed to find data we had seen before after seeking. 
+                       case OP_EBADTIMESTAMP:
+                               // The first or last timestamp in a link failed basic validity checks
+                       default:
+                               context->dec_page_ready = 0;
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[OGG/OPUS Stream Decode] error opening stream: [%d]\n", err);
+               }
+       }
+
+       switch_thread_rwlock_unlock(context->rwlock);
+       return NULL;
+}
+
+static void launch_read_stream_thread(opus_stream_context_t *context)
+{
+       switch_threadattr_t *thd_attr = NULL;
+
+       switch_threadattr_create(&thd_attr, context->pool);
+       switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+       switch_thread_create(&context->read_stream_thread, thd_attr, read_stream_thread, context, context->pool);
+}
+
+static switch_status_t switch_opusstream_decode(switch_codec_t *codec,
+                                                                                 switch_codec_t *other_codec,
+                                                                                 void *encoded_data,
+                                                                                 uint32_t encoded_data_len,
+                                                                                 uint32_t encoded_rate, 
+                                                                                 void *decoded_data, 
+                                                                                 uint32_t *decoded_data_len, 
+                                                                                 uint32_t *decoded_rate,
+                                                                                 unsigned int *flag)
+{
+       struct opus_stream_context *context = codec->private_info;
+       size_t bytes = 0; 
+       int ogg_bytes = OGG_MIN_PAGE_SIZE; // min page size before trying to open the incoming stream 
+       size_t rb = 0;
+       unsigned char *encode_buf = encoded_data;
+       size_t buffered_ogg_bytes = 0;
+       switch_status_t status = SWITCH_STATUS_SUCCESS;
+
+       if (!context) {
+               return SWITCH_STATUS_FALSE;
+       }
+       if (globals.debug) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
+                               "[OGG/OPUS Stream Decode] : switch_opusstream_decode() encoded_data [%x][%x][%x][%x] encoded_data_len: [%u]\n", 
+                               encode_buf[0], encode_buf[1], encode_buf[2], encode_buf[3], encoded_data_len);
+       }
+#ifdef LIMIT_DROP
+       if ((encoded_data_len <=  MIN_OGG_PAYLOAD) && (encoded_data_len > 0)) {
+               *decoded_data_len = 0;
+               if (globals.debug) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Decode] switch_opusstream_decode(): drop [%u]", (unsigned int)encoded_data_len);
+               }
+               return SWITCH_STATUS_SUCCESS;
+       }
+#endif
+
+       switch_thread_rwlock_rdlock(context->rwlock);
+       memset(context->ogg_data, 0, sizeof(context->ogg_data)); 
+       if (encoded_data_len <= SWITCH_RECOMMENDED_BUFFER_SIZE) {
+               switch_mutex_lock(context->ogg_mutex);
+               switch_buffer_write(context->ogg_buffer, encode_buf, encoded_data_len);
+               if ((buffered_ogg_bytes = switch_buffer_inuse(context->ogg_buffer)) >= ogg_bytes) {
+                       if (globals.debug) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG,
+                                               "[OGG/OPUS Stream Decode] switch_opusstream_decode() encoded_data [%x][%x][%x][%x] encoded_data_len: %u buffered_ogg_bytes: [%u]\n", 
+                                               encode_buf[0], encode_buf[1], encode_buf[2], encode_buf[3], encoded_data_len, (unsigned int)buffered_ogg_bytes);
+                       }
+                       if (buffered_ogg_bytes <= OGG_MAX_PAGE_SIZE) {
+                               switch_buffer_peek(context->ogg_buffer, context->ogg_data, buffered_ogg_bytes);
+                               context->ogg_data_len = buffered_ogg_bytes;
+                       }       else {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[OGG/OPUS Stream Decode] buffered ogg data bigger than max OGG page size, will flush\n");
+                               *decoded_data_len = 0;
+                               switch_buffer_zero(context->ogg_buffer);
+                               switch_mutex_unlock(context->ogg_mutex);
+                               switch_goto_status(SWITCH_STATUS_SUCCESS, end);
+                       }
+               }
+
+               switch_mutex_unlock(context->ogg_mutex);
+       } else {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "[OGG/OPUS Stream Decode] too much data to buffer, flushing buffer!\n");
+               *decoded_data_len = 0;
+               switch_buffer_zero(context->ogg_buffer);
+               switch_goto_status(SWITCH_STATUS_SUCCESS, end);
+       }
+
+       if ((buffered_ogg_bytes >= ogg_bytes) && encoded_data_len) {
+
+               if (!(op_test(NULL, context->ogg_data, buffered_ogg_bytes))) {
+                       if (!context->read_stream && buffered_ogg_bytes > OGG_MIN_PAGE_SIZE) {
+                               if (globals.debug) {
+                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Decode] launching decoding thread\n");
+                               }
+                               launch_read_stream_thread(context);
+                               context->read_stream = 1; // mark thread started
+                       }
+               } 
+       }
+       if (context->of) {
+               if (switch_opusstream_stream_decode(context, context->ogg_data, context->dec_channels) == SWITCH_STATUS_FALSE) {
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[OGG/OPUS Stream Decode] Cannot decode stream\n");
+                       *decoded_data_len = 0;
+                       switch_goto_status(SWITCH_STATUS_FALSE, end);
+               }
+       }
+       switch_mutex_lock(context->audio_mutex);
+       bytes = switch_buffer_inuse(context->audio_buffer);
+       rb = switch_buffer_read(context->audio_buffer, decoded_data, context->frame_size * sizeof(int16_t));
+       switch_mutex_unlock(context->audio_mutex);
+
+       if (globals.debug) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "[OGG/OPUS Stream Decode] rb (read from audio_buffer): [%d] bytes in audio buffer: [%d]\n", (int)rb, (int)bytes);
+       }
+
+       *decoded_data_len = rb ; // bytes
+end:
+
+       switch_thread_rwlock_unlock(context->rwlock);
+       return status;
+}
+
 /* Registration */
 
 static char *supported_formats[SWITCH_MAX_CODECS] = { 0 };
@@ -499,6 +1145,10 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_opusfile_load)
 {
        switch_file_interface_t *file_interface;
        switch_api_interface_t *commands_api_interface;
+       switch_codec_interface_t *codec_interface;
+       int mpf = 10000, spf = 80, bpf = 160, count = 2;
+       int RATES[] = {8000, 16000, 24000, 48000};
+       int i;
 
        supported_formats[0] = "opus";
 
@@ -522,13 +1172,48 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_opusfile_load)
        file_interface->file_set_string = switch_opusfile_set_string;
        file_interface->file_get_string = switch_opusfile_get_string;
 
-       
+       SWITCH_ADD_CODEC(codec_interface, "OPUSSTREAM");
+
+       for (i = 0; i < sizeof(RATES) / sizeof(RATES[0]); i++) {
+// mono
+               switch_core_codec_add_implementation(pool, codec_interface, SWITCH_CODEC_TYPE_AUDIO,
+                                                                                        98,     /* the IANA code number */ // does not matter
+                                                                                        "OPUSSTREAM",  /* the IANA code name */ // we just say OPUSSTREAM is an ogg/opus stream
+                                                                                        NULL,   /* default fmtp to send (can be overridden by the init function) */
+                                                                                        RATES[i], /* samples transferred per second */ // 48000 !
+                                                                                        RATES[i], /* actual samples transferred per second */
+                                                                                        16 * RATES[i] / 8000, /* bits transferred per second */
+                                                                                        mpf * count,  /* number of microseconds per frame */
+                                                                                        spf * RATES[i] / 8000, /* number of samples per frame */
+                                                                                        bpf * RATES[i] / 8000, /* number of bytes per frame decompressed */
+                                                                                        0,     /* number of bytes per frame compressed */
+                                                                                        1, /* number of channels represented */
+                                                                                        1,     /* number of frames per network packet */
+                                                                                       switch_opusstream_init, switch_opusstream_encode, switch_opusstream_decode, switch_opusstream_destroy);
+// stereo
+               switch_core_codec_add_implementation(pool, codec_interface, SWITCH_CODEC_TYPE_AUDIO,
+                                                                                        98,     /* the IANA code number */ // does not matter
+                                                                                        "OPUSSTREAM",  /* the IANA code name */ // we just say OPUSSTREAM is an ogg/opus stream
+                                                                                        NULL,   /* default fmtp to send (can be overridden by the init function) */
+                                                                                        RATES[i], /* samples transferred per second */
+                                                                                        RATES[i], /* actual samples transferred per second */
+                                                                                        16 * RATES[i] / 8000 * 2, /* bits transferred per second */
+                                                                                        mpf * count,  /* number of microseconds per frame */
+                                                                                        spf * RATES[i] / 8000 * 2, /* number of samples per frame */
+                                                                                        bpf * RATES[i] / 8000 * 2, /* number of bytes per frame decompressed */
+                                                                                        0,     /* number of bytes per frame compressed */
+                                                                                        2, /* number of channels represented */
+                                                                                        1,     /* number of frames per network packet */
+                                                                                       switch_opusstream_init, switch_opusstream_encode, switch_opusstream_decode, switch_opusstream_destroy);
+       }
+
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "mod_opusfile loaded\n");
 
        /* indicate that the module should continue to be loaded */
        return SWITCH_STATUS_SUCCESS;
 }
 
+
 /* For Emacs:
  * Local Variables:
  * mode:c
index 80591e86ac978b6c070d737fd5f35df61215a5d7..c1fa064524447191c8788cb35597f7e1d70689ab 100644 (file)
@@ -5,6 +5,7 @@
       <modules>
         <load module="mod_loopback"/>
         <load module="mod_opusfile"/>
+        <load module="mod_sndfile"/>
       </modules>
     </configuration>
   </section>
diff --git a/src/mod/formats/mod_opusfile/test/sounds/audiocheck.net_sin_1000Hz_-3dBFS_6s.opus b/src/mod/formats/mod_opusfile/test/sounds/audiocheck.net_sin_1000Hz_-3dBFS_6s.opus
new file mode 100644 (file)
index 0000000..e18be2e
Binary files /dev/null and b/src/mod/formats/mod_opusfile/test/sounds/audiocheck.net_sin_1000Hz_-3dBFS_6s.opus differ
diff --git a/src/mod/formats/mod_opusfile/test/sounds/audiocheck.net_sin_1000Hz_-3dBFS_6s.wav b/src/mod/formats/mod_opusfile/test/sounds/audiocheck.net_sin_1000Hz_-3dBFS_6s.wav
new file mode 100644 (file)
index 0000000..98d4181
Binary files /dev/null and b/src/mod/formats/mod_opusfile/test/sounds/audiocheck.net_sin_1000Hz_-3dBFS_6s.wav differ
diff --git a/src/mod/formats/mod_opusfile/test/sounds/opusfile-test-ogg.bitstream b/src/mod/formats/mod_opusfile/test/sounds/opusfile-test-ogg.bitstream
new file mode 100644 (file)
index 0000000..65c3889
Binary files /dev/null and b/src/mod/formats/mod_opusfile/test/sounds/opusfile-test-ogg.bitstream differ
index 0cb5e8b00bef51b1617d4e489f968cd0948cc007..f21738bdb72aa85c4552a275e2cea917fa60ba2e 100644 (file)
 #include <stdlib.h>
 
 #include <test/switch_test.h>
+#include <libteletone_detect.h>
+#include <libteletone.h>
 
+//#undef HAVE_OPUSFILE_ENCODE
+
+#define OGG_MIN_PAGE_SIZE 2400
+
+static switch_status_t test_detect_tone_in_file(const char *filepath, int rate, int freq) {
+       teletone_multi_tone_t mt;
+       teletone_tone_map_t map;
+       int16_t data[SWITCH_RECOMMENDED_BUFFER_SIZE] = { 0 };
+       size_t len = rate * 2 / 100; // in samples
+       switch_status_t status;
+       switch_file_handle_t fh = { 0 };
+
+       status = switch_core_file_open(&fh, filepath, 1, rate, SWITCH_FILE_FLAG_READ | SWITCH_FILE_DATA_SHORT, NULL);
+       if (status != SWITCH_STATUS_SUCCESS) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Cannot open file [%s]\n", filepath);
+               return SWITCH_STATUS_FALSE;
+       } 
+
+       mt.sample_rate = rate;
+       map.freqs[0] = (teletone_process_t)freq;
+
+       teletone_multi_tone_init(&mt, &map);
+
+       while (switch_core_file_read(&fh, &data, &len) == SWITCH_STATUS_SUCCESS) {
+               if (teletone_multi_tone_detect(&mt, data, len)) {
+                       switch_core_file_close(&fh);
+                       return SWITCH_STATUS_SUCCESS;
+               }
+       }
+
+       switch_core_file_close(&fh);
+       return SWITCH_STATUS_FALSE;
+}
 
 FST_CORE_BEGIN(".")
 {
@@ -42,6 +77,7 @@ FST_CORE_BEGIN(".")
                {
                        fst_requires_module("mod_loopback");
                        fst_requires_module("mod_opusfile");
+                       fst_requires_module("mod_sndfile");
                }
                FST_SETUP_END()
 
@@ -115,6 +151,163 @@ FST_CORE_BEGIN(".")
 
                        switch_sleep(1000000);
                }
+               FST_TEST_END()
+
+               FST_TEST_BEGIN(opusfile_stream) 
+               {
+                       switch_codec_t read_codec = { 0 };
+                       switch_status_t status;
+                       switch_codec_settings_t codec_settings = {{ 0 }};
+                       unsigned char buf[SWITCH_RECOMMENDED_BUFFER_SIZE] = { 0 };
+                       switch_file_handle_t fhw = { 0 };
+                       uint32_t flags = 0;
+                       uint32_t rate;
+                       static char tmp_filename[] = "/tmp/opusfile-stream-unit_test.wav";
+                       char path[4096];
+                       uint32_t filerate = 48000;
+                       uint32_t torate = 8000;
+                       /*decode*/
+                       uint32_t decoded_len;
+                       size_t write_len;
+                       unsigned char decbuf[SWITCH_RECOMMENDED_BUFFER_SIZE] = { 0 };
+#ifdef  HAVE_OPUSFILE_ENCODE
+                       switch_file_handle_t fh = { 0 };
+                       unsigned char encbuf[SWITCH_RECOMMENDED_BUFFER_SIZE] = { 0 };
+                       switch_codec_t write_codec = { 0 };
+                       switch_size_t len = 960;
+                       uint32_t encoded_len;
+                       unsigned int pages = 0;
+                       /*
+                       General
+                       Complete name                            : sounds/audiocheck.net_sin_1000Hz_-3dBFS_6s.wav
+                       Format                                   : Wave
+                       File size                                : 563 KiB
+                       Duration                                 : 6 s 0 ms
+                       Overall bit rate mode                    : Constant
+                       Overall bit rate                         : 768 kb/s
+
+                       Audio
+                       Format                                   : PCM
+                       Format settings, Endianness              : Little
+                       Format settings, Sign                    : Signed
+                       Codec ID                                 : 1
+                       Duration                                 : 6 s 0 ms
+                       Bit rate mode                            : Constant
+                       Bit rate                                 : 768 kb/s
+                       Channel(s)                               : 1 channel
+                       Sampling rate                            : 48.0 kHz
+                       Bit depth                                : 16 bits
+                       Stream size                              : 563 KiB (100%)
+                       */
+                       static char filename[] = "sounds/audiocheck.net_sin_1000Hz_-3dBFS_6s.wav";
+#else 
+                       static char opus_filename[] = "sounds/opusfile-test-ogg.bitstream";
+                       switch_file_t *fd;
+                       switch_size_t flen;
+#endif
+
+                       status = switch_core_codec_init(&read_codec,
+                       "OPUSSTREAM",
+                       "mod_opusfile",
+                       NULL,
+                       filerate,
+                       20,
+                       1, SWITCH_CODEC_FLAG_ENCODE | SWITCH_CODEC_FLAG_DECODE,
+                       &codec_settings, fst_pool);
+                       fst_check(status == SWITCH_STATUS_SUCCESS);
+
+#ifdef HAVE_OPUSFILE_ENCODE
+                       status = switch_core_codec_init(&write_codec,
+                       "OPUSSTREAM",
+                       "mod_opusfile",
+                       NULL,
+                       filerate,
+                       20,
+                       1, SWITCH_CODEC_FLAG_ENCODE | SWITCH_CODEC_FLAG_DECODE,
+                       &codec_settings, fst_pool);
+                       fst_check(status == SWITCH_STATUS_SUCCESS);
+
+                       sprintf(path, "%s%s%s", SWITCH_GLOBAL_dirs.conf_dir, SWITCH_PATH_SEPARATOR, filename);
+
+                       status = switch_core_file_open(&fh, path, 1, filerate, SWITCH_FILE_FLAG_READ | SWITCH_FILE_DATA_SHORT, NULL);
+                       fst_requires(status == SWITCH_STATUS_SUCCESS);
+
+                       status = switch_core_file_open(&fhw, tmp_filename, 1, torate, SWITCH_FILE_FLAG_WRITE | SWITCH_FILE_DATA_SHORT, NULL);
+                       fst_requires(status == SWITCH_STATUS_SUCCESS);
+
+                       fhw.native_rate = filerate; // make sure we resample to 8000 Hz, because teletone wants this rate
+
+                       while (switch_core_file_read(&fh, &buf, &len) == SWITCH_STATUS_SUCCESS) {
+
+                               status = switch_core_codec_encode(&write_codec, NULL, &buf, len * sizeof(int16_t), filerate, &encbuf, &encoded_len, &rate, &flags);
+                               fst_check(status == SWITCH_STATUS_SUCCESS);
+
+                               if (encoded_len) {
+                                       pages++;
+                                       status = switch_core_codec_decode(&read_codec, NULL, &encbuf, encoded_len, filerate, &decbuf, &decoded_len, &rate, &flags);
+                                       fst_check(status == SWITCH_STATUS_SUCCESS);
+                                       write_len = decoded_len / sizeof(int16_t);
+                                       if (write_len) switch_core_file_write(&fhw, &decbuf, &write_len);
+                               }
+                       }
+
+                       // continue reading, encoded pages are buffered
+                       while (switch_core_codec_decode(&read_codec, NULL, &encbuf, 0, filerate, &decbuf, &decoded_len, &rate, &flags) == SWITCH_STATUS_SUCCESS && decoded_len) {
+                               write_len = decoded_len / sizeof(int16_t);
+                               status = switch_core_file_write(&fhw, &decbuf, &write_len);
+                               fst_check(status == SWITCH_STATUS_SUCCESS);
+                       }
+
+                       switch_core_codec_destroy(&write_codec);
+
+                       status = switch_core_file_close(&fh);
+                       fst_check(status == SWITCH_STATUS_SUCCESS);
+
+#else  
+                       // the test will perform only decoding
+
+                       sprintf(path, "%s%s%s", SWITCH_GLOBAL_dirs.conf_dir, SWITCH_PATH_SEPARATOR, opus_filename);
+
+                       // open the file raw and buffer data to the decoder
+                       status = switch_file_open(&fd, path, SWITCH_FOPEN_READ, SWITCH_FPROT_UREAD, fst_pool);
+                       fst_requires(status == SWITCH_STATUS_SUCCESS);
+
+                       status = switch_core_file_open(&fhw, tmp_filename, 1, torate, SWITCH_FILE_FLAG_WRITE | SWITCH_FILE_DATA_SHORT, NULL);
+                       fst_requires(status == SWITCH_STATUS_SUCCESS);
+
+                       fhw.native_rate = filerate;
+
+                       flen = OGG_MIN_PAGE_SIZE;
+                       while (switch_file_read(fd, &buf, &flen) == SWITCH_STATUS_SUCCESS || flen != 0) {
+                               status = SWITCH_STATUS_SUCCESS;
+                               while (status == SWITCH_STATUS_SUCCESS) {
+                                       status = switch_core_codec_decode(&read_codec, NULL, &buf, flen, filerate, &decbuf, &decoded_len, &rate, &flags);
+                                       fst_requires(status == SWITCH_STATUS_SUCCESS);
+                                       write_len = decoded_len / sizeof(int16_t);
+                                       if (write_len) switch_core_file_write(&fhw, &decbuf, &write_len);
+                                       else break;
+                               }
+                       }
+#endif 
+
+                       // continue reading, encoded pages are buffered
+                       while (switch_core_codec_decode(&read_codec, NULL, &buf, 0, filerate, &decbuf, &decoded_len, &rate, &flags) == SWITCH_STATUS_SUCCESS && decoded_len) {
+                               write_len = decoded_len / sizeof(int16_t);
+                               status = switch_core_file_write(&fhw, &decbuf, &write_len);
+                               fst_check(status == SWITCH_STATUS_SUCCESS);
+                       }
+
+                       status = switch_core_file_close(&fhw);
+                       fst_check(status == SWITCH_STATUS_SUCCESS);
+
+                       switch_core_codec_destroy(&read_codec);
+
+                       // final test
+                       status = test_detect_tone_in_file(tmp_filename, torate, 1000 /*Hz*/);
+                       fst_requires(status == SWITCH_STATUS_SUCCESS);
+
+                       unlink(tmp_filename);
+               }
 
                FST_TEST_END()
        }