return brpc;
}
-KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid)
+KS_DECLARE(ks_bool_t) blade_handle_subscriber_add(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid)
{
char *key = NULL;
blade_subscription_t *bsub = NULL;
ks_pool_free(bh->pool, &key);
- if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_FALSE, NULL, NULL);
+ if (bsubP) *bsubP = bsub;
- return KS_STATUS_SUCCESS;
+ return propagate;
}
-KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid)
+KS_DECLARE(ks_bool_t) blade_handle_subscriber_remove(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid)
{
char *key = NULL;
blade_subscription_t *bsub = NULL;
ks_pool_free(bh->pool, &key);
- if (propagate) blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_TRUE, NULL, NULL);
+ if (bsubP) *bsubP = bsub;
- return KS_STATUS_SUCCESS;
+ return propagate;
}
ks_hash_read_unlock(bh->subscriptions);
if (!unsubbed) {
- blade_handle_subscriber_remove(bh, event, protocol, realm, id);
+ if (blade_handle_subscriber_remove(bh, NULL, event, protocol, realm, id)) {
+ blade_protocol_subscribe_raw(bh, event, protocol, realm, KS_TRUE, NULL, NULL);
+ }
ks_pool_free(bh->pool, &event);
ks_pool_free(bh->pool, &protocol);
ks_pool_free(bh->pool, &realm);
// blade.subscribe request generator
-KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data)
+KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data)
{
ks_status_t ret = KS_STATUS_SUCCESS;
blade_session_t *bs = NULL;
+ ks_bool_t propagate = KS_FALSE;
+ blade_subscription_t *bsub = NULL;
ks_assert(bh);
ks_assert(event);
ret = KS_STATUS_DISCONNECTED;
goto done;
}
-
+
if (remove) {
- blade_handle_subscriber_remove(bh, event, protocol, realm, bh->local_nodeid);
+ propagate = blade_handle_subscriber_remove(bh, &bsub, event, protocol, realm, bh->local_nodeid);
} else {
- blade_handle_subscriber_add(bh, event, protocol, realm, bh->local_nodeid);
+ propagate = blade_handle_subscriber_add(bh, &bsub, event, protocol, realm, bh->local_nodeid);
+ ks_assert(event_callback);
}
+ if (bsub) {
+ blade_subscription_callback_set(bsub, event_callback);
+ blade_subscription_callback_data_set(bsub, event_data);
+ }
+
+ if (propagate) ret = blade_protocol_subscribe_raw(bh, event, protocol, realm, remove, callback, data);
done:
if (bs) blade_session_read_unlock(bs);
ks_bool_t remove = KS_FALSE;
cJSON *res = NULL;
cJSON *res_result = NULL;
+ ks_bool_t propagate = KS_FALSE;
ks_assert(brpcreq);
ks_log(KS_LOG_DEBUG, "Session (%s) subscribe request processing\n", blade_session_id_get(bs));
if (remove) {
- blade_handle_subscriber_remove(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
+ propagate = blade_handle_subscriber_remove(bh, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
} else {
- blade_handle_subscriber_add(bh, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
+ propagate = blade_handle_subscriber_add(bh, NULL, req_params_event, req_params_protocol, req_params_realm, blade_session_id_get(bs));
}
+ if (propagate) blade_protocol_subscribe_raw(bh, req_params_event, req_params_protocol, req_params_realm, remove, NULL, NULL);
+
// build the actual response finally
blade_rpc_response_raw_create(&res, &res_result, blade_rpc_request_messageid_get(brpcreq));
KS_DECLARE(ks_status_t) blade_handle_protocolrpc_unregister(blade_rpc_t *brpc);
KS_DECLARE(blade_rpc_t *) blade_handle_protocolrpc_lookup(blade_handle_t *bh, const char *method, const char *protocol, const char *realm);
-KS_DECLARE(ks_status_t) blade_handle_subscriber_add(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid);
-KS_DECLARE(ks_status_t) blade_handle_subscriber_remove(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, const char *nodeid);
+KS_DECLARE(ks_bool_t) blade_handle_subscriber_add(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid);
+KS_DECLARE(ks_bool_t) blade_handle_subscriber_remove(blade_handle_t *bh, blade_subscription_t **bsubP, const char *event, const char *protocol, const char *realm, const char *nodeid);
KS_DECLARE(ks_status_t) blade_handle_connect(blade_handle_t *bh, blade_connection_t **bcP, blade_identity_t *target, const char *session_id);
KS_DECLARE(cJSON *) blade_protocol_execute_response_result_get(blade_rpc_response_t *brpcres);
KS_DECLARE(void) blade_protocol_execute_response_send(blade_rpc_request_t *brpcreq, cJSON *result);
-KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
+KS_DECLARE(ks_status_t) blade_protocol_subscribe(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data, blade_rpc_request_callback_t event_callback, void *event_data);
KS_DECLARE(ks_status_t) blade_protocol_subscribe_raw(blade_handle_t *bh, const char *event, const char *protocol, const char *realm, ks_bool_t remove, blade_rpc_response_callback_t callback, void *data);
KS_END_EXTERN_C