]> git.ipfire.org Git - thirdparty/asterisk.git/commitdiff
Remove dispatch object allocation from Stasis publishing
authorDavid M. Lee <dlee@digium.com>
Mon, 30 Sep 2013 18:48:57 +0000 (18:48 +0000)
committerDavid M. Lee <dlee@digium.com>
Mon, 30 Sep 2013 18:48:57 +0000 (18:48 +0000)
While looking for areas for performance improvement, I realized that an
unused feature in Stasis was negatively impacting performance.

When a message is sent to a subscriber, a dispatch object is allocated
for the dispatch, containing the topic the message was published to, the
subscriber the message is being sent to, and the message itself.

The topic is actually unused by any subscriber in Asterisk today. And
the subscriber is associated with the taskprocessor the message is being
dispatched to.

First, this patch removes the unused topic parameter from Stasis
subscription callbacks.

Second, this patch introduces the concept of taskprocessor local data,
data that may be set on a taskprocessor and provided along with the data
pointer when a task is pushed using the ast_taskprocessor_push_local()
call. This allows the task to have both data specific to that
taskprocessor, in addition to data specific to that invocation.

With those two changes, the dispatch object can be removed completely,
and the message is simply refcounted and sent directly to the
taskprocessor.

Review: https://reviewboard.asterisk.org/r/2884/

git-svn-id: https://origsvn.digium.com/svn/asterisk/branches/12@400181 65c4cc65-6c06-0410-ace0-fbb531ad65f3

46 files changed:
apps/app_meetme.c
apps/app_queue.c
apps/app_voicemail.c
apps/confbridge/confbridge_manager.c
channels/chan_dahdi.c
channels/chan_iax2.c
channels/chan_mgcp.c
channels/chan_sip.c
channels/chan_skinny.c
channels/sig_pri.c
funcs/func_presencestate.c
include/asterisk/stasis.h
include/asterisk/stasis_internal.h
include/asterisk/taskprocessor.h
main/ccss.c
main/cdr.c
main/cel.c
main/devicestate.c
main/endpoints.c
main/manager.c
main/manager_bridges.c
main/manager_channels.c
main/manager_endpoints.c
main/manager_mwi.c
main/pbx.c
main/sounds_index.c
main/stasis.c
main/stasis_cache.c
main/stasis_message_router.c
main/stasis_wait.c
main/taskprocessor.c
res/parking/parking_applications.c
res/parking/parking_bridge_features.c
res/parking/parking_manager.c
res/res_agi.c
res/res_chan_stats.c
res/res_jabber.c
res/res_pjsip_mwi.c
res/res_pjsip_refer.c
res/res_security_log.c
res/res_stasis_test.c
res/res_xmpp.c
res/stasis/app.c
tests/test_devicestate.c
tests/test_stasis.c
tests/test_taskprocessor.c

index e1fedb47bfc4e458528444e07f4f41a4753bf812..cd6a4f72c66aea44bf59804b48a0004e18d0ace0 100644 (file)
@@ -1139,7 +1139,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(meetme_talking_type);
 STASIS_MESSAGE_TYPE_DEFN_LOCAL(meetme_talk_request_type);
 
 static void meetme_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message);
+       struct stasis_message *message);
 
 static void meetme_stasis_cleanup(void)
 {
@@ -1226,7 +1226,7 @@ static int meetme_stasis_init(void)
 }
 
 static void meetme_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *channel_blob = stasis_message_data(message);
        struct stasis_message_type *message_type;
index 50c7f526a4435510628605235c5e349cb6f09af4..e72997fa8237c7a6bfdf0a9ac7e8a349b323872e 100644 (file)
@@ -1832,7 +1832,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_dump_type);
 STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_ringnoanswer_type);
 
 static void queue_channel_manager_event(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        const char *type = data;
@@ -1858,7 +1858,7 @@ static void queue_channel_manager_event(void *data,
 }
 
 static void queue_multi_channel_manager_event(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        const char *type = data;
@@ -1902,7 +1902,7 @@ static void queue_multi_channel_manager_event(void *data,
 }
 
 static void queue_member_manager_event(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        const char *type = data;
@@ -2140,7 +2140,7 @@ static int is_member_available(struct call_queue *q, struct member *mem)
 }
 
 /*! \brief set a member's status based on device state of that member's interface*/
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ao2_iterator miter, qiter;
        struct ast_device_state_message *dev_state;
@@ -5185,7 +5185,7 @@ static void send_agent_complete(const char *queuename, struct ast_channel_snapsh
 }
 
 static void queue_agent_cb(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct ast_channel_blob *agent_blob;
 
@@ -5401,7 +5401,7 @@ static void log_attended_transfer(struct queue_stasis_data *queue_data, struct a
  * \param msg The stasis message for the bridge enter event
  */
 static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_bridge_blob *enter_blob = stasis_message_data(msg);
@@ -5434,7 +5434,7 @@ static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
  * \param msg The stasis message for the blind transfer event
  */
 static void handle_blind_transfer(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_bridge_blob *blind_blob = stasis_message_data(msg);
@@ -5503,7 +5503,7 @@ static void handle_blind_transfer(void *userdata, struct stasis_subscription *su
  * \param msg The stasis message for the attended transfer event.
  */
 static void handle_attended_transfer(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_attended_transfer_message *atxfer_msg = stasis_message_data(msg);
@@ -5558,7 +5558,7 @@ static void handle_attended_transfer(void *userdata, struct stasis_subscription
  * subroutines for further processing.
  */
 static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        if (stasis_subscription_final_message(sub, msg)) {
                ao2_cleanup(userdata);
@@ -5578,7 +5578,7 @@ static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
  * \param msg The stasis message for the local optimization begin event
  */
 static void handle_local_optimization_begin(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
@@ -5630,7 +5630,7 @@ static void handle_local_optimization_begin(void *userdata, struct stasis_subscr
  * \param msg The stasis message for the local optimization end event
  */
 static void handle_local_optimization_end(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
@@ -5695,7 +5695,7 @@ static void handle_local_optimization_end(void *userdata, struct stasis_subscrip
  * \param msg The stasis message for the hangup event.
  */
 static void handle_hangup(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct queue_stasis_data *queue_data = userdata;
        struct ast_channel_blob *channel_blob = stasis_message_data(msg);
@@ -5756,7 +5756,7 @@ static void handle_hangup(void *userdata, struct stasis_subscription *sub,
  * subroutines for further processing.
  */
 static void queue_channel_cb(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        if (stasis_subscription_final_message(sub, msg)) {
                ao2_cleanup(userdata);
index c32b78184489c90ef6b53a7f059d17f1ef1f7969..1ab164511ad217787011398283ea4169f98e7a19 100644 (file)
@@ -12606,7 +12606,7 @@ static void mwi_sub_event_cb(struct stasis_subscription_change *change)
        }
 }
 
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct stasis_subscription_change *change;
        /* Only looking for subscription change notices here */
@@ -12629,7 +12629,7 @@ static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct
 static int dump_cache(void *obj, void *arg, int flags)
 {
        struct stasis_message *msg = obj;
-       mwi_event_cb(NULL, NULL, NULL, msg);
+       mwi_event_cb(NULL, NULL, msg);
        return 0;
 }
 
index 57bf64b7c80346ce95d008a33be70f5e14421215..1b8eab24b830e2fd6e62850ab0c2b6c60c77534b 100644 (file)
@@ -224,63 +224,54 @@ static void confbridge_publish_manager_event(
 }
 
 static void confbridge_start_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeStart", NULL);
 }
 
 static void confbridge_end_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeEnd", NULL);
 }
 
 static void confbridge_leave_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeLeave", NULL);
 }
 
 static void confbridge_join_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeJoin", NULL);
 }
 
 static void confbridge_start_record_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeRecord", NULL);
 }
 
 static void confbridge_stop_record_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeStopRecord", NULL);
 }
 
 static void confbridge_mute_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeMute", NULL);
 }
 
 static void confbridge_unmute_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        confbridge_publish_manager_event(message, "ConfbridgeUnmute", NULL);
 }
 
 static void confbridge_talking_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, extra_text, NULL, ast_free);
index 5fc96836757bf22a527049f714dab2e5d5ec6010..f7cfa8cd76222d8caa833a3c418d0da0715658cd 100644 (file)
@@ -553,7 +553,7 @@ static int restart_monitor(void);
 
 static int dahdi_sendtext(struct ast_channel *c, const char *text);
 
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        /* This module does not handle MWI in an event-based manner.  However, it
         * subscribes to MWI for each mailbox that is configured so that the core
index fb68131b608acefb5f555bc2d27e240bcb98e140..b880f5d9480305f5fcbbf718906c3cea898268c2 100644 (file)
@@ -1270,8 +1270,8 @@ static void build_rand_pad(unsigned char *buf, ssize_t len);
 static int get_unused_callno(enum callno_type type, int validated, callno_entry *entry);
 static int replace_callno(const void *obj);
 static void sched_delay_remove(struct sockaddr_in *sin, callno_entry entry);
-static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
 
 static struct ast_channel_tech iax2_tech = {
        .type = "IAX2",
@@ -1331,7 +1331,7 @@ static void iax2_lock_owner(int callno)
        }
 }
 
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        /* The MWI subscriptions exist just so the core knows we care about those
         * mailboxes.  However, we just grab the events out of the cache when it
@@ -1378,7 +1378,7 @@ static int network_change_sched_cb(const void *data)
 }
 
 static void network_change_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        /* This callback is only concerned with network change messages from the system topic. */
        if (stasis_message_type(message) != ast_network_change_type()) {
@@ -1392,7 +1392,7 @@ static void network_change_stasis_cb(void *data, struct stasis_subscription *sub
 }
 
 static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        if (stasis_message_type(message) != ast_named_acl_change_type()) {
                return;
index b28fce3559c12e66c191f86fdb957ca5b7bb2a0d..8fcdebfad704dff61262a62644259a59f0a726d3 100644 (file)
@@ -486,7 +486,7 @@ static struct ast_channel_tech mgcp_tech = {
        .func_channel_read = acf_channel_read,
 };
 
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        /* This module does not handle MWI in an event-based manner.  However, it
         * subscribes to MWI for each mailbox that is configured so that the core
index 9cbfd715d87edb22cb36d44bfa20990e607b0af1..2a821dd73f24920fd52d1482e52cce87fdd3e387 100644 (file)
@@ -1324,9 +1324,9 @@ static int sip_poke_noanswer(const void *data);
 static int sip_poke_peer(struct sip_peer *peer, int force);
 static void sip_poke_all_peers(void);
 static void sip_peer_hold(struct sip_pvt *p, int hold);
-static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_topic *, struct stasis_message *);
-static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_message *);
+static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
 static void sip_keepalive_all_peers(void);
 
 /*--- Applications, functions, CLI and manager command helpers */
@@ -16825,7 +16825,7 @@ static void sip_peer_hold(struct sip_pvt *p, int hold)
 }
 
 /*! \brief Receive MWI events that we have subscribed to */
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct sip_peer *peer = userdata;
        if (stasis_subscription_final_message(sub, msg)) {
@@ -16872,7 +16872,7 @@ static int network_change_sched_cb(const void *data)
        return 0;
 }
 
-static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        /* This callback is only concerned with network change messages from the system topic. */
        if (stasis_message_type(message) != ast_network_change_type()) {
@@ -28940,7 +28940,7 @@ static int restart_monitor(void)
 }
 
 static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        if (stasis_message_type(message) != ast_named_acl_change_type()) {
                return;
index b6207961627b77a3048faa1f4b8fa9061dc78331..7cf592a261f9f7641a3e131eb249dc776b94b695 100644 (file)
@@ -1639,7 +1639,7 @@ static int skinny_indicate(struct ast_channel *ast, int ind, const void *data, s
 static int skinny_fixup(struct ast_channel *oldchan, struct ast_channel *newchan);
 static int skinny_senddigit_begin(struct ast_channel *ast, char digit);
 static int skinny_senddigit_end(struct ast_channel *ast, char digit, unsigned int duration);
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg);
 static int skinny_dialer_cb(const void *data);
 static int skinny_reload(void);
 
@@ -2300,7 +2300,7 @@ static int skinny_register(struct skinny_req *req, struct skinnysession *s)
                                set_callforwards(l, NULL, SKINNY_CFWD_ALL|SKINNY_CFWD_BUSY|SKINNY_CFWD_NOANSWER);
                                register_exten(l);
                                /* initialize MWI on line and device */
-                               mwi_event_cb(l, NULL, NULL, NULL);
+                               mwi_event_cb(l, NULL, NULL);
                                AST_LIST_TRAVERSE(&l->sublines, subline, list) {
                                        ast_extension_state_add(subline->context, subline->exten, skinny_extensionstate_cb, subline->container);
                                }
@@ -3529,7 +3529,7 @@ static void update_connectedline(struct skinny_subchannel *sub, const void *data
        send_callinfo(sub);
 }
 
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct skinny_line *l = userdata;
        struct skinny_device *d = l->device;
index a6d134e1b384d149fbc7043f11cc80cdb15036f1..9f400772375d313a5256e9bccb33d4aff478171b 100644 (file)
@@ -8892,7 +8892,7 @@ static void sig_pri_send_mwi_indication(struct sig_pri_span *pri, const char *vm
  *
  * \return Nothing
  */
-static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct sig_pri_span *pri = userdata;
        const char *mbox_context;
index 75cef8a549b91a4dee29626c73695999f64d9885..49f8e78a92055cea7fe4a1feae2bb2be60a9f779 100644 (file)
@@ -649,7 +649,7 @@ struct test_cb_data {
        sem_t sem;
 };
 
-static void test_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void test_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct test_cb_data *cb_data = userdata;
        if (stasis_message_type(msg) != ast_presence_state_message_type()) {
index 70bb973faf68ae971716bcaf428b191d3f34f5b5..529aa12bbdd1715bc993fc1d8584f470bf85ece2 100644 (file)
@@ -347,18 +347,6 @@ const char *stasis_topic_name(const struct stasis_topic *topic);
  */
 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
 
-/*!
- * \brief Publish a message from a specified topic to all the subscribers of a
- * possibly different topic.
- * \param topic Topic to publish message to.
- * \param topic Original topic message was from.
- * \param message Message
- * \since 12
- */
-void stasis_forward_message(struct stasis_topic *topic,
-                           struct stasis_topic *publisher_topic,
-                           struct stasis_message *message);
-
 /*!
  * \brief Wait for all pending messages on a given topic to be processed.
  * \param topic Topic to await pending messages on.
@@ -381,11 +369,10 @@ struct stasis_subscription;
 /*!
  * \brief Callback function type for Stasis subscriptions.
  * \param data Data field provided with subscription.
- * \param topic Topic to which the message was published.
  * \param message Published message.
  * \since 12
  */
-typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
 
 /*!
  * \brief Create a subscription.
@@ -583,8 +570,6 @@ struct stasis_message_type *stasis_cache_update_type(void);
  * \since 12
  */
 struct stasis_cache_update {
-       /*! \brief Topic that published \c new_snapshot */
-       struct stasis_topic *topic;
        /*! \brief Convenience reference to snapshot type */
        struct stasis_message_type *type;
        /*! \brief Old value from the cache */
index 67ab88ff04d1d8a9e73abf03688c7d5ee8cf91b4..01e5812422d8e2acb56d62ef8992277f14474148 100644 (file)
@@ -62,7 +62,7 @@ struct stasis_message;
  */
 struct stasis_subscription *internal_stasis_subscribe(
        struct stasis_topic *topic,
-       void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message),
+       void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message),
        void *data,
        int needs_mailbox);
 
index 5cb574854f792292e4591b671b5975dd8ae284c9..ab523290c8f7f8053cf0da3b3507836b8839ddfd 100644 (file)
@@ -175,6 +175,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
  */
 struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
 
+/*!
+ * \brief Sets the local data associated with a taskprocessor.
+ *
+ * \since 12.0.0
+ *
+ * See ast_taskprocessor_push_local().
+ *
+ * \param tps Task processor.
+ * \param local_data Local data to associate with \a tps.
+ */
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data);
+
 /*!
  * \brief Unreference the specified taskprocessor and its reference count will decrement.
  *
@@ -197,6 +209,32 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
  */
 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
 
+/*! \brief Local data parameter */
+struct ast_taskprocessor_local {
+       /*! Local data, associated with the taskprocessor. */
+       void *local_data;
+       /*! Data pointer passed with this task. */
+       void *data;
+};
+
+/*!
+ * \brief Push a task into the specified taskprocessor queue and signal the
+ * taskprocessor thread.
+ *
+ * The callback receives a \ref ast_taskprocessor_local struct, which contains
+ * both the provided \a datap pointer, and any local data set on the
+ * taskprocessor with ast_taskprocessor_set_local().
+ *
+ * \param tps The taskprocessor structure
+ * \param task_exe The task handling function to push into the taskprocessor queue
+ * \param datap The data to be used by the task handling function
+ * \retval 0 success
+ * \retval -1 failure
+ * \since 12.0.0
+ */
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
+       int (*task_exe)(struct ast_taskprocessor_local *local), void *datap);
+
 /*!
  * \brief Pop a task off the taskprocessor and execute it.
  *
index 061c45a7ca269a41ece5d5e91b8a0e1ffa20fc44..3068c6ffac9f5e09d9097737f112e4471c6f400c 100644 (file)
@@ -1397,7 +1397,7 @@ static void generic_monitor_instance_list_destructor(void *obj)
        ast_free((char *)generic_list->device_name);
 }
 
-static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg);
 static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor)
 {
        struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list),
@@ -1471,7 +1471,7 @@ static int generic_monitor_devstate_tp_cb(void *data)
        return 0;
 }
 
-static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        /* Wow, it's cool that we've picked up on a state change, but we really want
         * the actual work to be done in the core's taskprocessor execution thread
@@ -2750,7 +2750,7 @@ static int cc_generic_agent_stop_ringing(struct ast_cc_agent *agent)
        return 0;
 }
 
-static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ast_cc_agent *agent = userdata;
        enum ast_device_state new_state;
index f7af298651936c17dd801db0dbbf235d21569fd9..ea0f9c0e4b9ed8771c264d376efa857a66264a84 100644 (file)
@@ -1839,7 +1839,7 @@ static int finalized_state_process_party_a(struct cdr_object *cdr, struct ast_ch
  * \param topic The topic this message was published for
  * \param message The message
  */
-static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
        RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
@@ -2020,7 +2020,7 @@ static int check_new_cdr_needed(struct ast_channel_snapshot *old_snapshot,
  * \param topic The topic this message was published for
  * \param message The message
  */
-static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
        RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
@@ -2150,7 +2150,7 @@ static int filter_bridge_messages(struct ast_bridge_snapshot *bridge)
  * \param message The message - hopefully a bridge one!
  */
 static void handle_bridge_leave_message(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_bridge_blob *update = stasis_message_data(message);
        struct ast_bridge_snapshot *bridge = update->bridge;
@@ -2450,7 +2450,7 @@ static void handle_standard_bridge_enter_message(struct cdr_object *cdr,
  * \param message The message - hopefully a bridge one!
  */
 static void handle_bridge_enter_message(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_bridge_blob *update = stasis_message_data(message);
        struct ast_bridge_snapshot *bridge = update->bridge;
@@ -2494,7 +2494,7 @@ static void handle_bridge_enter_message(void *data, struct stasis_subscription *
  * \param message The message about who got parked
  * */
 static void handle_parked_call_message(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_parked_call_payload *payload = stasis_message_data(message);
        struct ast_channel_snapshot *channel = payload->parkee;
index 36daf2066daf904f2d456944e9829c3bf850ed44..0d78b5ccebe9a3f85790d765ff31bd1ecd289737 100644 (file)
@@ -1019,7 +1019,6 @@ static int cel_filter_channel_snapshot(struct ast_channel_snapshot *snapshot)
 }
 
 static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct stasis_cache_update *update = stasis_message_data(message);
@@ -1082,7 +1081,6 @@ static struct ast_str *cel_generate_peer_str(
 
 static void cel_bridge_enter_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -1110,7 +1108,6 @@ static void cel_bridge_enter_cb(
 
 static void cel_bridge_leave_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_bridge_blob *blob = stasis_message_data(message);
@@ -1138,7 +1135,6 @@ static void cel_bridge_leave_cb(
 
 static void cel_parking_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_parked_call_payload *parked_payload = stasis_message_data(message);
@@ -1183,7 +1179,6 @@ static void save_dialstatus(struct ast_multi_channel_blob *blob)
 }
 
 static void cel_dial_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_multi_channel_blob *blob = stasis_message_data(message);
@@ -1218,7 +1213,6 @@ static void cel_dial_cb(void *data, struct stasis_subscription *sub,
 
 static void cel_generic_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
@@ -1241,7 +1235,6 @@ static void cel_generic_cb(
 
 static void cel_blind_transfer_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_bridge_blob *obj = stasis_message_data(message);
@@ -1289,7 +1282,6 @@ static void cel_blind_transfer_cb(
 
 static void cel_attended_transfer_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_attended_transfer_message *xfer = stasis_message_data(message);
@@ -1342,7 +1334,6 @@ static void cel_attended_transfer_cb(
 
 static void cel_pickup_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_multi_channel_blob *obj = stasis_message_data(message);
@@ -1364,7 +1355,6 @@ static void cel_pickup_cb(
 
 static void cel_local_cb(
        void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
        struct ast_multi_channel_blob *obj = stasis_message_data(message);
index bcf07ff0b2d57ab58b75409c97dee006420ebcf0..158d1f817e8302108ddca561efba071a5be9e758 100644 (file)
@@ -610,7 +610,7 @@ static int aggregate_state_changed(char *device, enum ast_device_state new_aggre
        return 1;
 }
 
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        enum ast_device_state aggregate_state;
        char *device;
index b33e33f1a8fab81c5401c1881f560720cb04dde8..bdcf401ba03ce7ea4df3f695e669246621017eff 100644 (file)
@@ -152,7 +152,7 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
 
 /*! \brief Handler for channel snapshot cache clears */
 static void endpoint_cache_clear(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        struct ast_endpoint *endpoint = data;
@@ -174,7 +174,7 @@ static void endpoint_cache_clear(void *data,
 }
 
 static void endpoint_default(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        struct stasis_endpoint *endpoint = data;
index 75e20c21d5e2925addcebe9998c6b20aa52ee62a..69def4b1fba0887cbeb2e48d225e514f93ade668 100644 (file)
@@ -1151,7 +1151,7 @@ static const struct {
        {{ "restart", "gracefully", NULL }},
 };
 
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
 
 static void acl_change_stasis_subscribe(void)
 {
@@ -1427,7 +1427,6 @@ struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_excl
 }
 
 static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
@@ -1444,7 +1443,6 @@ static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        struct ast_json_payload *payload = stasis_message_data(message);
@@ -7640,7 +7638,6 @@ static void load_channelvars(struct ast_variable *var)
 #ifdef TEST_FRAMEWORK
 
 static void test_suite_event_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic,
                struct stasis_message *message)
 {
        struct ast_test_suite_message_payload *payload;
@@ -8344,7 +8341,7 @@ static int __init_manager(int reload, int by_external_config)
 }
 
 static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        if (stasis_message_type(message) != ast_named_acl_change_type()) {
                return;
index 38f9af4771bc5fa6905e61daf8deff0b47d4839c..fad676b567ca9ec9a120329c7eb9045fce63623f 100644 (file)
@@ -180,7 +180,6 @@ bridge_snapshot_monitor bridge_monitors[] = {
 };
 
 static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, bridge_event_string, NULL, ast_free);
@@ -221,7 +220,6 @@ static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
 }
 
 static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        struct ast_bridge_merge_message *merge_msg = stasis_message_data(message);
@@ -254,7 +252,6 @@ static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_enter_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        static const char *swap_name = "SwapUniqueid: ";
@@ -283,7 +280,6 @@ static void channel_enter_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_leave_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        struct ast_bridge_blob *blob = stasis_message_data(message);
index d39687ffd04d13b745f39e29a7b74f2ab9f7d1fb..0bebb216c0d8ebc53ec575a590780f9e263eb2e5 100644 (file)
@@ -565,7 +565,6 @@ channel_snapshot_monitor channel_monitors[] = {
 };
 
 static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -616,7 +615,7 @@ static int userevent_exclusion_cb(const char *key)
 }
 
 static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -667,7 +666,7 @@ static void publish_basic_channel_event(const char *event, int class, struct ast
 }
 
 static void channel_hangup_request_cb(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
@@ -707,7 +706,7 @@ static void channel_hangup_request_cb(void *data,
 }
 
 static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
        struct ast_channel_snapshot *spyer;
@@ -730,7 +729,7 @@ static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
        RAII_VAR(struct ast_str *, spyee_channel_string, NULL, ast_free);
@@ -765,7 +764,7 @@ static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub
 }
 
 static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -806,7 +805,7 @@ static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@@ -853,7 +852,7 @@ static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_hangup_handler_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
        struct ast_channel_blob *payload = stasis_message_data(message);
@@ -884,7 +883,7 @@ static void channel_hangup_handler_cb(void *data, struct stasis_subscription *su
 }
 
 static void channel_fax_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
        RAII_VAR(struct ast_str *, event_buffer, ast_str_create(256), ast_free);
@@ -957,7 +956,7 @@ static void channel_fax_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_channel_blob *payload = stasis_message_data(message);
        struct ast_json *blob = payload->blob;
@@ -977,7 +976,7 @@ static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_channel_blob *payload = stasis_message_data(message);
 
@@ -985,7 +984,7 @@ static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_channel_blob *payload = stasis_message_data(message);
 
@@ -993,7 +992,7 @@ static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub
 }
 
 static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct ast_channel_blob *payload = stasis_message_data(message);
 
@@ -1004,7 +1003,7 @@ static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
  * \brief Callback processing messages for channel dialing
  */
 static void channel_dial_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_multi_channel_blob *obj = stasis_message_data(message);
        const char *dialstatus;
@@ -1051,7 +1050,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_hold_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
        const char *musicclass;
@@ -1083,7 +1082,7 @@ static void channel_hold_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_channel_blob *obj = stasis_message_data(message);
        RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
index 634283728a3bcbf8bfe272d512727955a5c6e736..b5f5b31c282f61266ca2c16f7eee222ff6ee256f 100644 (file)
@@ -46,14 +46,9 @@ static void manager_endpoints_shutdown(void)
 }
 
 static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic,
        struct stasis_message *message)
 {
-       /* XXX This looks wrong. Nothing should post or forward to a caching
-        * topic directly. Maybe ast_endpoint_topic() would be correct? I'd have
-        * to dig to make sure I don't break anything, though.
-        */
-       stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message);
+       stasis_publish(ast_manager_get_topic(), message);
 }
 
 int manager_endpoints_init(void)
index 9847bd4a73a160d18e8a034bc73313fed39beaa6..849c315e130bf13a20314183d87144073e890525 100644 (file)
@@ -54,7 +54,6 @@ static int exclude_event_cb(const char *key)
 
 /*! \brief Generic MWI event callback used for one-off events from voicemail modules */
 static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        struct ast_mwi_blob *payload = stasis_message_data(message);
@@ -86,7 +85,6 @@ static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
 }
 
 static void mwi_update_cb(void *data, struct stasis_subscription *sub,
-                                   struct stasis_topic *topic,
                                    struct stasis_message *message)
 {
        struct ast_mwi_state *mwi_state;
index 09f3d95ec7157553c6be9f3d3ccdecb69016e2ee..2a415401f16a6f3fdf3a001ab6a7568834a19216 100644 (file)
@@ -5111,7 +5111,7 @@ static void get_device_state_causing_channels(struct ao2_container *c)
        ao2_iterator_destroy(&iter);
 }
 
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ast_device_state_message *dev_state;
        struct ast_hint *hint;
@@ -11369,7 +11369,7 @@ static int pbx_builtin_sayphonetic(struct ast_channel *chan, const char *data)
        return res;
 }
 
-static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ast_presence_state_message *presence_state = stasis_message_data(msg);
        struct ast_hint *hint;
index 9f70ef6cc0159513ad267c3751b7073d200cc619..2fcd23908dcec76cd542b62536e595794fd0bbc5 100644 (file)
@@ -281,7 +281,7 @@ static void sounds_cleanup(void)
 }
 
 static void format_update_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        ast_sounds_reindex();
 }
index 807ba43441c322bcce73ef2c1190a5c12d8d6e26..42c90176996c3a50a8bb1291da385536ab3c5512 100644 (file)
@@ -249,7 +249,6 @@ static void subscription_dtor(void *obj)
  * \param message Message to send.
  */
 static void subscription_invoke(struct stasis_subscription *sub,
-                                 struct stasis_topic *topic,
                                  struct stasis_message *message)
 {
        /* Notify that the final message has been received */
@@ -260,7 +259,7 @@ static void subscription_invoke(struct stasis_subscription *sub,
        }
 
        /* Since sub is mostly immutable, no need to lock sub */
-       sub->callback(sub->data, sub, topic, message);
+       sub->callback(sub->data, sub, message);
 
        /* Notify that the final message has been processed */
        if (stasis_subscription_final_message(sub, message)) {
@@ -301,6 +300,9 @@ struct stasis_subscription *internal_stasis_subscribe(
                if (!sub->mailbox) {
                        return NULL;
                }
+               ast_taskprocessor_set_local(sub->mailbox, sub);
+               /* Taskprocessor has a reference */
+               ao2_ref(sub, +1);
        }
 
        ao2_ref(topic, +1);
@@ -327,6 +329,13 @@ struct stasis_subscription *stasis_subscribe(
        return internal_stasis_subscribe(topic, callback, data, 1);
 }
 
+static int sub_cleanup(void *data)
+{
+       struct stasis_subscription *sub = data;
+       ao2_cleanup(sub);
+       return 0;
+}
+
 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
 {
        /* The subscription may be the last ref to this topic. Hold
@@ -349,6 +358,11 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
        /* Now let everyone know about the unsubscribe */
        send_subscription_unsubscribe(topic, sub);
 
+       /* When all that's done, remove the ref the mailbox has on the sub */
+       if (sub->mailbox) {
+               ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
+       }
+
        /* Unsubscribing unrefs the subscription */
        ao2_cleanup(sub);
        return NULL;
@@ -475,93 +489,39 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
        return ast_vector_remove_elem_unordered(topic->subscribers, sub);
 }
 
-/*!
- * \internal
- * \brief Information needed to dispatch a message to a subscription
- */
-struct dispatch {
-       /*! Topic message was published to */
-       struct stasis_topic *topic;
-       /*! The message itself */
-       struct stasis_message *message;
-       /*! Subscription receiving the message */
-       struct stasis_subscription *sub;
-};
-
-static void dispatch_dtor(struct dispatch *dispatch)
-{
-       ao2_cleanup(dispatch->topic);
-       ao2_cleanup(dispatch->message);
-       ao2_cleanup(dispatch->sub);
-
-       ast_free(dispatch);
-}
-
-static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
-{
-       struct dispatch *dispatch;
-
-       ast_assert(topic != NULL);
-       ast_assert(message != NULL);
-       ast_assert(sub != NULL);
-
-       dispatch = ast_malloc(sizeof(*dispatch));
-       if (!dispatch) {
-               return NULL;
-       }
-
-       dispatch->topic = topic;
-       ao2_ref(topic, +1);
-
-       dispatch->message = message;
-       ao2_ref(message, +1);
-
-       dispatch->sub = sub;
-       ao2_ref(sub, +1);
-
-       return dispatch;
-}
-
 /*!
  * \brief Dispatch a message to a subscriber
  * \param data \ref dispatch object
  * \return 0
  */
-static int dispatch_exec(void *data)
+static int dispatch_exec(struct ast_taskprocessor_local *local)
 {
-       struct dispatch *dispatch = data;
+       struct stasis_subscription *sub = local->local_data;
+       struct stasis_message *message = local->data;
 
-       subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
-       dispatch_dtor(dispatch);
+       subscription_invoke(sub, message);
+       ao2_cleanup(message);
 
        return 0;
 }
 
 static void dispatch_message(struct stasis_subscription *sub,
-       struct stasis_topic *publisher_topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        if (sub->mailbox) {
-               struct dispatch *dispatch;
-
-               dispatch = dispatch_create(publisher_topic, message, sub);
-               if (!dispatch) {
+               ao2_bump(message);
+               if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
+                       /* Push failed; ugh. */
                        ast_log(LOG_DEBUG, "Dropping dispatch\n");
-                       return;
-               }
-
-               if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
-                       /* Push failed; just delete the dispatch.
-                        */
-                       ast_log(LOG_DEBUG, "Dropping dispatch\n");
-                       dispatch_dtor(dispatch);
+                       ao2_cleanup(message);
                }
        } else {
                /* Dispatch directly */
-               subscription_invoke(sub, publisher_topic, message);
+               subscription_invoke(sub, message);
        }
 }
 
-void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message)
 {
        size_t i;
        /* The topic may be unref'ed by the subscription invocation.
@@ -571,23 +531,18 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu
        SCOPED_AO2LOCK(lock, topic);
 
        ast_assert(topic != NULL);
-       ast_assert(publisher_topic != NULL);
        ast_assert(message != NULL);
 
        for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
-               struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i);
+               struct stasis_subscription *sub =
+                       ast_vector_get(topic->subscribers, i);
 
                ast_assert(sub != NULL);
 
-               dispatch_message(sub, publisher_topic, message);
+               dispatch_message(sub, message);
        }
 }
 
-void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
-{
-       stasis_forward_message(topic, topic, message);
-}
-
 /*!
  * \brief Forwarding information
  *
@@ -748,7 +703,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic,
        stasis_publish(topic, msg);
 
        /* Now we have to dispatch to the subscription itself */
-       dispatch_message(sub, topic, msg);
+       dispatch_message(sub, msg);
 }
 
 struct topic_pool_entry {
index d4375520da067bfc96cb67cbd3905e862a5280fb..279210d5b660dd670957bda4889da079624d82af 100644 (file)
@@ -339,8 +339,6 @@ struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_messa
 static void stasis_cache_update_dtor(void *obj)
 {
        struct stasis_cache_update *update = obj;
-       ao2_cleanup(update->topic);
-       update->topic = NULL;
        ao2_cleanup(update->old_snapshot);
        update->old_snapshot = NULL;
        ao2_cleanup(update->new_snapshot);
@@ -349,12 +347,11 @@ static void stasis_cache_update_dtor(void *obj)
        update->type = NULL;
 }
 
-static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
+static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
 {
        RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
 
-       ast_assert(topic != NULL);
        ast_assert(old_snapshot != NULL || new_snapshot != NULL);
 
        update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
@@ -363,8 +360,6 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
                return NULL;
        }
 
-       ao2_ref(topic, +1);
-       update->topic = topic;
        if (old_snapshot) {
                ao2_ref(old_snapshot, +1);
                update->old_snapshot = old_snapshot;
@@ -390,7 +385,7 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
 }
 
 static void caching_topic_exec(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
        struct stasis_caching_topic *caching_topic = data;
@@ -418,7 +413,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
                if (clear_id) {
                        old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
                        if (old_snapshot) {
-                               update = update_create(topic, old_snapshot, NULL);
+                               update = update_create(old_snapshot, NULL);
                                stasis_publish(caching_topic->topic, update);
                                return;
                        }
@@ -440,7 +435,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
 
                old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
 
-               update = update_create(topic, old_snapshot, message);
+               update = update_create(old_snapshot, message);
                if (update == NULL) {
                        return;
                }
index 864cf42c4791f87cf0514556962835bcc8562c46..8c82decfef817551fcd22ce19b1fccbbc559d580 100644 (file)
@@ -184,14 +184,13 @@ static int find_route(
 
 static void router_dispatch(void *data,
                            struct stasis_subscription *sub,
-                           struct stasis_topic *topic,
                            struct stasis_message *message)
 {
        struct stasis_message_router *router = data;
        struct stasis_message_route route;
 
        if (find_route(router, message, &route) == 0) {
-               route.callback(route.data, sub, topic, message);
+               route.callback(route.data, sub, message);
        }
 
        if (stasis_subscription_final_message(sub, message)) {
index e94c686e1b729d520bf400b61d51cd0d12dac23e..32b59718c108d7fc7257c7457a34fd33df854e4f 100644 (file)
@@ -55,7 +55,7 @@ static void caching_guarantee_dtor(void *obj)
 }
 
 static void guarantee_handler(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        /* Wait for our particular message */
        if (data == message) {
index ebb6200c372aefdaf11be7498472f1933f4bb004..189219d668450677b87d4cc9a2084b62616baa4d 100644 (file)
@@ -48,11 +48,15 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  */
 struct tps_task {
        /*! \brief The execute() task callback function pointer */
-       int (*execute)(void *datap);
+       union {
+               int (*execute)(void *datap);
+               int (*execute_local)(struct ast_taskprocessor_local *local);
+       } callback;
        /*! \brief The data pointer for the task execute() function */
        void *datap;
        /*! \brief AST_LIST_ENTRY overhead */
        AST_LIST_ENTRY(tps_task) list;
+       unsigned int wants_local:1;
 };
 
 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@@ -69,6 +73,7 @@ struct ast_taskprocessor {
        const char *name;
        /*! \brief Taskprocessor statistics */
        struct tps_taskprocessor_stats *stats;
+       void *local_data;
        /*! \brief Taskprocessor current queue size */
        long tps_queue_size;
        /*! \brief Taskprocessor queue */
@@ -282,10 +287,41 @@ int ast_tps_init(void)
 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
 {
        struct tps_task *t;
-       if ((t = ast_calloc(1, sizeof(*t)))) {
-               t->execute = task_exe;
-               t->datap = datap;
+       if (!task_exe) {
+               ast_log(LOG_ERROR, "task_exe is NULL!\n");
+               return NULL;
+       }
+
+       t = ast_calloc(1, sizeof(*t));
+       if (!t) {
+               ast_log(LOG_ERROR, "failed to allocate task!\n");
+               return NULL;
+       }
+
+       t->callback.execute = task_exe;
+       t->datap = datap;
+
+       return t;
+}
+
+static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+{
+       struct tps_task *t;
+       if (!task_exe) {
+               ast_log(LOG_ERROR, "task_exe is NULL!\n");
+               return NULL;
+       }
+
+       t = ast_calloc(1, sizeof(*t));
+       if (!t) {
+               ast_log(LOG_ERROR, "failed to allocate task!\n");
+               return NULL;
        }
+
+       t->callback.execute_local = task_exe;
+       t->datap = datap;
+       t->wants_local = 1;
+
        return t;
 }
 
@@ -643,6 +679,13 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
        return __allocate_taskprocessor(name, listener);
 }
 
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
+       void *local_data)
+{
+       SCOPED_AO2LOCK(lock, tps);
+       tps->local_data = local_data;
+}
+
 /* decrement the taskprocessor reference count and unlink from the container if necessary */
 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
 {
@@ -664,20 +707,21 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
 }
 
 /* push the task into the taskprocessor queue */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
 {
-       struct tps_task *t;
        int previous_size;
        int was_empty;
 
-       if (!tps || !task_exe) {
-               ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
+       if (!tps) {
+               ast_log(LOG_ERROR, "tps is NULL!\n");
                return -1;
        }
-       if (!(t = tps_task_alloc(task_exe, datap))) {
-               ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
+
+       if (!t) {
+               ast_log(LOG_ERROR, "t is NULL!\n");
                return -1;
        }
+
        ao2_lock(tps);
        AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
        previous_size = tps->tps_queue_size++;
@@ -688,8 +732,19 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
        return 0;
 }
 
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+{
+       return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
+}
+
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
+{
+       return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
+}
+
 int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
 {
+       struct ast_taskprocessor_local local;
        struct tps_task *t;
        int size;
 
@@ -701,9 +756,18 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
        }
 
        tps->executing = 1;
+
+       if (t->wants_local) {
+               local.local_data = tps->local_data;
+               local.data = t->datap;
+       }
        ao2_unlock(tps);
 
-       t->execute(t->datap);
+       if (t->wants_local) {
+               t->callback.execute_local(&local);
+       } else {
+               t->callback.execute(t->datap);
+       }
        tps_task_free(t);
 
        ao2_lock(tps);
index fc74ac2be337b722c05bbfa4957a99147de8f517..7bd89226625c86f432b98dc4c8e216d0e88d9f36 100644 (file)
@@ -739,7 +739,7 @@ announce_cleanup:
        cap_slin = ast_format_cap_destroy(cap_slin);
 }
 
-static void park_announce_update_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void park_announce_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        struct park_announce_subscription_data *pa_data = data;
        char *dial_string = pa_data->dial_string;
index 0e5e05d8f17351cb2410b8f0c73e39c08580ad35..6c1d4d65e112ab4be69a945fdac3960b99477aaa 100644 (file)
@@ -125,7 +125,7 @@ static void parker_parked_call_message_response(struct ast_parked_call_payload *
        }
 }
 
-static void parker_update_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void parker_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        if (stasis_subscription_final_message(sub, message)) {
                ast_free(data);
index dac19109091917090044392ab3b6793ae9c62380..0c577018a0c2fc72db13f04d946126f215a70a53 100644 (file)
@@ -545,7 +545,7 @@ static void parked_call_message_response(struct ast_parked_call_payload *parked_
                );
 }
 
-static void parking_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void parking_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        if (stasis_message_type(message) == ast_parked_call_type()) {
                struct ast_parked_call_payload *parked_call_message = stasis_message_data(message);
index 25251f37e093da011b87ed1dba9c3878adb21a62..84dcbebe9c5d3bbcc587f0a878df828183ad324f 100644 (file)
@@ -1040,7 +1040,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_exec_type);
 STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_end_type);
 
 static void agi_channel_manager_event(void *data,
-       struct stasis_subscription *sub, struct stasis_topic *topic,
+       struct stasis_subscription *sub,
        struct stasis_message *message)
 {
        const char *type = data;
index a43c564b1e76092b979300426a9af3775a2be0ab..9d1e8c02e777f7c1b3411805b23b72be598dcee0 100644 (file)
@@ -57,7 +57,7 @@ static struct stasis_message_router *router;
  * \param message The message itself.
  */
 static void statsmaker(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        RAII_VAR(struct ast_str *, metric, NULL, ast_free);
 
@@ -89,7 +89,7 @@ static void statsmaker(void *data, struct stasis_subscription *sub,
  * \param message The message itself.
  */
 static void updates(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        /* Since this came from a message router, we know the type of the
         * message. We can cast the data without checking its type.
@@ -139,7 +139,7 @@ static void updates(void *data, struct stasis_subscription *sub,
  * \param message The message itself.
  */
 static void default_route(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        if (stasis_subscription_final_message(sub, message)) {
                /* Much like with the regular subscription, you may need to
index b534e803b7daee78498bec2f5b510d83cc190c1a..c1cee9b7aefdf4ea2e2d092cb130b94c6c18c9d8 100644 (file)
@@ -371,8 +371,8 @@ static void aji_pubsub_purge_nodes(struct aji_client *client,
        const char* collection_name);
 static void aji_publish_mwi(struct aji_client *client, const char *mailbox,
        const char *context, const char *oldmsgs, const char *newmsgs);
-static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
-static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg);
+static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg);
 static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node,
                                       const char *event_type, unsigned int cachable);
 /* No transports in this version */
@@ -3235,7 +3235,7 @@ int ast_aji_disconnect(struct aji_client *client)
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        const char *mailbox;
        const char *context;
@@ -3269,7 +3269,7 @@ static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasi
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct aji_client *client = data;
        struct ast_device_state_message *dev_state;
@@ -3291,7 +3291,7 @@ static int cached_devstate_cb(void *obj, void *arg, int flags)
 {
        struct stasis_message *msg = obj;
        struct aji_client *client = arg;
-       aji_devstate_cb(client, device_state_sub, NULL, msg);
+       aji_devstate_cb(client, device_state_sub, msg);
        return 0;
 }
 
index e2b9b630ddcf34915849a9bd7f379723cc6fb43f..17d648b747e50fe9a6f3698ae8e8318670aa85bd 100644 (file)
@@ -118,7 +118,7 @@ struct mwi_subscription {
 };
 
 static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg);
+               struct stasis_message *msg);
 
 static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char *mailbox, struct mwi_subscription *mwi_sub)
 {
@@ -603,7 +603,7 @@ static int serialized_cleanup(void *userdata)
 }
 
 static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *msg)
+               struct stasis_message *msg)
 {
        struct mwi_subscription *mwi_sub = userdata;
 
index cca5a7c396ab66bab2c344a52f5e9acebf22c95f..91da22fde6f79b39f8faa6083299aeaa54b3b3f4 100644 (file)
@@ -143,7 +143,7 @@ static int refer_progress_notify(void *data)
 }
 
 static void refer_progress_bridge(void *data, struct stasis_subscription *sub,
-               struct stasis_topic *topic, struct stasis_message *message)
+               struct stasis_message *message)
 {
        struct refer_progress *progress = data;
        struct ast_bridge_blob *enter_blob;
index d06f9f754975217f91de27d7f319afa52d514c4b..e56f7f76fc09bb7a5f579efeef94b2a5d4d47913 100644 (file)
@@ -117,7 +117,7 @@ static void security_event_stasis_cb(struct ast_json *json)
 }
 
 static void security_stasis_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct ast_json_payload *payload = stasis_message_data(message);
 
index 58df8c1451751968b6d18785c463b708379cff9b..099e1af78fb8361442f4f360d1ba9c0cf2727c12 100644 (file)
@@ -120,7 +120,7 @@ struct stasis_message_sink *stasis_message_sink_create(void)
  * the initial lazy binding will still work as expected.
  */
 static void message_sink_cb(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct stasis_message_sink *sink = data;
 
index 1d8f62824ea5895488a589c73fe9e894fc2df7db..bd66e70cc7b2949f029a39406e4f766e3419b205 100644 (file)
@@ -1318,7 +1318,7 @@ static void xmpp_pubsub_publish_device_state(struct ast_xmpp_client *client, con
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ast_xmpp_client *client = data;
        const char *mailbox, *context;
@@ -1351,7 +1351,7 @@ static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, stru
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct ast_xmpp_client *client = data;
        struct ast_device_state_message *dev_state;
@@ -1566,7 +1566,7 @@ static int cached_devstate_cb(void *obj, void *arg, int flags)
 {
        struct stasis_message *msg = obj;
        struct ast_xmpp_client *client = arg;
-       xmpp_pubsub_devstate_cb(client, client->device_state_sub, NULL, msg);
+       xmpp_pubsub_devstate_cb(client, client->device_state_sub, msg);
        return 0;
 }
 
index 2c84f0c3de6d145bc3d962aed1b31323ee0955cd..bc1268fb7e17f7025ee215c5d36aeb0ef7481925 100644 (file)
@@ -220,7 +220,7 @@ static void app_dtor(void *obj)
 }
 
 static void sub_default_handler(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct app *app = data;
        RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
@@ -363,7 +363,6 @@ static channel_snapshot_monitor channel_monitors[] = {
 
 static void sub_channel_update_handler(void *data,
                 struct stasis_subscription *sub,
-                struct stasis_topic *topic,
                 struct stasis_message *message)
 {
        struct app *app = data;
@@ -411,7 +410,6 @@ static struct ast_json *simple_bridge_event(
 
 static void sub_bridge_update_handler(void *data,
                 struct stasis_subscription *sub,
-                struct stasis_topic *topic,
                 struct stasis_message *message)
 {
         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
@@ -447,7 +445,7 @@ static void sub_bridge_update_handler(void *data,
 }
 
 static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        struct app *app = data;
        struct ast_bridge_merge_message *merge;
@@ -476,7 +474,7 @@ static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
        }
 
        /* Forward the message to the app */
-       stasis_forward_message(app->topic, topic, message);
+       stasis_publish(app->topic, message);
 }
 
 struct app *app_create(const char *name, stasis_app_cb handler, void *data)
index ff5d681f4cfff14aea3fe635a92c160744cd50de..5a3d255d141bc30ee1c8982029180a024e5805dc 100644 (file)
@@ -309,7 +309,7 @@ static struct consumer *consumer_create(void) {
        return consumer;
 }
 
-static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        struct consumer *consumer = data;
        RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
@@ -342,7 +342,7 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st
        }
 }
 
-static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        struct consumer *consumer = data;
 
index ac6154d88dac8c3bac342dc66593e18cde620938..1e911e092cec6d86659b8b7ec17a63b554325ddd 100644 (file)
@@ -183,7 +183,7 @@ static struct consumer *consumer_create(int ignore_subscriptions) {
        return consumer;
 }
 
-static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
 {
        struct consumer *consumer = data;
        RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
@@ -711,7 +711,6 @@ AST_TEST_DEFINE(cache)
        /* Check for new snapshot messages */
        ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
        actual_update = stasis_message_data(consumer->messages_rxed[0]);
-       ast_test_validate(test, topic == actual_update->topic);
        ast_test_validate(test, NULL == actual_update->old_snapshot);
        ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
        ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
@@ -720,7 +719,6 @@ AST_TEST_DEFINE(cache)
 
        ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
        actual_update = stasis_message_data(consumer->messages_rxed[1]);
-       ast_test_validate(test, topic == actual_update->topic);
        ast_test_validate(test, NULL == actual_update->old_snapshot);
        ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
        ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
@@ -736,7 +734,6 @@ AST_TEST_DEFINE(cache)
        ast_test_validate(test, 3 == actual_len);
 
        actual_update = stasis_message_data(consumer->messages_rxed[2]);
-       ast_test_validate(test, topic == actual_update->topic);
        ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
        ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
        ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
@@ -752,7 +749,6 @@ AST_TEST_DEFINE(cache)
        ast_test_validate(test, 4 == actual_len);
 
        actual_update = stasis_message_data(consumer->messages_rxed[3]);
-       ast_test_validate(test, topic == actual_update->topic);
        ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
        ast_test_validate(test, NULL == actual_update->new_snapshot);
        ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
@@ -1226,7 +1222,7 @@ AST_TEST_DEFINE(to_ami)
 }
 
 static void noop(void *data, struct stasis_subscription *sub,
-       struct stasis_topic *topic, struct stasis_message *message)
+       struct stasis_message *message)
 {
        /* no-op */
 }
index 70400a9ec736e8e7c51d551ca06ff040005a3904..be48f92488f08af02bade50e7cb88fde1df2bb43 100644 (file)
@@ -48,6 +48,31 @@ struct task_data {
        int task_complete;
 };
 
+static void task_data_dtor(void *obj)
+{
+       struct task_data *task_data = obj;
+
+       ast_mutex_destroy(&task_data->lock);
+       ast_cond_destroy(&task_data->cond);
+}
+
+/*! \brief Create a task_data object */
+static struct task_data *task_data_create(void)
+{
+       struct task_data *task_data =
+               ao2_alloc(sizeof(*task_data), task_data_dtor);
+
+       if (!task_data) {
+               return NULL;
+       }
+
+       ast_cond_init(&task_data->cond, NULL);
+       ast_mutex_init(&task_data->lock);
+       task_data->task_complete = 0;
+
+       return task_data;
+}
+
 /*!
  * \brief Queued task for baseline test.
  *
@@ -64,6 +89,30 @@ static int task(void *data)
        return 0;
 }
 
+/*!
+ * \brief Wait for a task to execute.
+ */
+static int task_wait(struct task_data *task_data)
+{
+       struct timeval start = ast_tvnow();
+       struct timespec end;
+       SCOPED_MUTEX(lock, &task_data->lock);
+
+       end.tv_sec = start.tv_sec + 30;
+       end.tv_nsec = start.tv_usec * 1000;
+
+       while (!task_data->task_complete) {
+               int res;
+               res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
+                       &end);
+               if (res == ETIMEDOUT) {
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
 /*!
  * \brief Baseline test for default taskprocessor
  *
@@ -73,12 +122,9 @@ static int task(void *data)
  */
 AST_TEST_DEFINE(default_taskprocessor)
 {
-       struct ast_taskprocessor *tps;
-       struct task_data task_data;
-       struct timeval start;
-       struct timespec ts;
-       enum ast_test_result_state res = AST_TEST_PASS;
-       int timedwait_res;
+       RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+       RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
+       int res;
 
        switch (cmd) {
        case TEST_INIT:
@@ -99,36 +145,21 @@ AST_TEST_DEFINE(default_taskprocessor)
                return AST_TEST_FAIL;
        }
 
-       start = ast_tvnow();
-
-       ts.tv_sec = start.tv_sec + 30;
-       ts.tv_nsec = start.tv_usec * 1000;
-
-       ast_cond_init(&task_data.cond, NULL);
-       ast_mutex_init(&task_data.lock);
-       task_data.task_complete = 0;
-
-       ast_taskprocessor_push(tps, task, &task_data);
-       ast_mutex_lock(&task_data.lock);
-       while (!task_data.task_complete) {
-               timedwait_res = ast_cond_timedwait(&task_data.cond, &task_data.lock, &ts);
-               if (timedwait_res == ETIMEDOUT) {
-                       break;
-               }
+       task_data = task_data_create();
+       if (!task_data) {
+               ast_test_status_update(test, "Unable to create task_data\n");
+               return AST_TEST_FAIL;
        }
-       ast_mutex_unlock(&task_data.lock);
 
-       if (!task_data.task_complete) {
+       ast_taskprocessor_push(tps, task, task_data);
+
+       res = task_wait(task_data);
+       if (res != 0) {
                ast_test_status_update(test, "Queued task did not execute!\n");
-               res = AST_TEST_FAIL;
-               goto test_end;
+               return AST_TEST_FAIL;
        }
 
-test_end:
-       tps = ast_taskprocessor_unreference(tps);
-       ast_mutex_destroy(&task_data.lock);
-       ast_cond_destroy(&task_data.cond);
-       return res;
+       return AST_TEST_PASS;
 }
 
 #define NUM_TASKS 20000
@@ -631,12 +662,78 @@ AST_TEST_DEFINE(taskprocessor_shutdown)
        return AST_TEST_PASS;
 }
 
+static int local_task_exe(struct ast_taskprocessor_local *local)
+{
+       int *local_data = local->local_data;
+       struct task_data *task_data = local->data;
+
+       *local_data = 1;
+       task(task_data);
+
+       return 0;
+}
+
+AST_TEST_DEFINE(taskprocessor_push_local)
+{
+       RAII_VAR(struct ast_taskprocessor *, tps, NULL,
+               ast_taskprocessor_unreference);
+       struct task_data *task_data;
+       int local_data;
+       int res;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = "/main/taskprocessor/";
+               info->summary = "Test of pushing local data";
+               info->description =
+                       "Ensures that local data is passed along.";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+
+       tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
+       if (!tps) {
+               ast_test_status_update(test, "Unable to create test taskprocessor\n");
+               return AST_TEST_FAIL;
+       }
+
+
+       task_data = task_data_create();
+       if (!task_data) {
+               ast_test_status_update(test, "Unable to create task_data\n");
+               return AST_TEST_FAIL;
+       }
+
+       local_data = 0;
+       ast_taskprocessor_set_local(tps, &local_data);
+
+       ast_taskprocessor_push_local(tps, local_task_exe, task_data);
+
+       res = task_wait(task_data);
+       if (res != 0) {
+               ast_test_status_update(test, "Queued task did not execute!\n");
+               return AST_TEST_FAIL;
+       }
+
+       if (local_data != 1) {
+               ast_test_status_update(test,
+                       "Queued task did not set local_data!\n");
+               return AST_TEST_FAIL;
+       }
+
+       return AST_TEST_PASS;
+}
+
 static int unload_module(void)
 {
        ast_test_unregister(default_taskprocessor);
        ast_test_unregister(default_taskprocessor_load);
        ast_test_unregister(taskprocessor_listener);
        ast_test_unregister(taskprocessor_shutdown);
+       ast_test_unregister(taskprocessor_push_local);
        return 0;
 }
 
@@ -646,6 +743,7 @@ static int load_module(void)
        ast_test_register(default_taskprocessor_load);
        ast_test_register(taskprocessor_listener);
        ast_test_register(taskprocessor_shutdown);
+       ast_test_register(taskprocessor_push_local);
        return AST_MODULE_LOAD_SUCCESS;
 }