]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9952: Initial rpc application implementation
authorcolm <colm@freeswitch1>
Wed, 8 Feb 2017 01:18:45 +0000 (20:18 -0500)
committerMike Jerris <mike@jerris.com>
Wed, 22 Mar 2017 21:42:49 +0000 (17:42 -0400)
libs/libblade/Makefile.am
libs/libblade/src/blade_rpcproto.c [new file with mode: 0644]
libs/libblade/src/include/blade_rpcproto.h [new file with mode: 0644]

index ee5e5677d01cdd941d529159f19da196b3951be1..12ef174b65b2f57e02191cf12cccf5e074f7e595 100644 (file)
@@ -12,13 +12,13 @@ libunqlite_la_LIBADD    = -lpthread
 
 lib_LTLIBRARIES                = libblade.la
 libblade_la_SOURCES     = src/blade.c src/blade_stack.c src/blade_peer.c src/blade_service.c src/bpcp.c src/blade_datastore.c
-libblade_la_SOURCES    += src/blade_message.c
+libblade_la_SOURCES    += src/blade_message.c src/blade_rpcproto.c
 libblade_la_CFLAGS     = $(AM_CFLAGS) $(AM_CPPFLAGS)
 libblade_la_LDFLAGS     = -version-info 0:1:0 -lncurses -lpthread -lm -lconfig $(AM_LDFLAGS)
 libblade_la_LIBADD      = libunqlite.la
 library_includedir     = $(prefix)/include
 library_include_HEADERS = src/include/blade.h src/include/blade_types.h src/include/blade_stack.h src/include/blade_peer.h src/include/blade_service.h
-library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_message.h
+library_include_HEADERS += src/include/bpcp.h src/include/blade_datastore.h src/include/blade_message.h src/include/blade_rpcproto.h
 library_include_HEADERS += src/include/unqlite.h test/tap.h
 
 tests: libblade.la
diff --git a/libs/libblade/src/blade_rpcproto.c b/libs/libblade/src/blade_rpcproto.c
new file mode 100644 (file)
index 0000000..823cf1f
--- /dev/null
@@ -0,0 +1,897 @@
+/*
+ * Copyright (c) 2017 FreeSWITCH Solutions LLC
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#pragma GCC optimize ("O0")
+
+#include <blade_rpcproto.h>
+#include <blade_message.h>
+
+/* 
+ * internal shared structure grounded in global 
+ */
+typedef struct blade_rpc_handle_ex {
+
+    ks_hash_t  *namespace_hash;      /* hash to namespace methods */
+       ks_hash_t  *template_hash;               /* hash to template methods */
+
+    ks_hash_t  *peer_hash;           /* hash to peer structure */
+       
+       ks_q_t     *event_queue;
+       ks_bool_t   isactive;
+       ks_pool_t  *pool; 
+
+} blade_rpc_handle_ex_t;
+
+
+typedef struct blade_rpc_namespace_s {
+
+       char       name[KS_RPCMESSAGE_NAMESPACE_LENGTH+1];
+       char       version[KS_RPCMESSAGE_VERSION_LENGTH+1];   /* nnn.nn.nn */
+    ks_hash_t *method_hash;      /* hash to namespace methods */
+
+} blade_rpc_namespace_t;
+
+
+
+
+blade_rpc_handle_ex_t *g_handle = NULL;
+
+
+/*
+ * callbacks - from blade_rpc_handle_ex->method_hash
+ */
+
+typedef struct blade_rpc_custom_callbackpair_s
+{
+    jrpc_prefix_func_t       prefix_request_func;
+    jrpc_postfix_func_t      postfix_request_func;
+
+    jrpc_prefix_resp_func_t  prefix_response_func;
+    jrpc_postfix_resp_func_t postfix_response_func;
+
+} blade_rpc_custom_callbackpair_t;
+
+
+
+typedef struct blade_rpc_callbackpair_s
+{
+    jrpc_func_t              request_func;
+
+    jrpc_resp_func_t         response_func;
+
+       blade_rpc_custom_callbackpair_t* custom;
+
+       ks_mutex_t               *lock; 
+
+    uint16_t         command_length;
+    char             command[1];
+
+} blade_rpc_callbackpair_t;
+
+
+
+
+
+static void blade_rpc_make_fqcommand(const char* namespace, const char *command, char *fscommand)
+{
+    memset(fscommand, 0, KS_RPCMESSAGE_FQCOMMAND_LENGTH);
+       sprintf(fscommand, "%s.%s", namespace, command);
+
+       return;
+}
+
+static void blade_rpc_parse_fqcommand(const char* fscommand, char *namespace, char *command)
+{
+    memset(command, 0, KS_RPCMESSAGE_COMMAND_LENGTH);
+    memset(namespace, 0, KS_RPCMESSAGE_NAMESPACE_LENGTH);
+
+       uint32_t len = strlen(fscommand);
+
+       assert(len <=  KS_RPCMESSAGE_FQCOMMAND_LENGTH); 
+       ks_bool_t dotfound = KS_FALSE;
+
+       for(int i=0, x=0; i<len; ++i, ++x)  {
+
+               if (fscommand[i] == '.' && dotfound == KS_FALSE) {
+                       dotfound = KS_TRUE;
+                       x = 0;
+               }
+               else if (dotfound == KS_FALSE) {
+                       namespace[x] = fscommand[i];    
+               }
+               else {
+                       command[x] = fscommand[i];
+               }
+
+       }
+
+       return;
+}
+
+
+KS_DECLARE(ks_status_t) blade_rpc_init(ks_pool_t *pool)
+{
+
+       if (g_handle == NULL) {
+               g_handle = ks_pool_alloc(pool, sizeof(blade_rpc_handle_ex_t));
+           ks_hash_create(&g_handle->namespace_hash,
+                    KS_HASH_MODE_CASE_SENSITIVE,
+                    KS_HASH_FLAG_RWLOCK + KS_HASH_FLAG_DUP_CHECK,       // + KS_HASH_FLAG_FREE_VALUE,
+                    pool);
+
+        ks_hash_create(&g_handle->template_hash,
+                    KS_HASH_MODE_CASE_SENSITIVE,
+                    KS_HASH_FLAG_RWLOCK + KS_HASH_FLAG_DUP_CHECK,       // + KS_HASH_FLAG_FREE_VALUE,
+                    pool);
+                                       
+               ks_q_create(&g_handle->event_queue, pool, 1024);        
+
+               g_handle->pool = pool;
+
+               /* initialize rpc messaging mechanism */
+               ks_rpcmessage_init(pool);
+
+               g_handle->isactive = KS_TRUE;
+       }       
+       return KS_STATUS_SUCCESS;
+}
+
+
+KS_DECLARE(ks_status_t) blade_rpc_onconnect(ks_pool_t *pool, blade_peer_t* peer)
+{
+
+
+       return KS_STATUS_FAIL;
+}
+
+KS_DECLARE(ks_status_t) blade_rpc_disconnect(blade_peer_t* peer)
+{
+
+       return KS_STATUS_FAIL;
+}
+
+
+
+/*
+ * namespace setup
+ */
+
+/*
+ *  function/callback functions
+ */
+static blade_rpc_callbackpair_t *blade_rpc_find_callbacks_locked(char *namespace, char *command)
+{
+       blade_rpc_callbackpair_t *callbacks = NULL;
+
+    blade_rpc_namespace_t *n =  ks_hash_search(g_handle->namespace_hash, namespace, KS_UNLOCKED);
+       if (n) {
+               char fqcommand[KS_RPCMESSAGE_FQCOMMAND_LENGTH+1];
+               blade_rpc_make_fqcommand(namespace, command, fqcommand);
+
+               ks_hash_read_lock(n->method_hash);
+
+        callbacks = ks_hash_search(n->method_hash, fqcommand, KS_UNLOCKED);
+               ks_mutex_lock(callbacks->lock);
+
+               ks_hash_read_lock(n->method_hash);
+       }
+
+    return callbacks;
+}
+
+static blade_rpc_callbackpair_t *blade_rpc_find_template_locked(char *name, char *command)
+{
+    blade_rpc_callbackpair_t *callbacks = NULL;
+
+    blade_rpc_namespace_t *n =  ks_hash_search(g_handle->template_hash, name, KS_UNLOCKED);
+    if (n) {
+
+        ks_hash_read_lock(n->method_hash);
+        callbacks = ks_hash_search(n->method_hash, command, KS_UNLOCKED);
+        ks_mutex_lock(callbacks->lock);
+
+        ks_hash_read_lock(n->method_hash);
+    }
+
+    return callbacks;
+}
+
+
+
+
+static blade_rpc_callbackpair_t *blade_rpc_find_callbacks_locked_fq(char *fqcommand)
+{
+    blade_rpc_callbackpair_t *callbacks = NULL;
+
+       char command[KS_RPCMESSAGE_COMMAND_LENGTH+1];
+    char namespace[KS_RPCMESSAGE_NAMESPACE_LENGTH+1];
+
+       blade_rpc_parse_fqcommand(fqcommand, namespace, command);
+
+    blade_rpc_namespace_t *n =  ks_hash_search(g_handle->namespace_hash, namespace, KS_UNLOCKED);
+    if (n) {
+        blade_rpc_make_fqcommand(namespace, command, fqcommand);
+               ks_hash_read_lock(n->method_hash);
+
+        callbacks = ks_hash_search(n->method_hash, fqcommand, KS_UNLOCKED);
+               ks_mutex_lock(callbacks->lock);
+
+               ks_hash_read_unlock(n->method_hash);
+    }
+
+    return callbacks;
+}
+
+
+KS_DECLARE(jrpc_func_t) blade_rpc_find_request_function(char *fqcommand)
+{
+
+    blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked_fq(fqcommand);
+
+    if (!callbacks) {
+        return NULL;
+    }
+
+    jrpc_func_t f = callbacks->request_func;
+
+    ks_mutex_unlock(callbacks->lock);
+
+    return f;
+}
+
+KS_DECLARE(jrpc_resp_func_t) blade_rpc_find_requestprefix_function(char *fqcommand)
+{
+
+    blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked_fq(fqcommand);
+
+    if (!callbacks || !callbacks->custom) {
+        return NULL;
+    }
+
+    jrpc_resp_func_t f = callbacks->custom->prefix_request_func;
+
+    ks_mutex_unlock(callbacks->lock);
+
+    return f;
+}
+
+KS_DECLARE(jrpc_resp_func_t) blade_rpc_find_response_function(char *fqcommand)
+{
+
+    blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked_fq(fqcommand);
+
+       if (!callbacks) {
+               return NULL;
+       }
+               
+    jrpc_resp_func_t f = callbacks->response_func;
+
+       ks_mutex_unlock(callbacks->lock);
+
+       return f;
+}
+
+KS_DECLARE(jrpc_resp_func_t) blade_rpc_find_responseprefix_function(char *fqcommand)
+{
+
+    blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked_fq(fqcommand);
+
+    if (!callbacks || !callbacks->custom) {
+        return NULL;
+    }
+
+    jrpc_resp_func_t f = callbacks->custom->prefix_response_func;
+
+    ks_mutex_unlock(callbacks->lock);
+
+    return f;
+}
+
+
+KS_DECLARE(ks_status_t) blade_rpc_declare_namespace(char* namespace, const char* version)
+{
+
+       /* find/insert to namespace hash as needed */
+       ks_hash_write_lock(g_handle->namespace_hash);
+       blade_rpc_namespace_t *n =  ks_hash_search(g_handle->namespace_hash, namespace, KS_UNLOCKED);
+       if (n == NULL) {
+               n = ks_pool_alloc(g_handle->pool, sizeof (blade_rpc_namespace_t) + strlen(namespace) + 1);
+               strncpy(n->name, namespace, KS_RPCMESSAGE_NAMESPACE_LENGTH);
+               strncpy(n->version, version, KS_RPCMESSAGE_VERSION_LENGTH);
+               ks_hash_insert(g_handle->namespace_hash, n->name, n);
+       }
+       ks_hash_write_unlock(g_handle->namespace_hash);
+
+    ks_log(KS_LOG_DEBUG, "Setting message namespace value %s, version %s", namespace, version);
+
+    return KS_STATUS_SUCCESS;
+}
+
+
+KS_DECLARE(ks_status_t)blade_rpc_register_namespace_function(char *namespace, 
+                                                char *command,
+                                                jrpc_func_t func,
+                                                jrpc_resp_func_t respfunc)
+{
+    if (!func && !respfunc) {
+        return KS_STATUS_FAIL;
+    }
+
+       char nskey[KS_RPCMESSAGE_NAMESPACE_LENGTH+1];
+       memset(nskey, 0, sizeof(nskey));
+       strcpy(nskey, namespace);
+
+    char fqcommand[KS_RPCMESSAGE_FQCOMMAND_LENGTH];
+    memset(fqcommand, 0, sizeof(fqcommand));
+
+    strcpy(fqcommand, namespace);
+       strcpy(fqcommand, ".");
+    strcat(fqcommand, command);
+
+    int lkey = strlen(fqcommand)+1;
+
+    if (lkey < 16) {
+        lkey = 16;
+    }
+
+    ks_hash_read_lock(g_handle->namespace_hash);    /* lock namespace hash */
+
+    blade_rpc_namespace_t *n =  ks_hash_search(g_handle->namespace_hash, nskey, KS_UNLOCKED);
+
+    if (n == NULL) {
+        ks_hash_read_unlock(g_handle->namespace_hash);
+        ks_log(KS_LOG_ERROR, "Unable to find %s namespace\n", namespace);
+        return KS_STATUS_FAIL;
+    }
+
+       blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked(nskey, command);
+
+       /* just ignore attempt to re register callbacks */
+       /* @todo :  should this be smarter, allow override ? */
+       if (callbacks != NULL) {
+               ks_mutex_unlock(callbacks->lock);
+        ks_hash_read_unlock(g_handle->namespace_hash);
+        ks_log(KS_LOG_ERROR, "Callbacks already registered for %s namespace\n", namespace);
+        return KS_STATUS_FAIL;
+       }
+
+    callbacks =
+            (blade_rpc_callbackpair_t*)ks_pool_alloc(g_handle->pool, lkey + sizeof(blade_rpc_callbackpair_t));
+
+    strcpy(callbacks->command, command);
+    callbacks->command_length = lkey;
+    callbacks->request_func   = func;
+    callbacks->response_func  = respfunc;
+       ks_mutex_create(&callbacks->lock, KS_MUTEX_FLAG_DEFAULT, g_handle->pool);
+
+    ks_hash_write_lock(n->method_hash);             /* lock method hash */
+
+    ks_hash_insert(n->method_hash, callbacks->command, (void *) callbacks);
+
+    ks_hash_write_unlock(n->method_hash);           /* unlock method hash */
+       ks_hash_read_unlock(g_handle->namespace_hash);  /* unlock namespace hash */
+
+    ks_log(KS_LOG_DEBUG, "Message %s %s registered\n", namespace, command);
+
+    return KS_STATUS_SUCCESS;
+
+}
+
+
+KS_DECLARE(ks_status_t)blade_rpc_register_prefix_request_function(char *namespace, 
+                                                                                                       char *command,
+                                                                                                       jrpc_prefix_func_t prefix_func,
+                                                                                                       jrpc_postfix_func_t postfix_func)
+{
+       ks_status_t s = KS_STATUS_FAIL;
+
+    ks_hash_write_lock(g_handle->namespace_hash);
+       blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked(namespace, command);
+
+    if (callbacks) {
+
+               if (!callbacks->custom) {
+                       callbacks->custom = 
+                               (blade_rpc_custom_callbackpair_t *)ks_pool_alloc(g_handle->pool, sizeof(blade_rpc_custom_callbackpair_t));
+               }
+
+               callbacks->custom->prefix_request_func  = prefix_func;
+               callbacks->custom->postfix_request_func = postfix_func;
+               ks_mutex_unlock(callbacks->lock);
+               s = KS_STATUS_SUCCESS;
+       }
+
+       ks_hash_write_unlock(g_handle->namespace_hash);
+       return  s;
+}
+
+KS_DECLARE(ks_status_t)blade_rpc_register_prefix_response_function(char* namespace, 
+                                                                                                       char *command,
+                                                                                                       jrpc_prefix_resp_func_t prefix_func,
+                                                                                                       jrpc_postfix_resp_func_t postfix_func)
+{
+       ks_status_t s = KS_STATUS_FAIL;
+       
+       ks_hash_write_lock(g_handle->namespace_hash);
+    blade_rpc_callbackpair_t *callbacks = blade_rpc_find_callbacks_locked(namespace, command);
+
+    if (callbacks) {
+
+               if (!callbacks->custom) {
+                       callbacks->custom = 
+                               (blade_rpc_custom_callbackpair_t *)ks_pool_alloc(g_handle->pool, sizeof(blade_rpc_custom_callbackpair_t));
+               }
+
+        callbacks->custom->prefix_response_func  = prefix_func;
+        callbacks->custom->postfix_response_func = postfix_func;
+               ks_mutex_unlock(callbacks->lock);
+        s = KS_STATUS_SUCCESS;
+    }
+
+    ks_hash_write_unlock(g_handle->namespace_hash);
+    return  s;
+}
+
+KS_DECLARE(void) blade_rpc_remove_namespace(char* namespace)
+{
+
+    ks_hash_write_lock(g_handle->namespace_hash);
+
+       blade_rpc_namespace_t *n =  ks_hash_search(g_handle->namespace_hash, namespace, KS_UNLOCKED);
+
+       ks_hash_iterator_t* it = ks_hash_first(n->method_hash, KS_HASH_FLAG_RWLOCK);
+
+       while (it) {
+
+               const void *key; 
+               void *value;
+               ks_ssize_t len = strlen(key);
+
+               ks_hash_this(it, &key, &len, &value);
+               blade_rpc_callbackpair_t *callbacks = (blade_rpc_callbackpair_t *)value;
+
+               ks_mutex_lock(callbacks->lock);
+
+               if (callbacks->custom) {
+                       ks_pool_free(g_handle->pool, callbacks->custom);
+               }
+
+               it = ks_hash_next(&it);
+               ks_hash_remove(n->method_hash, (void *)key);
+       }               
+
+       ks_hash_write_unlock(g_handle->namespace_hash);
+
+       return;
+}
+
+
+/*
+ * template functions
+ *
+ */
+
+KS_DECLARE(ks_status_t) blade_rpc_declare_template(char* templatename, const char* version)
+{
+
+    /* find/insert to namespace hash as needed */
+    ks_hash_write_lock(g_handle->template_hash);
+    blade_rpc_namespace_t *n =  ks_hash_search(g_handle->template_hash, templatename, KS_UNLOCKED);
+    if (n == NULL) {
+        n = ks_pool_alloc(g_handle->pool, sizeof (blade_rpc_namespace_t) + strlen(templatename) + 1);
+        strncpy(n->name, templatename, KS_RPCMESSAGE_NAMESPACE_LENGTH);
+               strncpy(n->version, version, KS_RPCMESSAGE_VERSION_LENGTH);
+        ks_hash_insert(g_handle->template_hash, n->name, n);
+    }
+    ks_hash_write_unlock(g_handle->template_hash);
+
+    ks_log(KS_LOG_DEBUG, "Declaring application template namespace %s, version %s", templatename, version);
+
+    return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t)blade_rpc_register_template_function(char *name,
+                                                char *command,
+                                                jrpc_func_t func,
+                                                jrpc_resp_func_t respfunc)
+{
+    if (!func && !respfunc) {
+        return KS_STATUS_FAIL;
+    }
+
+    int lkey = strlen(command)+1;
+
+    if (lkey < 16) {
+        lkey = 16;
+    }
+
+    ks_hash_read_lock(g_handle->template_hash);    /* lock template hash */
+
+    blade_rpc_namespace_t *n =  ks_hash_search(g_handle->template_hash, name, KS_UNLOCKED);
+
+    if (n == NULL) {
+        ks_hash_read_unlock(g_handle->template_hash);
+        ks_log(KS_LOG_ERROR, "Unable to find %s template\n", name);
+        return KS_STATUS_FAIL;
+    }
+
+    blade_rpc_callbackpair_t* callbacks = blade_rpc_find_template_locked(name, command);
+
+    /* just ignore attempt to re register callbacks */
+    /* as the template may already be in use leading to confusion */
+
+    if (callbacks != NULL) {
+        ks_mutex_unlock(callbacks->lock);
+        ks_hash_read_unlock(g_handle->template_hash);
+        ks_log(KS_LOG_ERROR, "Callbacks already registered for %s template\n", name);
+        return KS_STATUS_FAIL;
+    }
+
+    callbacks =
+            (blade_rpc_callbackpair_t*)ks_pool_alloc(g_handle->pool, lkey + sizeof(blade_rpc_callbackpair_t));
+
+    strcpy(callbacks->command, command);
+    callbacks->command_length = lkey;
+    callbacks->request_func   = func;
+    callbacks->response_func  = respfunc;
+
+    ks_mutex_create(&callbacks->lock, KS_MUTEX_FLAG_DEFAULT, g_handle->pool);
+
+    ks_hash_write_lock(n->method_hash);             /* lock method hash */
+
+    ks_hash_insert(n->method_hash, callbacks->command, (void *) callbacks);
+
+    ks_hash_write_unlock(n->method_hash);           /* unlock method hash */
+    ks_hash_read_unlock(g_handle->template_hash);  /* unlock namespace hash */
+
+    ks_log(KS_LOG_DEBUG, "Template message %s %s registered\n", name, command);
+
+    return KS_STATUS_SUCCESS;
+
+}
+
+KS_DECLARE(ks_status_t)blade_rpc_inherit_template(char *namespace, char* template)
+{
+       ks_hash_read_lock(g_handle->template_hash);
+
+       char tkey[KS_RPCMESSAGE_NAMESPACE_LENGTH+1];
+       memset(tkey, 0, sizeof(tkey));
+       strcpy(tkey, template);
+
+    char nskey[KS_RPCMESSAGE_NAMESPACE_LENGTH+1];
+    memset(nskey, 0, sizeof(tkey));
+    strcpy(nskey, namespace);
+
+       blade_rpc_namespace_t *n =  ks_hash_search(g_handle->template_hash, tkey, KS_UNLOCKED);
+
+       if (!n) {
+               ks_hash_read_unlock(g_handle->template_hash);
+               ks_log(KS_LOG_ERROR, "Unable to locate template %s\n", template);
+               return KS_STATUS_FAIL;
+       }
+
+       ks_hash_read_lock(g_handle->namespace_hash);
+
+       blade_rpc_namespace_t *ns =  ks_hash_search(g_handle->namespace_hash, nskey, KS_UNLOCKED);
+
+    if (!ns) {
+        ks_hash_read_unlock(g_handle->template_hash);
+               ks_hash_read_unlock(g_handle->namespace_hash);
+        ks_log(KS_LOG_ERROR, "Unable to locate namespace %s\n", namespace);
+        return KS_STATUS_FAIL;
+    }
+
+       ks_hash_write_lock(ns->method_hash);
+
+    ks_hash_iterator_t* it = ks_hash_first(n->method_hash, KS_HASH_FLAG_RWLOCK);
+
+       ks_hash_iterator_t* itfirst = it;
+
+       /* first check that there are no name conflicts */
+    while (it) {
+
+        const void *key;
+        void *value;
+        ks_ssize_t len = strlen(key);
+
+        ks_hash_this(it, &key, &len, &value);
+        blade_rpc_callbackpair_t *t_callback = (blade_rpc_callbackpair_t *)value;
+
+        ks_mutex_lock(t_callback->lock);
+
+        char fqcommand[KS_RPCMESSAGE_FQCOMMAND_LENGTH+1];
+        blade_rpc_make_fqcommand(namespace, t_callback->command, fqcommand);
+        blade_rpc_callbackpair_t *ns_callbacks = ks_hash_search(ns->method_hash, fqcommand, KS_UNLOCKED);
+
+               if (ns_callbacks) {   /* if something already registered for this function kick the entire inherit */
+                       ks_hash_read_unlock(g_handle->template_hash);
+                       ks_hash_read_unlock(g_handle->namespace_hash);
+                       ks_hash_read_unlock(ns->method_hash);
+                       ks_mutex_unlock(t_callback->lock);
+                       ks_log(KS_LOG_ERROR, "Implementing template %s in namespace %s rejected. Command %s is ambiguous\n", 
+                                                                                       template, namespace, t_callback->command);
+                       return KS_STATUS_FAIL;
+               }
+
+               ks_mutex_unlock(t_callback->lock);
+
+        it = ks_hash_next(&it);
+    }
+
+       /* ok - if we have got this far then the inherit is problem free */
+
+       it = itfirst;
+
+       while (it) {
+
+        const void *key;
+        void *value;
+        ks_ssize_t len = strlen(key);
+
+        ks_hash_this(it, &key, &len, &value);
+        blade_rpc_callbackpair_t *t_callback = (blade_rpc_callbackpair_t *)value;
+
+        ks_mutex_lock(t_callback->lock);
+
+               int lkey = t_callback->command_length;
+
+               blade_rpc_callbackpair_t *callbacks =
+                               (blade_rpc_callbackpair_t*)ks_pool_alloc(g_handle->pool, lkey + sizeof(blade_rpc_callbackpair_t));
+
+               strcpy(callbacks->command, t_callback->command);
+               callbacks->command_length = lkey;
+               callbacks->request_func   = t_callback->request_func;
+               callbacks->response_func  = t_callback->response_func;
+               ks_mutex_create(&callbacks->lock, KS_MUTEX_FLAG_DEFAULT, g_handle->pool);
+
+               ks_hash_insert(ns->method_hash, callbacks->command, (void *) callbacks);                
+
+               ks_mutex_unlock(t_callback->lock);
+
+               it = ks_hash_next(&it);
+       }
+       
+
+       ks_hash_write_lock(ns->method_hash);
+
+       ks_hash_read_unlock(g_handle->namespace_hash);
+       ks_hash_read_unlock(g_handle->template_hash);
+
+       return KS_STATUS_SUCCESS;
+}
+               
+
+
+/*
+ * send message
+ * ------------
+ */
+
+KS_DECLARE(ks_status_t) blade_rpc_write_data(char *sessionid, char* data, uint32_t size)
+{
+
+       ks_status_t s = KS_STATUS_FAIL;
+
+    // convert to json
+       cJSON *msg = cJSON_Parse(data);
+
+       if (msg) {
+
+               // ks_status_t blade_peer_message_push(blade_peer_t *peer, void *data, int32_t data_length);
+
+               s = KS_STATUS_SUCCESS;
+       }
+       else {
+               ks_log(KS_LOG_ERROR, "Unable to format outbound message\n");
+       }
+       
+
+    // ks_rpc_write_json
+       // ks_status_t blade_peer_message_push(blade_peer_t *peer, void *data, int32_t data_length);
+       return s;
+}
+
+
+KS_DECLARE(ks_status_t) blade_rpc_write_json(cJSON* json)
+{
+    // just push the messages onto the communication manager
+    // synchronization etc, taken care of by the transport api'
+       char *data = cJSON_PrintUnformatted(json);
+       if (data) {
+               ks_log(KS_LOG_DEBUG, "%s\n", data);
+           //return blade_rpc_write_data(sessionid, data, strlen(data));
+       }
+       ks_log(KS_LOG_ERROR, "Unable to parse json\n");
+       return KS_STATUS_FAIL;
+}
+
+
+
+
+/*
+ * Transport layer callbacks follow below
+ *
+*/
+
+
+
+
+/*
+ * rpc message processing
+ */
+static ks_status_t blade_rpc_process_jsonmessage_all(cJSON *request, cJSON **responseP)
+{
+       const char *fqcommand = cJSON_GetObjectCstr(request, "method");
+       cJSON *error = NULL;
+       cJSON *response = NULL;
+       *responseP = NULL;
+
+       if (!fqcommand) {
+               error = cJSON_CreateObject();
+               cJSON_AddStringToObject(error, "errormessage", "Command not specified");
+        ks_rpcmessage_create_request("rpcprotocol", "unknowncommand", NULL, NULL,  &error, responseP);
+        return KS_STATUS_FAIL;
+       }
+
+
+       char namespace[KS_RPCMESSAGE_NAMESPACE_LENGTH];
+       char command[KS_RPCMESSAGE_COMMAND_LENGTH];
+
+    blade_rpc_parse_fqcommand(fqcommand, namespace, command);
+    blade_rpc_callbackpair_t* callbacks = blade_rpc_find_callbacks_locked(namespace, command);
+
+    if (!callbacks) {
+               error = cJSON_CreateObject();
+               cJSON_AddStringToObject(error, "errormessage", "Command not supported");
+        ks_rpcmessage_create_response(request, &error, responseP);
+        return  KS_STATUS_FAIL;
+    }
+
+       //todo - add more checks ? 
+
+       ks_bool_t isrequest = ks_rpcmessage_isrequest(request);
+
+       ks_status_t s = KS_STATUS_SUCCESS;
+       
+       if (isrequest && callbacks->request_func) {
+
+               if (callbacks->custom && callbacks->custom->prefix_request_func) {
+                       s = callbacks->custom->prefix_request_func(request);
+               }
+
+        if (s == KS_STATUS_SUCCESS) {
+                       s =  callbacks->request_func(request, responseP);
+               }
+
+               if (s == KS_STATUS_SUCCESS && callbacks->custom && callbacks->custom->postfix_request_func) {
+                       s =  callbacks->custom->postfix_request_func(request, responseP);
+               }
+
+               ks_mutex_unlock(callbacks->lock);
+
+               return s;
+       }
+    else if (!isrequest && callbacks->response_func) {
+
+        if (callbacks->custom && callbacks->custom->prefix_response_func) {
+            s = callbacks->custom->prefix_response_func(response);
+               }
+
+               if (s == KS_STATUS_SUCCESS) {
+                       s =  callbacks->response_func(response);
+               }
+
+               if (s == KS_STATUS_SUCCESS && callbacks->custom &&  callbacks->custom->postfix_response_func) {
+                       s =  callbacks->custom->postfix_response_func(response);
+               }
+
+               ks_mutex_unlock(callbacks->lock);
+
+               return s;
+       }
+
+       ks_log(KS_LOG_ERROR, "Unable to find message handler for %s\n", command);
+       
+       return KS_STATUS_FAIL;
+}
+
+/*
+ *
+*/
+KS_DECLARE(ks_status_t) blade_rpc_process_jsonmessage(cJSON *request, cJSON **responseP)
+{
+       ks_status_t respstatus = blade_rpc_process_jsonmessage_all(request, responseP);
+       cJSON *response = *responseP;
+       if (respstatus == KS_STATUS_SUCCESS && response != NULL) {
+               blade_rpc_write_json(response);
+       }
+       return respstatus;
+}
+
+KS_DECLARE(ks_status_t) blade_rpc_process_data(const uint8_t *data,
+                                                        ks_size_t size)
+{
+       
+       cJSON *json = cJSON_Parse((const char*)data);
+       if (json != NULL) {
+               ks_log( KS_LOG_ERROR, "Unable to parse message\n");
+               return KS_STATUS_FAIL;
+       }
+
+       /* deal with rpc message */
+       if (ks_rpcmessage_isrpc(json)) {
+               cJSON *response = NULL;
+               ks_status_t respstatus = blade_rpc_process_jsonmessage_all(json, &response);
+               if (respstatus == KS_STATUS_SUCCESS && response != NULL) {
+                       blade_rpc_write_json(response);
+                       cJSON_Delete(response);
+               }
+               return respstatus;
+       }
+       
+       ks_log(KS_LOG_ERROR, "Unable to identify message type\n");
+       
+       return KS_STATUS_FAIL;
+}
+
+KS_DECLARE(ks_status_t) blade_rpc_process_blademessage(blade_message_t *message)
+{
+       uint8_t* data = NULL;
+       ks_size_t size = 0;
+
+       blade_message_get(message, (void **)&data, &size);
+
+       if (data && size>0) {
+               ks_status_t s = blade_rpc_process_data(data, size);
+               blade_message_discard(&message);
+               return s;
+       } 
+       
+       ks_log(KS_LOG_ERROR, "Message read failed\n");
+       return KS_STATUS_FAIL;
+
+}
+
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
+
diff --git a/libs/libblade/src/include/blade_rpcproto.h b/libs/libblade/src/include/blade_rpcproto.h
new file mode 100644 (file)
index 0000000..d6a37f3
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2017, FreeSWITCH LLC
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _BLADE_RPCPROTO_H_
+#define _BLADE_RPCPROTO_H_
+
+#include <ks_rpcmessage.h>
+#include <blade_types.h>
+
+// temp typedefs to get compile going 
+//typedef struct blade_peer_s  blade_peer_t;
+//typedef struct blade_message_s blade_message_t;
+//typedef struct blade_event_s blade_event_t;
+
+#define KS_RPCMESSAGE_NAMESPACE_LENGTH 16
+#define KS_RPCMESSAGE_COMMAND_LENGTH  238
+#define KS_RPCMESSAGE_FQCOMMAND_LENGTH  (KS_RPCMESSAGE_NAMESPACE_LENGTH+KS_RPCMESSAGE_COMMAND_LENGTH+1)
+#define KS_RPCMESSAGE_VERSION_LENGTH 9
+
+
+typedef  ks_status_t (*jrpc_func_t)         (cJSON *request, cJSON **responseP);
+typedef  ks_status_t (*jrpc_prefix_func_t)  (cJSON *request);
+typedef  ks_status_t (*jrpc_postfix_func_t) (cJSON *response,  cJSON **responseP);
+
+typedef  ks_status_t (*jrpc_resp_func_t)         (cJSON *response);
+typedef  ks_status_t (*jrpc_prefix_resp_func_t)  (cJSON *response);
+typedef  ks_status_t (*jrpc_postfix_resp_func_t) (cJSON *response);
+
+
+/*
+ *  setup
+ *  -----
+ */
+
+KS_DECLARE(ks_status_t) blade_rpc_init(ks_pool_t *pool);
+KS_DECLARE(ks_status_t) blade_rpc_runprocess();
+
+
+/* 
+ *  namespace and call back registration 
+ *  ------------------------------------
+ */
+KS_DECLARE(ks_status_t) blade_rpc_declare_namespace(char* namespace, const char* version);
+KS_DECLARE(ks_status_t) blade_rpc_register_function(char* namespace, 
+                                                                                                               char *command,
+                                                                                                               jrpc_func_t func,
+                                                                                                               jrpc_resp_func_t respfunc);
+KS_DECLARE(ks_status_t) blade_rpc_register_prefix_request_function(char* namespace,   
+                                                                                                               char *command,
+                                                                                                               jrpc_prefix_func_t prefix_func,
+                                                                                                               jrpc_postfix_func_t postfix_func);
+KS_DECLARE(ks_status_t) blade_rpc_register_prefix_response_function(char *namespace,   
+                                                                                                               char *command,
+                                                                                                               jrpc_prefix_resp_func_t prefix_func,
+                                                                                                               jrpc_postfix_resp_func_t postfix_func);
+KS_DECLARE(void) blade_rpc_remove_namespace(char* namespace);
+
+
+/*
+ * peer create/destroy
+ * -------------------
+ */
+KS_DECLARE(ks_status_t) blade_rpc_onconnect(ks_pool_t *pool, blade_peer_t* peer);
+KS_DECLARE(ks_status_t) blade_rpc_disconnect(blade_peer_t* peer);
+
+/* 
+ * send message
+ * ------------
+ */
+KS_DECLARE(ks_status_t) blade_rpc_write(char *sessionid, char* data, uint32_t size);
+KS_DECLARE(ks_status_t) blade_rpc_write_json(cJSON* json);
+
+
+/*
+ * process inbound message
+ * -----------------------
+ */
+KS_DECLARE(ks_status_t) blade_rpc_process_blademessage(blade_message_t *message);
+KS_DECLARE(ks_status_t) blade_rpc_process_data(const uint8_t *data, ks_size_t size);
+
+KS_DECLARE(ks_status_t) blade_rpc_process_jsonmessage(cJSON *request, cJSON **responseP);
+
+
+#endif
+