SWITCH_STANDARD_API(http_cache_put);
SWITCH_STANDARD_API(http_cache_tryget);
SWITCH_STANDARD_API(http_cache_clear);
+SWITCH_STANDARD_API(http_cache_prefetch);
#define DOWNLOAD_NEEDED "download"
int misses;
/** Number of cache errors */
int errors;
+ /** The prefetch queue */
+ switch_queue_t *prefetch_queue;
+ /** Max size of prefetch queue */
+ int prefetch_queue_size;
+ /** Size of prefetch thread pool */
+ int prefetch_thread_count;
+ /** Shutdown flag */
+ int shutdown;
+ /** Synchronizes shutdown of cache */
+ switch_thread_rwlock_t *shutdown_lock;
};
static url_cache_t gcache;
}
}
+static int isUrl(const char *filename)
+{
+ return !zstr(filename) && !strncmp("http://", filename, strlen("http://"));
+}
+
+#define HTTP_PREFETCH_SYNTAX "<url>"
+SWITCH_STANDARD_API(http_cache_prefetch)
+{
+ switch_status_t status = SWITCH_STATUS_SUCCESS;
+ switch_memory_pool_t *lpool = NULL;
+ switch_memory_pool_t *pool = NULL;
+ char *url;
+
+ if (!isUrl(cmd)) {
+ stream->write_function(stream, "USAGE: %s\n", HTTP_PREFETCH_SYNTAX);
+ return SWITCH_STATUS_SUCCESS;
+ }
+
+ if (session) {
+ pool = switch_core_session_get_pool(session);
+ } else {
+ switch_core_new_memory_pool(&lpool);
+ pool = lpool;
+ }
+
+ /* send to thread pool */
+ url = strdup(cmd);
+ if (switch_queue_trypush(gcache.prefetch_queue, url) != SWITCH_STATUS_SUCCESS) {
+ switch_safe_free(url);
+ switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to queue prefetch request\n");
+ stream->write_function(stream, "-ERR\n");
+ } else {
+ stream->write_function(stream, "+OK\n");
+ }
+
+ if (lpool) {
+ switch_core_destroy_memory_pool(&lpool);
+ }
+
+ return status;
+}
+
#define HTTP_GET_SYNTAX "<url>"
/**
* Get a file from the cache, download if it isn't cached
switch_memory_pool_t *pool = NULL;
char *filename;
- if (zstr(cmd) || strncmp("http://", cmd, strlen("http://"))) {
+ if (!isUrl(cmd)) {
stream->write_function(stream, "USAGE: %s\n", HTTP_GET_SYNTAX);
return SWITCH_STATUS_SUCCESS;
}
return SWITCH_STATUS_SUCCESS;
}
+
+/**
+ * Thread to prefetch URLs
+ * @param thread the thread
+ * @param obj started flag
+ * @return NULL
+ */
+static void *SWITCH_THREAD_FUNC prefetch_thread(switch_thread_t *thread, void *obj)
+{
+ int *started = obj;
+ void *url = NULL;
+
+ switch_thread_rwlock_rdlock(gcache.shutdown_lock);
+ *started = 1;
+
+ // process prefetch requests
+ while (!gcache.shutdown) {
+ if (switch_queue_pop(gcache.prefetch_queue, &url) == SWITCH_STATUS_SUCCESS) {
+ switch_stream_handle_t stream = { 0 };
+ SWITCH_STANDARD_STREAM(stream);
+ switch_api_execute("http_get", url, NULL, &stream);
+ switch_safe_free(stream.data);
+ switch_safe_free(url);
+ }
+ url = NULL;
+ }
+
+ // shutting down- clear the queue
+ while (switch_queue_trypop(gcache.prefetch_queue, &url) == SWITCH_STATUS_SUCCESS) {
+ switch_safe_free(url);
+ url = NULL;
+ }
+
+ switch_thread_rwlock_unlock(gcache.shutdown_lock);
+
+ return NULL;
+}
+
/**
* Configure the module
* @param cache to configure
max_urls = 4000;
default_max_age_sec = 86400;
cache->location = SWITCH_PREFIX_DIR "/http_cache";
+ cache->prefetch_queue_size = 100;
+ cache->prefetch_thread_count = 8;
/* get params */
settings = switch_xml_child(cfg, "settings");
} else if (!strcasecmp(var, "default-max-age")) {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting default-max-age to %s\n", val);
default_max_age_sec = atoi(val);
+ } else if (!strcasecmp(var, "prefetch-queue-size")) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting prefetch-queue-size to %s\n", val);
+ cache->prefetch_queue_size = atoi(val);
+ } else if (!strcasecmp(var, "prefetch-thread-count")) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting prefetch-thread-count to %s\n", val);
+ cache->prefetch_thread_count = atoi(val);
} else {
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unsupported param: %s\n", var);
}
status = SWITCH_STATUS_TERM;
goto done;
}
+ if (cache->prefetch_queue_size <= 0) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "prefetch-queue-size must be > 0\n");
+ status = SWITCH_STATUS_TERM;
+ goto done;
+ }
+ if (cache->prefetch_thread_count <= 0) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "prefetch-thread-count must be > 0\n");
+ status = SWITCH_STATUS_TERM;
+ goto done;
+ }
cache->max_url = max_urls;
cache->default_max_age = (default_max_age_sec * 1000 * 1000); /* convert from seconds to nanoseconds */
SWITCH_ADD_API(api, "http_tryget", "HTTP GET from cache only", http_cache_tryget, HTTP_GET_SYNTAX);
SWITCH_ADD_API(api, "http_put", "HTTP PUT", http_cache_put, HTTP_PUT_SYNTAX);
SWITCH_ADD_API(api, "http_clear_cache", "Clear the cache", http_cache_clear, HTTP_CACHE_CLEAR_SYNTAX);
+ SWITCH_ADD_API(api, "http_prefetch", "Prefetch document in a background thread. Use http_get to get the prefetched document", http_cache_prefetch, HTTP_PREFETCH_SYNTAX);
memset(&gcache, 0, sizeof(url_cache_t));
gcache.pool = pool;
switch_core_hash_init(&gcache.map, gcache.pool);
switch_mutex_init(&gcache.mutex, SWITCH_MUTEX_UNNESTED, gcache.pool);
+ switch_thread_rwlock_create(&gcache.shutdown_lock, gcache.pool);
/* create the queue */
gcache.queue.max_size = gcache.max_url;
setup_dir(&gcache);
+ /* Start the prefetch threads */
+ switch_queue_create(&gcache.prefetch_queue, gcache.prefetch_queue_size, gcache.pool);
+ for (int i = 0; i < gcache.prefetch_thread_count; i++) {
+ int started = 0;
+ switch_thread_t *thread;
+ switch_threadattr_t *thd_attr = NULL;
+ switch_threadattr_create(&thd_attr, gcache.pool);
+ switch_threadattr_detach_set(thd_attr, 1);
+ switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+ switch_thread_create(&thread, thd_attr, prefetch_thread, &started, gcache.pool);
+ while (!started) {
+ switch_sleep(1000);
+ }
+ }
+
/* indicate that the module should continue to be loaded */
return SWITCH_STATUS_SUCCESS;
}
*/
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_http_cache_shutdown)
{
+ gcache.shutdown = 1;
+ switch_queue_interrupt_all(gcache.prefetch_queue);
+ switch_thread_rwlock_wrlock(gcache.shutdown_lock);
+
url_cache_clear(&gcache, NULL);
switch_core_hash_destroy(&gcache.map);
switch_mutex_destroy(gcache.mutex);