int dlen;
FILE *fp;
size_t samplerate;
- uint8_t thread_running;
uint8_t shout_init;
uint32_t prebuf;
int lame_ready;
switch_size_t mp3buflen;
switch_thread_rwlock_t *rwlock;
int buffer_seconds;
+ switch_thread_t *read_stream_thread;
+ switch_thread_t *write_stream_thread;
+ curl_socket_t curlfd;
};
typedef struct shout_context shout_context_t;
static inline void free_context(shout_context_t *context)
{
size_t ret;
+ switch_status_t st;
if (context) {
switch_mutex_lock(context->audio_mutex);
switch_mutex_unlock(context->audio_mutex);
if (context->stream_url) {
- int sanity = 0;
-
- while (context->thread_running) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for stream to terminate: %s\n", context->stream_url);
- switch_yield(500000);
- if (++sanity > 10) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "Giving up waiting for stream to terminate: %s\n", context->stream_url);
- break;
- }
+ if (context->curlfd > -1) {
+ shutdown(context->curlfd, 2);
+ }
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Waiting for stream to terminate: %s\n", context->stream_url);
+ if (context->read_stream_thread) {
+ switch_thread_join(&st, context->read_stream_thread);
}
}
+ if (context->write_stream_thread) {
+ switch_thread_join(&st, context->write_stream_thread);
+ }
+
switch_thread_rwlock_wrlock(context->rwlock);
if (context->mh) {
uint32_t buf_size = 1024 * 128; /* do not make this 64 or less, stutter will ensue after first 64k buffer is dry */
switch_size_t used;
+ if (context->err) {
+ goto error;
+ }
+
if (!context->stream_channels) {
long rate = 0;
int channels = 0;
switch_yield(500000);
}
+ if (context->err) {
+ goto error;
+ }
+
if (mpg123_feed(context->mh, ptr, realsize) != MPG123_OK) {
goto error;
}
return 0;
}
+static int progress_callback(void *clientp, double dltotal, double dlnow, double ultotal, double ulnow)
+{
+ shout_context_t *context = (shout_context_t *) clientp;
+ return context->err;
+}
+
+
+static int sockopt_callback(void *clientp, curl_socket_t curlfd,
+ curlsocktype purpose)
+{
+ shout_context_t *context = (shout_context_t *) clientp;
+
+ context->curlfd = curlfd;
+
+ return CURL_SOCKOPT_OK;
+}
#define MY_BUF_LEN 1024 * 32
#define MY_BLOCK_SIZE MY_BUF_LEN
shout_context_t *context = (shout_context_t *) obj;
switch_thread_rwlock_rdlock(context->rwlock);
-
+ context->curlfd = -1;
curl_handle = switch_curl_easy_init();
switch_curl_easy_setopt(curl_handle, CURLOPT_URL, context->stream_url);
+ curl_easy_setopt(curl_handle, CURLOPT_PROGRESSFUNCTION, progress_callback);
+ curl_easy_setopt(curl_handle, CURLOPT_PROGRESSDATA, (void *)context);
switch_curl_easy_setopt(curl_handle, CURLOPT_FOLLOWLOCATION, 1);
switch_curl_easy_setopt(curl_handle, CURLOPT_MAXREDIRS, 10);
switch_curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, stream_callback);
switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_LIMIT, 100); /* handle trickle connections */
switch_curl_easy_setopt(curl_handle, CURLOPT_LOW_SPEED_TIME, 30);
switch_curl_easy_setopt(curl_handle, CURLOPT_ERRORBUFFER, context->curl_error_buff);
+ curl_easy_setopt(curl_handle, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
+ curl_easy_setopt(curl_handle, CURLOPT_SOCKOPTDATA, (void *)context);
+
cc = switch_curl_easy_perform(curl_handle);
+
if (cc && cc != CURLE_WRITE_ERROR) { /* write error is ok, we just exited from callback early */
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "CURL returned error:[%d] %s : %s [%s]\n", cc, switch_curl_easy_strerror(cc),
context->curl_error_buff, context->stream_url);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Read Thread Done\n");
context->eof++;
- context->thread_running = 0;
switch_thread_rwlock_unlock(context->rwlock);
return NULL;
}
static void launch_read_stream_thread(shout_context_t *context)
{
- switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
- context->thread_running = 1;
switch_threadattr_create(&thd_attr, context->memory_pool);
- switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
- switch_thread_create(&thread, thd_attr, read_stream_thread, context, context->memory_pool);
+ switch_thread_create(&context->read_stream_thread, thd_attr, read_stream_thread, context, context->memory_pool);
}
#define error_check() if (context->err) goto error;
switch_thread_rwlock_rdlock(context->rwlock);
- if (context->thread_running) {
- context->thread_running++;
- } else {
- switch_thread_rwlock_unlock(context->rwlock);
- return NULL;
- }
-
if (!context->lame_ready) {
lame_init_params(context->gfp);
lame_print_config(context->gfp);
context->lame_ready = 1;
}
- while (!context->err && context->thread_running) {
+ while (!context->err) {
unsigned char mp3buf[20480] = "";
int16_t audio[9600] = { 0 };
switch_size_t audio_read = 0;
error:
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Write Thread Done\n");
switch_thread_rwlock_unlock(context->rwlock);
- context->thread_running = 0;
+
return NULL;
}
static void launch_write_stream_thread(shout_context_t *context)
{
- switch_thread_t *thread;
switch_threadattr_t *thd_attr = NULL;
- int sanity = 10;
if (context->err) {
return;
}
- context->thread_running = 1;
switch_threadattr_create(&thd_attr, context->memory_pool);
- switch_threadattr_detach_set(thd_attr, 1);
switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
- switch_thread_create(&thread, thd_attr, write_stream_thread, context, context->memory_pool);
-
- while (context->thread_running && context->thread_running != 2) {
- switch_yield(100000);
- if (!--sanity)
- break;
- }
+ switch_thread_create(&context->write_stream_thread, thd_attr, write_stream_thread, context, context->memory_pool);
}
#define TC_BUFFER_SIZE 1024 * 32