]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
add prefetch API to mod_http_cache
authorChristopher Rienzo <chris@rienzo.net>
Wed, 25 Jan 2012 15:07:59 +0000 (15:07 +0000)
committerChristopher Rienzo <chris@rienzo.net>
Wed, 25 Jan 2012 15:07:59 +0000 (15:07 +0000)
src/mod/applications/mod_http_cache/mod_http_cache.c

index b2004b54568a90b65375a7dc56775408ad11ad9b..29cce6a4a2e4e7c428adee531747b9c79a696700 100644 (file)
@@ -40,6 +40,7 @@ SWITCH_STANDARD_API(http_cache_get);
 SWITCH_STANDARD_API(http_cache_put);
 SWITCH_STANDARD_API(http_cache_tryget);
 SWITCH_STANDARD_API(http_cache_clear);
+SWITCH_STANDARD_API(http_cache_prefetch);
 
 #define DOWNLOAD_NEEDED "download"
 
@@ -148,6 +149,16 @@ struct url_cache {
        int misses;
        /** Number of cache errors */
        int errors;
+       /** The prefetch queue */
+       switch_queue_t *prefetch_queue;
+       /** Max size of prefetch queue */
+       int prefetch_queue_size;
+       /** Size of prefetch thread pool */
+       int prefetch_thread_count;
+       /** Shutdown flag */
+       int shutdown;
+       /** Synchronizes shutdown of cache */
+       switch_thread_rwlock_t *shutdown_lock;
 };
 static url_cache_t gcache;
 
@@ -778,6 +789,48 @@ static void setup_dir(url_cache_t *cache)
        }
 }
 
+static int isUrl(const char *filename)
+{
+       return !zstr(filename) && !strncmp("http://", filename, strlen("http://"));
+}
+
+#define HTTP_PREFETCH_SYNTAX "<url>"
+SWITCH_STANDARD_API(http_cache_prefetch)
+{
+       switch_status_t status = SWITCH_STATUS_SUCCESS;
+       switch_memory_pool_t *lpool = NULL;
+       switch_memory_pool_t *pool = NULL;
+       char *url;
+
+       if (!isUrl(cmd)) {
+               stream->write_function(stream, "USAGE: %s\n", HTTP_PREFETCH_SYNTAX);
+               return SWITCH_STATUS_SUCCESS;
+       }
+
+       if (session) {
+               pool = switch_core_session_get_pool(session);
+       } else {
+               switch_core_new_memory_pool(&lpool);
+               pool = lpool;
+       }
+       
+       /* send to thread pool */
+       url = strdup(cmd);
+       if (switch_queue_trypush(gcache.prefetch_queue, url) != SWITCH_STATUS_SUCCESS) {
+               switch_safe_free(url);
+               switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "Failed to queue prefetch request\n");
+               stream->write_function(stream, "-ERR\n");
+       } else {
+               stream->write_function(stream, "+OK\n");
+       }
+
+       if (lpool) {
+               switch_core_destroy_memory_pool(&lpool);
+       }
+
+       return status;
+}
+
 #define HTTP_GET_SYNTAX "<url>"
 /**
  * Get a file from the cache, download if it isn't cached
@@ -789,7 +842,7 @@ SWITCH_STANDARD_API(http_cache_get)
        switch_memory_pool_t *pool = NULL;
        char *filename;
 
-       if (zstr(cmd) || strncmp("http://", cmd, strlen("http://"))) {
+       if (!isUrl(cmd)) {
                stream->write_function(stream, "USAGE: %s\n", HTTP_GET_SYNTAX);
                return SWITCH_STATUS_SUCCESS;
        }
@@ -911,6 +964,44 @@ SWITCH_STANDARD_API(http_cache_clear)
        return SWITCH_STATUS_SUCCESS;
 }
 
+
+/**
+ * Thread to prefetch URLs
+ * @param thread the thread
+ * @param obj started flag
+ * @return NULL
+ */
+static void *SWITCH_THREAD_FUNC prefetch_thread(switch_thread_t *thread, void *obj)
+{
+       int *started = obj;
+       void *url = NULL;
+
+       switch_thread_rwlock_rdlock(gcache.shutdown_lock);
+       *started = 1;
+
+       // process prefetch requests
+       while (!gcache.shutdown) {
+               if (switch_queue_pop(gcache.prefetch_queue, &url) == SWITCH_STATUS_SUCCESS) {
+                       switch_stream_handle_t stream = { 0 };
+                       SWITCH_STANDARD_STREAM(stream);
+                       switch_api_execute("http_get", url, NULL, &stream);
+                       switch_safe_free(stream.data);
+                       switch_safe_free(url);
+               }
+               url = NULL;
+       }
+
+       // shutting down- clear the queue
+       while (switch_queue_trypop(gcache.prefetch_queue, &url) == SWITCH_STATUS_SUCCESS) {
+               switch_safe_free(url);
+               url = NULL;
+       }
+
+       switch_thread_rwlock_unlock(gcache.shutdown_lock);
+
+       return NULL;
+}
+
 /**
  * Configure the module
  * @param cache to configure
@@ -933,6 +1024,8 @@ static switch_status_t do_config(url_cache_t *cache)
        max_urls = 4000;
        default_max_age_sec = 86400;
        cache->location = SWITCH_PREFIX_DIR "/http_cache";
+       cache->prefetch_queue_size = 100;
+       cache->prefetch_thread_count = 8;
 
        /* get params */
        settings = switch_xml_child(cfg, "settings");
@@ -949,6 +1042,12 @@ static switch_status_t do_config(url_cache_t *cache)
                        } else if (!strcasecmp(var, "default-max-age")) {
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting default-max-age to %s\n", val);
                                default_max_age_sec = atoi(val);
+                       } else if (!strcasecmp(var, "prefetch-queue-size")) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting prefetch-queue-size to %s\n", val);
+                               cache->prefetch_queue_size = atoi(val);
+                       } else if (!strcasecmp(var, "prefetch-thread-count")) {
+                               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO, "Setting prefetch-thread-count to %s\n", val);
+                               cache->prefetch_thread_count = atoi(val);
                        } else {
                                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Unsupported param: %s\n", var);
                        }
@@ -971,6 +1070,16 @@ static switch_status_t do_config(url_cache_t *cache)
                status = SWITCH_STATUS_TERM;
                goto done;
        }
+       if (cache->prefetch_queue_size <= 0) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "prefetch-queue-size must be > 0\n");
+               status = SWITCH_STATUS_TERM;
+               goto done;
+       }
+       if (cache->prefetch_thread_count <= 0) {
+               switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "prefetch-thread-count must be > 0\n");
+               status = SWITCH_STATUS_TERM;
+               goto done;
+       }
 
        cache->max_url = max_urls;
        cache->default_max_age = (default_max_age_sec * 1000 * 1000); /* convert from seconds to nanoseconds */
@@ -992,6 +1101,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load)
        SWITCH_ADD_API(api, "http_tryget", "HTTP GET from cache only", http_cache_tryget, HTTP_GET_SYNTAX);
        SWITCH_ADD_API(api, "http_put", "HTTP PUT", http_cache_put, HTTP_PUT_SYNTAX);
        SWITCH_ADD_API(api, "http_clear_cache", "Clear the cache", http_cache_clear, HTTP_CACHE_CLEAR_SYNTAX);
+       SWITCH_ADD_API(api, "http_prefetch", "Prefetch document in a background thread.  Use http_get to get the prefetched document", http_cache_prefetch, HTTP_PREFETCH_SYNTAX);
        
        memset(&gcache, 0, sizeof(url_cache_t));
        gcache.pool = pool;
@@ -1002,6 +1112,7 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load)
 
        switch_core_hash_init(&gcache.map, gcache.pool);
        switch_mutex_init(&gcache.mutex, SWITCH_MUTEX_UNNESTED, gcache.pool);
+       switch_thread_rwlock_create(&gcache.shutdown_lock, gcache.pool);
 
        /* create the queue */
        gcache.queue.max_size = gcache.max_url;
@@ -1011,6 +1122,21 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load)
 
        setup_dir(&gcache);
 
+       /* Start the prefetch threads */
+       switch_queue_create(&gcache.prefetch_queue, gcache.prefetch_queue_size, gcache.pool);
+       for (int i = 0; i < gcache.prefetch_thread_count; i++) {
+               int started = 0;
+               switch_thread_t *thread;
+               switch_threadattr_t *thd_attr = NULL;
+               switch_threadattr_create(&thd_attr, gcache.pool);
+               switch_threadattr_detach_set(thd_attr, 1);
+               switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE);
+               switch_thread_create(&thread, thd_attr, prefetch_thread, &started, gcache.pool);
+               while (!started) {
+                       switch_sleep(1000);
+               }
+       }
+
        /* indicate that the module should continue to be loaded */
        return SWITCH_STATUS_SUCCESS;
 }
@@ -1020,6 +1146,10 @@ SWITCH_MODULE_LOAD_FUNCTION(mod_http_cache_load)
  */
 SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_http_cache_shutdown)
 {
+       gcache.shutdown = 1;
+       switch_queue_interrupt_all(gcache.prefetch_queue);
+       switch_thread_rwlock_wrlock(gcache.shutdown_lock);
+
        url_cache_clear(&gcache, NULL);
        switch_core_hash_destroy(&gcache.map);
        switch_mutex_destroy(gcache.mutex);