]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-8811 #resolve [FS 1.7 crashes intermittently]
authorAnthony Minessale <anthm@freeswitch.org>
Wed, 17 Feb 2016 21:15:03 +0000 (15:15 -0600)
committerAnthony Minessale <anthm@freeswitch.org>
Wed, 17 Feb 2016 21:15:14 +0000 (15:15 -0600)
src/mod/formats/mod_local_stream/mod_local_stream.c
src/switch_core_file.c
src/switch_ivr_bridge.c

index 68fc8eb8ed953866c9ca3adb7ab6333686b6630f..eb465cdaf6e5e85b7d0938a32e73f956abc92799 100644 (file)
@@ -114,10 +114,28 @@ struct local_stream_source {
        switch_image_t *cover_art;
        char *banner_txt;
        int serno;
+       switch_size_t abuflen;
+       switch_byte_t *abuf;
 };
 
 typedef struct local_stream_source local_stream_source_t;
 
+local_stream_source_t *get_source(const char *path)
+{
+       local_stream_source_t *source = NULL;
+
+       switch_mutex_lock(globals.mutex);
+       if ((source = switch_core_hash_find(globals.source_hash, path))) {
+               if (!RUNNING || source->stopped || switch_thread_rwlock_tryrdlock(source->rwlock) != SWITCH_STATUS_SUCCESS) {
+                       source = NULL;
+               }
+       }
+       switch_mutex_unlock(globals.mutex);
+
+       return source;
+}
+
+
 switch_status_t list_streams_full(const char *line, const char *cursor, switch_console_callback_match_t **matches, switch_bool_t show_aliases)
 {
        local_stream_source_t *source;
@@ -183,9 +201,9 @@ static void flush_video_queue(switch_queue_t *q)
 
 static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void *obj)
 {
-       local_stream_source_t *source = obj;
+       volatile local_stream_source_t *s = (local_stream_source_t *) obj;
+       local_stream_source_t *source = (local_stream_source_t *) s;
        switch_file_handle_t fh = { 0 };
-       local_stream_context_t *cp;
        char file_buf[128] = "", path_buf[512] = "", last_path[512], png_buf[512] = "", tmp_buf[512] = "";
        switch_timer_t timer = { 0 };
        int fd = -1;
@@ -216,13 +234,13 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
        switch_thread_rwlock_create(&source->rwlock, source->pool);
 
        if (RUNNING) {
+               source->ready = 1;
                switch_mutex_lock(globals.mutex);
                switch_core_hash_insert(globals.source_hash, source->name, source);
                switch_mutex_unlock(globals.mutex);
-               source->ready = 1;
        }
 
-       while (RUNNING && !source->stopped) {
+       while (RUNNING && !source->stopped && source->ready) {
                const char *fname;
 
                if (temp_pool) {
@@ -258,7 +276,6 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
 
                while (RUNNING && !source->stopped) {
                        switch_size_t olen;
-                       uint8_t abuf[SWITCH_RECOMMENDED_BUFFER_SIZE] = { 0 };
                        const char *artist = NULL, *title = NULL;
 
                        if (fd > -1) {
@@ -452,11 +469,11 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
 
                                        if (use_fh == &source->chime_fh) {
                                                olen = source->samples;
-                                               switch_core_file_read(&fh, abuf, &olen);
+                                               switch_core_file_read(&fh, source->abuf, &olen);
                                                olen = source->samples;
                                        }
                                        
-                                       if (switch_core_file_read(use_fh, abuf, &olen) != SWITCH_STATUS_SUCCESS || !olen) {
+                                       if (switch_core_file_read(use_fh, source->abuf, &olen) != SWITCH_STATUS_SUCCESS || !olen) {
                                                switch_core_file_close(use_fh);
                                                flush_video_queue(source->video_q);
 
@@ -480,7 +497,7 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
                                                }
                                                
                                                if (source->total) {
-                                                       switch_buffer_write(audio_buffer, abuf, olen * 2 * source->channels);
+                                                       switch_buffer_write(audio_buffer, source->abuf, olen * 2 * source->channels);
                                                } else {
                                                        switch_buffer_zero(audio_buffer);
                                                }
@@ -502,6 +519,7 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
                                        //if (!is_open || used >= source->prebuf || (source->total && used > source->samples * 2 * source->channels)) {
                                        void *pop;
                                        uint32_t bused;
+                                       local_stream_context_t *cp = NULL;
                                        
                                        used = switch_buffer_read(audio_buffer, dist_buf, source->samples * 2 * source->channels);
 
@@ -509,7 +527,6 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
 
                                        switch_mutex_lock(source->mutex);
                                        for (cp = source->context_list; cp && RUNNING; cp = cp->next) {
-                                                       
                                                if (source->has_video) {
                                                        switch_set_flag(cp->handle, SWITCH_FILE_FLAG_VIDEO);
                                                } else {
@@ -621,6 +638,7 @@ static void *SWITCH_THREAD_FUNC read_stream_thread(switch_thread_t *thread, void
                                }
                        } else {
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "local_stream://%s fully reloaded.\n",source->name);
+                               switch_thread_rwlock_unlock(source->rwlock);
                                launch_streams(source->name);
                                goto done;
                        }
@@ -683,22 +701,18 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons
                return SWITCH_STATUS_FALSE;
        }
 
-       switch_mutex_lock(globals.mutex);
-
   top:
 
        alt_path = switch_mprintf("%s/%d", path, handle->samplerate);
 
-       if ((source = switch_core_hash_find(globals.source_hash, alt_path))) {
+       if ((source = get_source(alt_path))) {
                path = alt_path;
        } else {
-               source = switch_core_hash_find(globals.source_hash, path);
+               source = get_source(path);
        }
-       if (source) {
-               if (switch_thread_rwlock_tryrdlock(source->rwlock) != SWITCH_STATUS_SUCCESS) {
-                       source = NULL;
-               }
-       } else {
+
+
+       if (!source) {
                if (!switch_stristr("default", alt_path) && !switch_stristr("default", path)) {
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unknown source %s, trying 'default'\n", path);
                        free(alt_path);
@@ -706,7 +720,6 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons
                        goto top;
                }
        }
-       switch_mutex_unlock(globals.mutex);
 
        if (!source) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Unknown source %s\n", path);
@@ -715,8 +728,7 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons
        }
 
        if ((context = switch_core_alloc(handle->memory_pool, sizeof(*context))) == 0) {
-               status = SWITCH_STATUS_MEMERR;
-               goto end;
+               abort();
        }
 
        switch_queue_create(&context->video_q, 500, handle->memory_pool);
@@ -760,6 +772,7 @@ static switch_status_t local_stream_file_open(switch_file_handle_t *handle, cons
        switch_mutex_unlock(source->mutex);
 
   end:
+
        switch_safe_free(alt_path);
        return status;
 }
@@ -783,7 +796,7 @@ static switch_status_t local_stream_file_close(switch_file_handle_t *handle)
                last = cp;
        }
        
-       if (context->video_q) {
+       if (context->source->has_video) {
                flush_video_queue(context->video_q);
                switch_queue_trypush(context->video_q, NULL);
                switch_queue_interrupt_all(context->video_q);
@@ -1046,6 +1059,8 @@ static void launch_thread(const char *name, const char *path, switch_xml_t direc
        }
 
        source->samples = switch_samples_per_packet(source->rate, source->interval);
+       source->abuflen = (source->samples * 2 * source->channels) + 1024;
+       source->abuf = switch_core_alloc(source->pool, source->abuflen);
        switch_mutex_init(&source->mutex, SWITCH_MUTEX_NESTED, source->pool);
        switch_threadattr_create(&thd_attr, source->pool);
        switch_threadattr_detach_set(thd_attr, 1);
@@ -1108,55 +1123,37 @@ SWITCH_STANDARD_API(local_stream_function)
 
        local_stream_name = argv[1];
 
+
        if (!strcasecmp(argv[0], "hup") && local_stream_name) {
-               switch_mutex_lock(globals.mutex);
-               source = switch_core_hash_find(globals.source_hash, local_stream_name);
-               switch_mutex_unlock(globals.mutex);
-               
-               if (source) {
+               if ((source = get_source(local_stream_name))) {
                        source->hup = 1;
                        stream->write_function(stream, "+OK hup stream: %s", source->name);
-                       goto done;
+                       switch_thread_rwlock_unlock(source->rwlock);
                }
        } else if (!strcasecmp(argv[0], "stop") && local_stream_name) {
-               switch_mutex_lock(globals.mutex); 
-               source = switch_core_hash_find(globals.source_hash, local_stream_name);
-               switch_mutex_unlock(globals.mutex); 
-
-               if (!source) {
+               if ((source = get_source(local_stream_name))) {
+                       source->stopped = 1;
+                       stream->write_function(stream, "+OK");
+                       switch_thread_rwlock_unlock(source->rwlock);
+               } else {
                        stream->write_function(stream, "-ERR Cannot locate local_stream %s!\n", local_stream_name);
-                       goto done;
                }
-               
-               source->stopped = 1;
-               stream->write_function(stream, "+OK");
        } else if (!strcasecmp(argv[0], "reload") && local_stream_name) {
-               switch_mutex_lock(globals.mutex);
-               source = switch_core_hash_find(globals.source_hash, local_stream_name);
-               switch_mutex_unlock(globals.mutex);
-               
-               if (!source) {
+               if ((source = get_source(local_stream_name))) {
+                       source->full_reload = 1;
+                       source->part_reload = 1;
+                       stream->write_function(stream, "+OK");
+               } else {
                        stream->write_function(stream, "-ERR Cannot locate local_stream %s!\n", local_stream_name);
-                       goto done;
                }
-
-               source->full_reload = 1;
-               source->part_reload = 1;
-               stream->write_function(stream, "+OK");
        } else if (!strcasecmp(argv[0], "start") && local_stream_name) {
-               switch_mutex_lock(globals.mutex);
-               source = switch_core_hash_find(globals.source_hash, local_stream_name);
-               switch_mutex_unlock(globals.mutex);
-
-               if (source) {
+               if ((source = get_source(local_stream_name))) {
                        source->stopped = 0;
                        stream->write_function(stream, "+OK stream: %s", source->name);
-                       goto done;
-               }
-               
-               if ((ok = launch_streams(local_stream_name))) {
-                       stream->write_function(stream, "+OK stream: %s", local_stream_name);
-                       goto done;
+               } else {
+                       if ((ok = launch_streams(local_stream_name))) {
+                               stream->write_function(stream, "+OK stream: %s", local_stream_name);
+                       }
                }
                
        } else if (!strcasecmp(argv[0], "show")) {
@@ -1165,22 +1162,21 @@ SWITCH_STANDARD_API(local_stream_function)
                void *val;
                switch_bool_t xml = SWITCH_FALSE;
 
-               switch_mutex_lock(globals.mutex);
                if (argc == 1) {
+                       switch_mutex_lock(globals.mutex);
                        for (hi = switch_core_hash_first(globals.source_hash); hi; hi = switch_core_hash_next(&hi)) {
                                switch_core_hash_this(hi, &var, NULL, &val);
                                if ((source = (local_stream_source_t *) val)) {
                                        stream->write_function(stream, "%s,%s\n", source->name, source->location);
                                }
                        }
+                       switch_mutex_unlock(globals.mutex);
                } else {
                        if (argc == 4 && !strcasecmp("xml", argv[3])) {
                                xml = SWITCH_TRUE;
                        }
 
-                       source = switch_core_hash_find(globals.source_hash, local_stream_name);
-
-                       if (source) {
+                       if ((source = get_source(local_stream_name))) {
                                if (xml) {
                                        stream->write_function(stream, "<?xml version=\"1.0\"?>\n<local_stream name=\"%s\">\n", source->name);
                                        stream->write_function(stream, "  <location>%s</location>\n", source->location);
@@ -1210,13 +1206,11 @@ SWITCH_STANDARD_API(local_stream_function)
                                        stream->write_function(stream, "  stopped:  %s\n", (source->stopped) ? "true" : "false");
                                        stream->write_function(stream, "  reloading: %s\n", (source->full_reload) ? "true" : "false");
                                }
+                               switch_thread_rwlock_unlock(source->rwlock);
                        } else {
                                stream->write_function(stream, "-ERR Cannot locate local_stream %s!\n", local_stream_name);
                        }
                }
-               switch_mutex_unlock(globals.mutex);
-
-               goto done;
        }
        
        goto done;
index c6d92ded06c93b0c250334152e2c78b036aee7f5..b241fce0cb07f6e57b4ca2ab35c6e6822593e721 100644 (file)
@@ -292,7 +292,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_perform_file_open(const char *file,
                        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Spool dir is set.  Make sure [%s] is also a valid path\n", fh->spool_path);
                }
                UNPROTECT_INTERFACE(fh->file_interface);
-               switch_goto_status(status, fail);
+               goto fail;
        }
 
        fh->real_channels = fh->channels;
@@ -305,7 +305,7 @@ SWITCH_DECLARE(switch_status_t) switch_core_perform_file_open(const char *file,
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "File [%s] not created!\n", file_path);
                fh->file_interface->file_close(fh);
                UNPROTECT_INTERFACE(fh->file_interface);
-               switch_goto_status(status, fail);
+               goto fail;
        }
 
        if (to) {
index cdebd44558cbc2b61e16a39341da7a53920409cc..8ad48130663c2a975780ab4743f0f8cb15a99fc5 100644 (file)
@@ -333,6 +333,7 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj)
        time_t answer_limit = 0;
        const char *exec_app = NULL;
        const char *exec_data = NULL;
+       switch_codec_implementation_t read_impl = { 0 };
 
 #ifdef SWITCH_VIDEO_IN_THREADS
        struct vid_helper vh = { 0 };
@@ -345,6 +346,9 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj)
                return NULL;
        }
 
+       switch_core_session_get_read_impl(session_a, &read_impl);
+
+
        input_callback = data->input_callback;
        user_data = data->session_data;
        stream_id = data->stream_id;
@@ -405,8 +409,6 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj)
        }
 
        if ((silence_var = switch_channel_get_variable(chan_a, "bridge_generate_comfort_noise"))) {
-               switch_codec_implementation_t read_impl = { 0 };
-               switch_core_session_get_read_impl(session_a, &read_impl);
 
                if (!switch_channel_media_up(chan_a)) {
                        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session_a), SWITCH_LOG_ERROR, "Channel has no media!\n");
@@ -683,7 +685,7 @@ static void *audio_bridge_thread(switch_thread_t *thread, void *obj)
                        if (switch_test_flag(read_frame, SFF_CNG)) {
                                if (silence_val) {
                                        switch_generate_sln_silence((int16_t *) silence_frame.data, silence_frame.samples, 
-                                                                                               read_frame->codec->implementation->number_of_channels, silence_val);
+                                                                                               read_impl.number_of_channels, silence_val);
                                        read_frame = &silence_frame;
                                } else if (!switch_channel_test_flag(chan_b, CF_ACCEPT_CNG)) {
                                        continue;