]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-10167: Preliminary version of blade.subscribe is implemented
authorShane Bryldt <astaelan@gmail.com>
Sat, 10 Jun 2017 03:34:02 +0000 (21:34 -0600)
committerShane Bryldt <astaelan@gmail.com>
Sat, 10 Jun 2017 03:34:02 +0000 (21:34 -0600)
libs/libblade/libblade.vcxproj
libs/libblade/libblade.vcxproj.filters
libs/libblade/src/blade_session.c
libs/libblade/src/blade_stack.c
libs/libblade/src/blade_subscription.c [new file with mode: 0644]
libs/libblade/src/include/blade.h
libs/libblade/src/include/blade_stack.h
libs/libblade/src/include/blade_subscription.h [new file with mode: 0644]
libs/libblade/src/include/blade_types.h
libs/libblade/test/bladec.c

index b559f7f670ceaf698dfbdbca8d729e58ad9ff35b..1806111831a70cf8345738b9154ca6409bc3aa5a 100644 (file)
     <ClCompile Include="src\blade_identity.c" />
     <ClCompile Include="src\blade_rpc.c" />
     <ClCompile Include="src\blade_protocol.c" />
+    <ClCompile Include="src\blade_subscription.c" />
     <ClCompile Include="src\blade_transport_wss.c" />
     <ClCompile Include="src\blade_session.c" />
     <ClCompile Include="src\blade_stack.c" />
     <ClInclude Include="src\include\blade_identity.h" />
     <ClInclude Include="src\include\blade_rpc.h" />
     <ClInclude Include="src\include\blade_protocol.h" />
+    <ClInclude Include="src\include\blade_subscription.h" />
     <ClInclude Include="src\include\blade_transport_wss.h" />
     <ClInclude Include="src\include\blade_session.h" />
     <ClInclude Include="src\include\blade_stack.h" />
index 429764f0667842a523fa091dd4526580ac3ec902..17d413b017f18f43c9d7d40b735ea2254d83561f 100644 (file)
@@ -45,6 +45,9 @@
     <ClCompile Include="src\blade_rpc.c">
       <Filter>Source Files</Filter>
     </ClCompile>
+    <ClCompile Include="src\blade_subscription.c">
+      <Filter>Source Files</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="src\include\unqlite.h">
@@ -80,5 +83,8 @@
     <ClInclude Include="src\include\blade_rpc.h">
       <Filter>Header Files</Filter>
     </ClInclude>
+    <ClInclude Include="src\include\blade_subscription.h">
+      <Filter>Header Files</Filter>
+    </ClInclude>
   </ItemGroup>
 </Project>
\ No newline at end of file
index ed7b3e45a29d3b799d1f931a6e9193882878e8d9..f865497916d644f8fb2f65c8a22d2186878ff48c 100644 (file)
@@ -280,6 +280,7 @@ KS_DECLARE(ks_status_t) blade_session_route_remove(blade_session_t *bs, const ch
        return KS_STATUS_SUCCESS;
 }
 
+
 KS_DECLARE(cJSON *) blade_session_properties_get(blade_session_t *bs)
 {
        ks_assert(bs);
index c50f4f8e60a75c2fce84daf06bdb40f2c509f0c1..dbfff34eb130294b51ea8e7c0932e2b3e283dcf7 100644 (file)
@@ -65,6 +65,9 @@ struct blade_handle_s {
 
        ks_hash_t *protocolrpcs; // registered blade_rpc_t, for locally processing protocol messages, keyed by the rpc method
 
+       ks_hash_t *subscriptions; // registered blade_subscription_t, subscribers may include the local node
+       ks_hash_t *subscriptions_cleanup; // cleanup for subscriptions, keyed by the downstream subscriber nodeid, each value is a hash_t* of which contains string keys matching the "protocol@realm/event" keys to remove each nodeid from as a subscriber during cleanup
+
        ks_hash_t *connections; // active connections keyed by connection id
 
        ks_hash_t *sessions; // active sessions keyed by session id (which comes from the nodeid of the downstream side of the session, thus an upstream session is keyed under the local_nodeid)
@@ -73,14 +76,7 @@ struct blade_handle_s {
 
        // @note everything below this point is exclusively for the master node
 
-       // @todo need to track the details from blade.publish, a protocol may be published under multiple realms, and each protocol published to a realm may have multiple target providers
        // @todo how does "exclusive" play into the providers, does "exclusive" mean only one provider can exist for a given protocol and realm?
-       // for now, ignore exclusive and multiple providers, key by "protocol" in a hash, and use a blade_protocol_t to represent a protocol in the context of being published so it can be located by other nodes
-       // each blade_protocol_t will contain the "protocol", common method/namespace/schema data, and a hash keyed by the "realm", with a value of an object of type blade_protocol_realm_t
-       // each blade_protocol_realm_t will contain the "realm" and a list of publisher nodeid's, any of which can be chosen at random to use the protocol within the given realm (does "exclusive" only limit this to 1 provider per realm?)
-       // @todo protocols must be cleaned up when routes are removed due to session terminations, should incorporate a faster way to lookup which protocols are tied to a given nodeid for efficient removal
-       // create blade_protocol_method_t to represent a method that is executed with blade.execute, and is part of a protocol made available through blade.publish, registered locally by the protocol and method name (protocol.methodname?),
-       // with a callback handler which should also have the realm available when executed so a single provider can easily provide a protocol for multiple realms with the same method callbacks
        ks_hash_t *protocols; // master only: protocols that have been published with blade.publish, and the details to locate a protocol provider with blade.locate
        ks_hash_t *protocols_cleanup; // master only: keyed by the nodeid, each value is a hash_t* of which contains string keys matching the "protocol@realm" keys to remove each nodeid from as a provider during cleanup
 
@@ -91,6 +87,7 @@ ks_bool_t blade_protocol_register_request_handler(blade_rpc_request_t *brpcreq,
 ks_bool_t blade_protocol_publish_request_handler(blade_rpc_request_t *brpcreq, void *data);
 ks_bool_t blade_protocol_locate_request_handler(blade_rpc_request_t *brpcreq, void *data);
 ks_bool_t blade_protocol_execute_request_handler(blade_rpc_request_t *brpcreq, void *data);
+ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq, void *data);
 
 
 typedef struct blade_handle_session_state_callback_registration_s blade_handle_session_state_callback_registration_t;
@@ -237,6 +234,12 @@ KS_DECLARE(ks_status_t) blade_handle_create(blade_handle_t **bhP)
        ks_hash_create(&bh->protocolrpcs, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bh->pool);
        ks_assert(bh->protocolrpcs);
 
+       ks_hash_create(&bh->subscriptions, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bh->pool);
+       ks_assert(bh->subscriptions);
+
+       ks_hash_create(&bh->subscriptions_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY | KS_HASH_FLAG_FREE_VALUE, bh->pool);
+       ks_assert(bh->subscriptions_cleanup);
+
        ks_hash_create(&bh->connections, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, bh->pool);
        ks_assert(bh->connections);
 
@@ -353,6 +356,9 @@ KS_DECLARE(ks_status_t) blade_handle_startup(blade_handle_t *bh, config_setting_
        blade_rpc_create(&brpc, bh, "blade.execute", NULL, NULL, blade_protocol_execute_request_handler, NULL);
        blade_handle_corerpc_register(brpc);
 
+       blade_rpc_create(&brpc, bh, "blade.subscribe", NULL, NULL, blade_protocol_subscribe_request_handler, NULL);
+       blade_handle_corerpc_register(brpc);
+
        // register internal transport for secure websockets
        blade_transport_wss_create(&bt, bh);
        ks_assert(bt);
@@ -868,6 +874,104 @@ KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, co
        return brpc;
 }
 
+KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid)
+{
+       char *key = NULL;
+       blade_subscription_t *bsub = NULL;
+       ks_hash_t *bsub_cleanup = NULL;
+       ks_bool_t propagate = KS_FALSE;
+
+       ks_assert(bh);
+       ks_assert(event);
+       ks_assert(protocol);
+       ks_assert(realm);
+       ks_assert(nodeid);
+
+       key = ks_psprintf(bh->pool, "%s@%s/%s", protocol, realm, event);
+
+       ks_hash_write_lock(bh->subscriptions);
+
+       bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)key, KS_UNLOCKED);
+
+       if (!bsub) {
+               blade_subscription_create(&bsub, bh->pool, event, protocol, realm);
+               ks_assert(bsub);
+
+               ks_hash_insert(bh->subscriptions, (void *)ks_pstrdup(bh->pool, key), bsub);
+               propagate = KS_TRUE;
+       }
+
+       bsub_cleanup = (ks_hash_t *)ks_hash_search(bh->subscriptions_cleanup, (void *)nodeid, KS_UNLOCKED);
+       if (!bsub_cleanup) {
+               ks_hash_create(&bsub_cleanup, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bh->pool);
+               ks_assert(bsub_cleanup);
+
+               ks_log(KS_LOG_DEBUG, "Subscription (%s) added\n", key);
+               ks_hash_insert(bh->subscriptions_cleanup, (void *)ks_pstrdup(bh->pool, nodeid), bsub_cleanup);
+       }
+       ks_hash_insert(bsub_cleanup, (void *)ks_pstrdup(bh->pool, key), (void *)KS_TRUE);
+
+       blade_subscription_subscribers_add(bsub, nodeid);
+
+       ks_hash_write_unlock(bh->subscriptions);
+
+       ks_log(KS_LOG_DEBUG, "Subscription (%s) subscriber (%s) added\n", key, nodeid);
+
+       ks_pool_free(bh->pool, &key);
+
+       if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_FALSE, NULL, NULL);
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid)
+{
+       char *key = NULL;
+       blade_subscription_t *bsub = NULL;
+       ks_hash_t *bsub_cleanup = NULL;
+       ks_bool_t propagate = KS_FALSE;
+
+       ks_assert(bh);
+       ks_assert(event);
+       ks_assert(protocol);
+       ks_assert(realm);
+       ks_assert(nodeid);
+
+       key = ks_psprintf(bh->pool, "%s@%s/%s", protocol, realm, event);
+
+       ks_hash_write_lock(bh->subscriptions);
+
+       bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, (void *)key, KS_UNLOCKED);
+
+       if (bsub) {
+               bsub_cleanup = (ks_hash_t *)ks_hash_search(bh->subscriptions_cleanup, (void *)nodeid, KS_UNLOCKED);
+               ks_assert(bsub_cleanup);
+               ks_hash_remove(bsub_cleanup, key);
+
+               if (ks_hash_count(bsub_cleanup) == 0) {
+                       ks_hash_remove(bh->subscriptions_cleanup, (void *)nodeid);
+               }
+
+               ks_log(KS_LOG_DEBUG, "Subscription (%s) subscriber (%s) removed\n", key, nodeid);
+               blade_subscription_subscribers_remove(bsub, nodeid);
+
+               if (ks_hash_count(blade_subscription_subscribers_get(bsub)) == 0) {
+                       ks_log(KS_LOG_DEBUG, "Subscription (%s) removed\n", key);
+                       ks_hash_remove(bh->subscriptions, (void *)key);
+                       propagate = KS_TRUE;
+               }
+       }
+
+       ks_hash_write_unlock(bh->subscriptions);
+
+       ks_pool_free(bh->pool, &key);
+
+       if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_TRUE, NULL, NULL);
+
+       return KS_STATUS_SUCCESS;
+}
+
+
 
 KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id)
 {
@@ -975,20 +1079,63 @@ KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
 {
        ks_status_t ret = KS_STATUS_SUCCESS;
        blade_handle_t *bh = NULL;
+       ks_pool_t *pool = NULL;
        const char *id = NULL;
        ks_hash_iterator_t *it = NULL;
        ks_bool_t upstream = KS_FALSE;
+       ks_bool_t unsubbed = KS_FALSE;
 
        ks_assert(bs);
 
        bh = blade_session_handle_get(bs);
        ks_assert(bh);
 
+       pool = blade_handle_pool_get(bh);
+       ks_assert(pool);
+
        blade_session_write_lock(bs, KS_TRUE);
 
        id = blade_session_id_get(bs);
        ks_assert(id);
 
+       // @todo this cleanup is a bit messy, move to using the combined key rather than passing around all 3 parts would make this cleaner
+       while (!unsubbed) {
+               ks_hash_t *subscriptions = NULL;
+               const char *event = NULL;
+               const char *protocol = NULL;
+               const char *realm = NULL;
+
+               ks_hash_read_lock(bh->subscriptions);
+               subscriptions = (ks_hash_t *)ks_hash_search(bh->subscriptions_cleanup, (void *)id, KS_UNLOCKED);
+               if (!subscriptions) unsubbed = KS_TRUE;
+               else {
+                       void *key = NULL;
+                       void *value = NULL;
+                       blade_subscription_t *bsub = NULL;
+
+                       it = ks_hash_first(subscriptions, KS_UNLOCKED);
+                       ks_assert(it);
+
+                       ks_hash_this(it, (const void **)&key, NULL, &value);
+
+                       bsub = (blade_subscription_t *)ks_hash_search(bh->subscriptions, key, KS_UNLOCKED);
+                       ks_assert(bsub);
+
+                       // @note allocate these to avoid lifecycle issues when the last subscriber is removed causing the subscription to be removed
+                       event = ks_pstrdup(bh->pool, blade_subscription_event_get(bsub));
+                       protocol = ks_pstrdup(bh->pool, blade_subscription_protocol_get(bsub));
+                       realm = ks_pstrdup(bh->pool, blade_subscription_realm_get(bsub));
+               }
+               ks_hash_read_unlock(bh->subscriptions);
+
+               if (!unsubbed) {
+                       blade_handle_subscriber_remove(bh, event, protocol, realm, id);
+                       ks_pool_free(bh->pool, &event);
+                       ks_pool_free(bh->pool, &protocol);
+                       ks_pool_free(bh->pool, &realm);
+               }
+       }
+
        ks_hash_write_lock(bh->sessions);
        if (ks_hash_remove(bh->sessions, (void *)id) == NULL) ret = KS_STATUS_FAIL;
 
@@ -1011,8 +1158,6 @@ KS_DECLARE(ks_status_t) blade_handle_sessions_remove(blade_session_t *bs)
 
        blade_session_write_unlock(bs);
 
-
-
        return ret;
 }
 
@@ -1821,6 +1966,167 @@ KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcr
        blade_session_read_unlock(bs);
 }
 
+
+// blade.subscribe request generator
+KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       blade_session_t *bs = NULL;
+
+       ks_assert(bh);
+       ks_assert(event);
+       ks_assert(protocol);
+       ks_assert(realm);
+
+       if (!(bs = blade_handle_sessions_upstream(bh))) {
+               ret = KS_STATUS_DISCONNECTED;
+               goto done;
+       }
+
+       if (remove) {
+               blade_handle_subscriber_remove(bh, event, protocol, realm, bh->local_nodeid);
+       } else {
+               blade_handle_subscriber_add(bh, event, protocol, realm, bh->local_nodeid);
+       }
+
+done:
+       if (bs) blade_session_read_unlock(bs);
+
+       return ret;
+}
+
+KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
+{
+       ks_status_t ret = KS_STATUS_SUCCESS;
+       blade_session_t *bs = NULL;
+       ks_pool_t *pool = NULL;
+       cJSON *req = NULL;
+       cJSON *req_params = NULL;
+
+       ks_assert(bh);
+       ks_assert(event);
+       ks_assert(protocol);
+       ks_assert(realm);
+
+       if (!(bs = blade_handle_sessions_upstream(bh))) {
+               ret = KS_STATUS_DISCONNECTED;
+               goto done;
+       }
+
+       pool = blade_handle_pool_get(bh);
+       ks_assert(pool);
+
+       blade_rpc_request_raw_create(pool, &req, &req_params, NULL, "blade.subscribe");
+
+       cJSON_AddStringToObject(req_params, "event", event);
+       cJSON_AddStringToObject(req_params, "protocol", protocol);
+       cJSON_AddStringToObject(req_params, "realm", realm);
+       if (remove) cJSON_AddTrueToObject(req_params, "remove");
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request started\n", blade_session_id_get(bs));
+
+       ret = blade_session_send(bs, req, callback, data);
+
+done:
+       if (req) cJSON_Delete(req);
+       if (bs) blade_session_read_unlock(bs);
+
+       return ret;
+}
+
+// blade.subscribe request handler
+ks_bool_t blade_protocol_subscribe_request_handler(blade_rpc_request_t *brpcreq, void *data)
+{
+       blade_handle_t *bh = NULL;
+       blade_session_t *bs = NULL;
+       ks_pool_t *pool = NULL;
+       cJSON *req = NULL;
+       cJSON *req_params = NULL;
+       const char *req_params_event = NULL;
+       const char *req_params_protocol = NULL;
+       const char *req_params_realm = NULL;
+       cJSON *req_params_remove = NULL;
+       ks_bool_t remove = KS_FALSE;
+       cJSON *res = NULL;
+       cJSON *res_result = NULL;
+
+       ks_assert(brpcreq);
+
+       bh = blade_rpc_request_handle_get(brpcreq);
+       ks_assert(bh);
+
+       pool = blade_handle_pool_get(bh);
+       ks_assert(pool);
+
+       bs = blade_handle_sessions_lookup(bh, blade_rpc_request_sessionid_get(brpcreq));
+       ks_assert(bs);
+
+       req = blade_rpc_request_message_get(brpcreq);
+       ks_assert(req);
+
+       req_params = cJSON_GetObjectItem(req, "params");
+       if (!req_params) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'params' object\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params object");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_event = cJSON_GetObjectCstr(req_params, "event");
+       if (!req_params_event) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'event'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params event");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_protocol = cJSON_GetObjectCstr(req_params, "protocol");
+       if (!req_params_protocol) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'protocol'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params protocol");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_realm = cJSON_GetObjectCstr(req_params, "realm");
+       if (!req_params_realm) {
+               ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request missing 'realm'\n", blade_session_id_get(bs));
+               blade_rpc_error_raw_create(&res, NULL, blade_rpc_request_messageid_get(brpcreq), -32602, "Missing params realm");
+               blade_session_send(bs, res, NULL, NULL);
+               goto done;
+       }
+
+       req_params_remove = cJSON_GetObjectItem(req_params, "remove");
+       remove = req_params_remove && req_params_remove->type == cJSON_True;
+
+       // @todo confirm the realm is permitted for the session, this gets complicated with subdomains, skipping for now
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs));
+
+       if (remove) {
+               blade_handle_subscriber_remove(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
+       } else {
+               blade_handle_subscriber_add(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
+       }
+
+       // build the actual response finally
+       blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
+
+       cJSON_AddStringToObject(res_result, "event", req_params_event);
+       cJSON_AddStringToObject(res_result, "protocol", req_params_protocol);
+       cJSON_AddStringToObject(res_result, "realm", req_params_realm);
+
+       // request was just received on a session that is already read locked, so we can assume the response goes back on the same session without further lookup
+       blade_session_send(bs, res, NULL, NULL);
+
+done:
+
+       if (res) cJSON_Delete(res);
+       if (bs) blade_session_read_unlock(bs);
+
+       return KS_FALSE;
+}
+
 /* For Emacs:
  * Local Variables:
  * mode:c
diff --git a/libs/libblade/src/blade_subscription.c b/libs/libblade/src/blade_subscription.c
new file mode 100644 (file)
index 0000000..b17e351
--- /dev/null
@@ -0,0 +1,172 @@
+/*
+ * Copyright (c) 2017, Shane Bryldt
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "blade.h"
+
+struct blade_subscription_s {
+       ks_pool_t *pool;
+
+       const char *event;
+       const char *protocol;
+       const char *realm;
+       ks_hash_t *subscribers;
+};
+
+
+static void blade_subscription_cleanup(ks_pool_t *pool, void *ptr, void *arg, ks_pool_cleanup_action_t action, ks_pool_cleanup_type_t type)
+{
+       blade_subscription_t *bsub = (blade_subscription_t *)ptr;
+
+       ks_assert(bsub);
+
+       switch (action) {
+       case KS_MPCL_ANNOUNCE:
+               break;
+       case KS_MPCL_TEARDOWN:
+               if (bsub->event) ks_pool_free(bsub->pool, &bsub->event);
+               if (bsub->protocol) ks_pool_free(bsub->pool, &bsub->protocol);
+               if (bsub->realm) ks_pool_free(bsub->pool, &bsub->subscribers);
+               if (bsub->subscribers) ks_hash_destroy(&bsub->subscribers);
+               break;
+       case KS_MPCL_DESTROY:
+               break;
+       }
+}
+
+KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm)
+{
+       blade_subscription_t *bsub = NULL;
+
+       ks_assert(bsubP);
+       ks_assert(pool);
+       ks_assert(event);
+       ks_assert(protocol);
+       ks_assert(realm);
+
+       bsub = ks_pool_alloc(pool, sizeof(blade_subscription_t));
+       bsub->pool = pool;
+       bsub->event = ks_pstrdup(pool, event);
+       bsub->protocol = ks_pstrdup(pool, protocol);
+       bsub->realm = ks_pstrdup(pool, realm);
+
+       ks_hash_create(&bsub->subscribers, KS_HASH_MODE_CASE_INSENSITIVE, KS_HASH_FLAG_NOLOCK | KS_HASH_FLAG_DUP_CHECK | KS_HASH_FLAG_FREE_KEY, bsub->pool);
+       ks_assert(bsub->subscribers);
+
+       ks_pool_set_cleanup(pool, bsub, NULL, blade_subscription_cleanup);
+
+       *bsubP = bsub;
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP)
+{
+       blade_subscription_t *bsub = NULL;
+       
+       ks_assert(bsubP);
+       ks_assert(*bsubP);
+
+       bsub = *bsubP;
+
+       ks_pool_free(bsub->pool, bsubP);
+
+       return KS_STATUS_SUCCESS;
+}
+
+KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub)
+{
+       ks_assert(bsub);
+
+       return bsub->event;
+
+}
+
+KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub)
+{
+       ks_assert(bsub);
+
+       return bsub->protocol;
+
+}
+
+KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub)
+{
+       ks_assert(bsub);
+
+       return bsub->realm;
+
+}
+
+KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub)
+{
+       ks_assert(bsub);
+
+       return bsub->subscribers;
+
+}
+
+KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid)
+{
+       char *key = NULL;
+
+       ks_assert(bsub);
+       ks_assert(nodeid);
+
+       key = ks_pstrdup(bsub->pool, nodeid);
+       ks_hash_insert(bsub->subscribers, (void *)key, (void *)KS_TRUE);
+
+       return KS_STATUS_SUCCESS;
+
+}
+
+KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid)
+{
+       ks_assert(bsub);
+       ks_assert(nodeid);
+
+       ks_hash_remove(bsub->subscribers, (void *)nodeid);
+
+       return KS_STATUS_SUCCESS;
+
+}
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index 6e889b6808ec6983cef2d705f1d27d8ab698c432..2bc848d79ce7272f7b8bc291e00f204b04e61949 100644 (file)
@@ -45,6 +45,7 @@
 #include "blade_connection.h"
 #include "blade_session.h"
 #include "blade_protocol.h"
+#include "blade_subscription.h"
 
 #include "blade_transport_wss.h"
 
index 46c449f25131db5b500ed33dca2aebc6ceb8b770..c460b8d839a230e3ea02cd31f0d0fecfee491e48 100644 (file)
@@ -78,6 +78,9 @@ KS_DECLARE(ks_status_t) blade_handle_protocolrpc_register(blade_rpc_t *brpc);
 KS_DECLARE(ks_status_t) blade_handle_protocolrpc_unregister(blade_rpc_t *brpc);
 KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, const char *method, const char *protocol, const char *realm);
 
+KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid);
+KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid);
+
 
 KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
 
@@ -106,6 +109,9 @@ KS_DECLARE(cJSON *) blade_protocol_execute_request_params_get(blade_rpc_request_
 KS_DECLARE(cJSON *) blade_protocol_execute_response_result_get(blade_rpc_response_t *brpcres);
 KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
 
+KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
+
 KS_END_EXTERN_C
 
 #endif
diff --git a/libs/libblade/src/include/blade_subscription.h b/libs/libblade/src/include/blade_subscription.h
new file mode 100644 (file)
index 0000000..c6528aa
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2017, Shane Bryldt
+ * All rights reserved.
+ * 
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 
+ * * Neither the name of the original author; nor the names of any contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ * 
+ * 
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER
+ * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _BLADE_SUBSCRIPTION_H_
+#define _BLADE_SUBSCRIPTION_H_
+#include <blade.h>
+
+KS_BEGIN_EXTERN_C
+KS_DECLARE(ks_status_t) blade_subscription_create(blade_subscription_t **bsubP, ks_pool_t *pool, const char *event, const char *protocol, const char *realm);
+KS_DECLARE(ks_status_t) blade_subscription_destroy(blade_subscription_t **bsubP);
+KS_DECLARE(const char *) blade_subscription_event_get(blade_subscription_t *bsub);
+KS_DECLARE(const char *) blade_subscription_protocol_get(blade_subscription_t *bsub);
+KS_DECLARE(const char *) blade_subscription_realm_get(blade_subscription_t *bsub);
+KS_DECLARE(ks_hash_t *) blade_subscription_subscribers_get(blade_subscription_t *bsub);
+KS_DECLARE(ks_status_t) blade_subscription_subscribers_add(blade_subscription_t *bsub, const char *nodeid);
+KS_DECLARE(ks_status_t) blade_subscription_subscribers_remove(blade_subscription_t *bsub, const char *nodeid);
+KS_END_EXTERN_C
+
+#endif
+
+/* For Emacs:
+ * Local Variables:
+ * mode:c
+ * indent-tabs-mode:t
+ * tab-width:4
+ * c-basic-offset:4
+ * End:
+ * For VIM:
+ * vim:set softtabstop=4 shiftwidth=4 tabstop=4 noet:
+ */
index ed7433cd571db4e9c6fc05d25702c65db2ff2177..887d4f3e2a28289ece72b848823f71d43cd98f61 100644 (file)
@@ -49,8 +49,9 @@ typedef struct blade_connection_s blade_connection_t;
 typedef struct blade_session_s blade_session_t;
 typedef struct blade_session_callbacks_s blade_session_callbacks_t;
 typedef struct blade_protocol_s blade_protocol_t;
-typedef struct blade_protocol_realm_s blade_protocol_realm_t;
-typedef struct blade_protocol_method_s blade_protocol_method_t;
+typedef struct blade_subscription_s blade_subscription_t;
+//typedef struct blade_protocol_realm_s blade_protocol_realm_t;
+//typedef struct blade_protocol_method_s blade_protocol_method_t;
 
 
 typedef ks_bool_t (*blade_rpc_request_callback_t)(blade_rpc_request_t *brpcreq, void *data);
index b3b7c9dfd8908961b9b7be864184c285c5cde0ed..c2e9011e376f10b6211a88d1c33e3898afccb1e2 100644 (file)
@@ -17,10 +17,12 @@ struct command_def_s {
 
 void command_quit(blade_handle_t *bh, char *args);
 void command_execute(blade_handle_t *bh, char *args);
+void command_subscribe(blade_handle_t *bh, char *args);
 
 static const struct command_def_s command_defs[] = {
        { "quit", command_quit },
        { "execute", command_execute },
+       { "subscribe", command_subscribe },
 
        { NULL, NULL }
 };
@@ -109,6 +111,26 @@ ks_bool_t blade_locate_response_handler(blade_rpc_response_t *brpcres, void *dat
        return KS_FALSE;
 }
 
+ks_bool_t blade_subscribe_response_handler(blade_rpc_response_t *brpcres, void *data)
+{
+       blade_handle_t *bh = NULL;
+       blade_session_t *bs = NULL;
+
+       ks_assert(brpcres);
+
+       bh = blade_rpc_response_handle_get(brpcres);
+       ks_assert(bh);
+
+       bs = blade_handle_sessions_lookup(bh, blade_rpc_response_sessionid_get(brpcres));
+       ks_assert(bs);
+
+       ks_log(KS_LOG_DEBUG, "Session (%s) blade.subscribe response processing\n", blade_session_id_get(bs));
+
+       blade_session_read_unlock(bs);
+
+       return KS_FALSE;
+}
+
 int main(int argc, char **argv)
 {
        blade_handle_t *bh = NULL;
@@ -246,6 +268,14 @@ void command_execute(blade_handle_t *bh, char *args)
        blade_protocol_locate(bh, "test", "mydomain.com", blade_locate_response_handler, NULL);
 }
 
+void command_subscribe(blade_handle_t *bh, char *args)
+{
+       ks_assert(bh);
+       ks_assert(args);
+
+       blade_protocol_subscribe(bh, "test.event", "test", "mydomain.com", KS_FALSE, blade_subscribe_response_handler, NULL);
+}
+
 /* For Emacs:
 * Local Variables:
 * mode:c