]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-7621
authorAnthony Minessale <anthm@freeswitch.org>
Mon, 8 Jun 2015 20:06:51 +0000 (15:06 -0500)
committerAnthony Minessale <anthm@freeswitch.org>
Mon, 8 Jun 2015 20:06:51 +0000 (15:06 -0500)
src/mod/formats/mod_shout/mod_shout.c

index a81336a79c01ecf5b8702e01bf94db98333d4a45..763ca5ec1b6358f88c35c268087a4e8a4878e17e 100644 (file)
@@ -117,7 +117,6 @@ struct shout_context {
        int dlen;
        FILE *fp;
        size_t samplerate;
-       uint8_t thread_running;
        uint8_t shout_init;
        uint32_t prebuf;
        int lame_ready;
@@ -128,6 +127,9 @@ struct shout_context {
        switch_size_t mp3buflen;
        switch_thread_rwlock_t *rwlock;
        int buffer_seconds;
+       switch_thread_t *read_stream_thread;
+       switch_thread_t *write_stream_thread;
+       curl_socket_t curlfd;
 };
 
 typedef struct shout_context shout_context_t;
@@ -137,6 +139,7 @@ static void decode_fd(shout_context_t *context, void *data, size_t bytes);
 static inline void free_context(shout_context_t *context)
 {
        size_t ret;
+       switch_status_t st;
 
        if (context) {
                switch_mutex_lock(context->audio_mutex);
@@ -144,18 +147,19 @@ static inline void free_context(shout_context_t *context)
                switch_mutex_unlock(context->audio_mutex);
 
                if (context->stream_url) {
-                       int sanity = 0;
-
-                       while (context->thread_running) {
-                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for stream to terminate: %s\n", context->stream_url);
-                               switch_yield(500000);
-                               if (++sanity > 10) {
-                                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Giving up waiting for stream to terminate: %s\n", context->stream_url);
-                                       break;
-                               }
+                       if (context->curlfd > -1) {
+                               shutdown(context->curlfd, 2);
+                       }
+                       switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for stream to terminate: %s\n", context->stream_url);
+                       if (context->read_stream_thread) {
+                               switch_thread_join(&st, context->read_stream_thread);
                        }
                }
 
+               if (context->write_stream_thread) {
+                       switch_thread_join(&st, context->write_stream_thread);
+               }
+
                switch_thread_rwlock_wrlock(context->rwlock);
 
                if (context->mh) {
@@ -368,6 +372,10 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data)
        uint32_t buf_size = 1024 * 128; /* do not make this 64 or less, stutter will ensue after first 64k buffer is dry */
        switch_size_t used;
 
+       if (context->err) {
+               goto error;
+       }
+
        if (!context->stream_channels) {
                long rate = 0;
                int channels = 0;
@@ -398,6 +406,10 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data)
                switch_yield(500000);
        }
 
+       if (context->err) {
+               goto error;
+       }
+
        if (mpg123_feed(context->mh, ptr, realsize) != MPG123_OK) {
                goto error;
        }
@@ -442,6 +454,22 @@ static size_t stream_callback(void *ptr, size_t size, size_t nmemb, void *data)
        return 0;
 }
 
+static int progress_callback(void *clientp,   double dltotal,   double dlnow,   double ultotal,   double ulnow)
+{
+       shout_context_t *context = (shout_context_t *) clientp;
+       return context->err;
+}
+
+
+static int sockopt_callback(void *clientp, curl_socket_t curlfd,
+                                                       curlsocktype purpose)
+{
+       shout_context_t *context = (shout_context_t *) clientp;
+
+       context->curlfd = curlfd;
+       
+       return CURL_SOCKOPT_OK;
+}
 
 #define MY_BUF_LEN 1024 * 32
 #define MY_BLOCK_SIZE MY_BUF_LEN
@@ -452,9 +480,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
        shout_context_t *context = (shout_context_t *) obj;
 
        switch_thread_rwlock_rdlock(context->rwlock);
-
+       context->curlfd = -1;
        curl_handle = switch_curl_easy_init();
        switch_curl_easy_setopt(curl_handle, CURLOPT_URL, context->stream_url);
+       curl_easy_setopt(curl_handle, CURLOPT_PROGRESSFUNCTION, progress_callback);
+       curl_easy_setopt(curl_handle, CURLOPT_PROGRESSDATA, (void *)context);
        switch_curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1);
        switch_curl_easy_setopt(curl_handle, CURLOPT_MAXREDIRS, 10);
        switch_curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, stream_callback);
@@ -465,7 +495,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
        switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, 100);     /* handle trickle connections */
        switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, 30);
        switch_curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, context->curl_error_buff);
+       curl_easy_setopt(curl_handle, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
+       curl_easy_setopt(curl_handle, CURLOPT_SOCKOPTDATA, (void *)context);
+
        cc = switch_curl_easy_perform(curl_handle);
+
        if (cc && cc != CURLE_WRITE_ERROR) {    /* write error is ok, we just exited from callback early */
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "CURL returned error:[%d] %s : %s [%s]\n", cc, switch_curl_easy_strerror(cc),
                                                  context->curl_error_buff, context->stream_url);
@@ -474,21 +508,17 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Read Thread Done\n");
 
        context->eof++;
-       context->thread_running = 0;
        switch_thread_rwlock_unlock(context->rwlock);
        return NULL;
 }
 
 static void launch_read_stream_thread(shout_context_t *context)
 {
-       switch_thread_t *thread;
        switch_threadattr_t *thd_attr = NULL;
 
-       context->thread_running = 1;
        switch_threadattr_create(&thd_attr, context->memory_pool);
-       switch_threadattr_detach_set(thd_attr, 1);
        switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
-       switch_thread_create(&thread, thd_attr, read_stream_thread, context, context->memory_pool);
+       switch_thread_create(&context->read_stream_thread, thd_attr, read_stream_thread, context, context->memory_pool);
 }
 
 #define error_check() if (context->err) goto error;
@@ -499,20 +529,13 @@ static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, voi
 
        switch_thread_rwlock_rdlock(context->rwlock);
 
-       if (context->thread_running) {
-               context->thread_running++;
-       } else {
-               switch_thread_rwlock_unlock(context->rwlock);
-               return NULL;
-       }
-
        if (!context->lame_ready) {
                lame_init_params(context->gfp);
                lame_print_config(context->gfp);
                context->lame_ready = 1;
        }
 
-       while (!context->err && context->thread_running) {
+       while (!context->err) {
                unsigned char mp3buf[20480] = "";
                int16_t audio[9600] = { 0 };
                switch_size_t audio_read = 0;
@@ -575,31 +598,21 @@ static void *SWITCH_THREAD_FUNC write_stream_thread(switch_thread_t *thread, voi
   error:
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Thread Done\n");
        switch_thread_rwlock_unlock(context->rwlock);
-       context->thread_running = 0;
+
        return NULL;
 }
 
 static void launch_write_stream_thread(shout_context_t *context)
 {
-       switch_thread_t *thread;
        switch_threadattr_t *thd_attr = NULL;
-       int sanity = 10;
 
        if (context->err) {
                return;
        }
 
-       context->thread_running = 1;
        switch_threadattr_create(&thd_attr, context->memory_pool);
-       switch_threadattr_detach_set(thd_attr, 1);
        switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
-       switch_thread_create(&thread, thd_attr, write_stream_thread, context, context->memory_pool);
-
-       while (context->thread_running && context->thread_running != 2) {
-               switch_yield(100000);
-               if (!--sanity)
-                       break;
-       }
+       switch_thread_create(&context->write_stream_thread, thd_attr, write_stream_thread, context, context->memory_pool);
 }
 
 #define TC_BUFFER_SIZE 1024 * 32