STASIS_MESSAGE_TYPE_DEFN_LOCAL(meetme_talk_request_type);
static void meetme_stasis_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message);
+ struct stasis_message *message);
static void meetme_stasis_cleanup(void)
{
}
static void meetme_stasis_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *channel_blob = stasis_message_data(message);
struct stasis_message_type *message_type;
STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_ringnoanswer_type);
static void queue_channel_manager_event(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
const char *type = data;
}
static void queue_multi_channel_manager_event(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
const char *type = data;
}
static void queue_member_manager_event(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
const char *type = data;
}
/*! \brief set a member's status based on device state of that member's interface*/
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ao2_iterator miter, qiter;
struct ast_device_state_message *dev_state;
}
static void queue_agent_cb(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg)
+ struct stasis_message *msg)
{
struct ast_channel_blob *agent_blob;
* \param msg The stasis message for the bridge enter event
*/
static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg)
+ struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_bridge_blob *enter_blob = stasis_message_data(msg);
* \param msg The stasis message for the blind transfer event
*/
static void handle_blind_transfer(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg)
+ struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_bridge_blob *blind_blob = stasis_message_data(msg);
* \param msg The stasis message for the attended transfer event.
*/
static void handle_attended_transfer(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg)
+ struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_attended_transfer_message *atxfer_msg = stasis_message_data(msg);
* subroutines for further processing.
*/
static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg)
+ struct stasis_message *msg)
{
if (stasis_subscription_final_message(sub, msg)) {
ao2_cleanup(userdata);
* \param msg The stasis message for the local optimization begin event
*/
static void handle_local_optimization_begin(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg)
+ struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
* \param msg The stasis message for the local optimization end event
*/
static void handle_local_optimization_end(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg)
+ struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
* \param msg The stasis message for the hangup event.
*/
static void handle_hangup(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg)
+ struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_channel_blob *channel_blob = stasis_message_data(msg);
* subroutines for further processing.
*/
static void queue_channel_cb(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg)
+ struct stasis_message *msg)
{
if (stasis_subscription_final_message(sub, msg)) {
ao2_cleanup(userdata);
}
}
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct stasis_subscription_change *change;
/* Only looking for subscription change notices here */
static int dump_cache(void *obj, void *arg, int flags)
{
struct stasis_message *msg = obj;
- mwi_event_cb(NULL, NULL, NULL, msg);
+ mwi_event_cb(NULL, NULL, msg);
return 0;
}
}
static void confbridge_start_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeStart", NULL);
}
static void confbridge_end_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeEnd", NULL);
}
static void confbridge_leave_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeLeave", NULL);
}
static void confbridge_join_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeJoin", NULL);
}
static void confbridge_start_record_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeRecord", NULL);
}
static void confbridge_stop_record_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeStopRecord", NULL);
}
static void confbridge_mute_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeMute", NULL);
}
static void confbridge_unmute_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeUnmute", NULL);
}
static void confbridge_talking_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, extra_text, NULL, ast_free);
static int dahdi_sendtext(struct ast_channel *c, const char *text);
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
/* This module does not handle MWI in an event-based manner. However, it
* subscribes to MWI for each mailbox that is configured so that the core
static int get_unused_callno(enum callno_type type, int validated, callno_entry *entry);
static int replace_callno(const void *obj);
static void sched_delay_remove(struct sockaddr_in *sin, callno_entry entry);
-static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
static struct ast_channel_tech iax2_tech = {
.type = "IAX2",
}
}
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
/* The MWI subscriptions exist just so the core knows we care about those
* mailboxes. However, we just grab the events out of the cache when it
}
static void network_change_stasis_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
/* This callback is only concerned with network change messages from the system topic. */
if (stasis_message_type(message) != ast_network_change_type()) {
}
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
if (stasis_message_type(message) != ast_named_acl_change_type()) {
return;
.func_channel_read = acf_channel_read,
};
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
/* This module does not handle MWI in an event-based manner. However, it
* subscribes to MWI for each mailbox that is configured so that the core
static int sip_poke_peer(struct sip_peer *peer, int force);
static void sip_poke_all_peers(void);
static void sip_peer_hold(struct sip_pvt *p, int hold);
-static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_topic *, struct stasis_message *);
-static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_message *);
+static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
static void sip_keepalive_all_peers(void);
/*--- Applications, functions, CLI and manager command helpers */
}
/*! \brief Receive MWI events that we have subscribed to */
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct sip_peer *peer = userdata;
if (stasis_subscription_final_message(sub, msg)) {
return 0;
}
-static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
/* This callback is only concerned with network change messages from the system topic. */
if (stasis_message_type(message) != ast_network_change_type()) {
}
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
if (stasis_message_type(message) != ast_named_acl_change_type()) {
return;
static int skinny_fixup(struct ast_channel *oldchan, struct ast_channel *newchan);
static int skinny_senddigit_begin(struct ast_channel *ast, char digit);
static int skinny_senddigit_end(struct ast_channel *ast, char digit, unsigned int duration);
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg);
static int skinny_dialer_cb(const void *data);
static int skinny_reload(void);
set_callforwards(l, NULL, SKINNY_CFWD_ALL|SKINNY_CFWD_BUSY|SKINNY_CFWD_NOANSWER);
register_exten(l);
/* initialize MWI on line and device */
- mwi_event_cb(l, NULL, NULL, NULL);
+ mwi_event_cb(l, NULL, NULL);
AST_LIST_TRAVERSE(&l->sublines, subline, list) {
ast_extension_state_add(subline->context, subline->exten, skinny_extensionstate_cb, subline->container);
}
send_callinfo(sub);
}
-static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct skinny_line *l = userdata;
struct skinny_device *d = l->device;
*
* \return Nothing
*/
-static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct sig_pri_span *pri = userdata;
const char *mbox_context;
sem_t sem;
};
-static void test_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void test_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct test_cb_data *cb_data = userdata;
if (stasis_message_type(msg) != ast_presence_state_message_type()) {
*/
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
-/*!
- * \brief Publish a message from a specified topic to all the subscribers of a
- * possibly different topic.
- * \param topic Topic to publish message to.
- * \param topic Original topic message was from.
- * \param message Message
- * \since 12
- */
-void stasis_forward_message(struct stasis_topic *topic,
- struct stasis_topic *publisher_topic,
- struct stasis_message *message);
-
/*!
* \brief Wait for all pending messages on a given topic to be processed.
* \param topic Topic to await pending messages on.
/*!
* \brief Callback function type for Stasis subscriptions.
* \param data Data field provided with subscription.
- * \param topic Topic to which the message was published.
* \param message Published message.
* \since 12
*/
-typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
/*!
* \brief Create a subscription.
* \since 12
*/
struct stasis_cache_update {
- /*! \brief Topic that published \c new_snapshot */
- struct stasis_topic *topic;
/*! \brief Convenience reference to snapshot type */
struct stasis_message_type *type;
/*! \brief Old value from the cache */
*/
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
- void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message),
+ void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message),
void *data,
int needs_mailbox);
*/
struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
+/*!
+ * \brief Sets the local data associated with a taskprocessor.
+ *
+ * \since 12.0.0
+ *
+ * See ast_taskprocessor_push_local().
+ *
+ * \param tps Task processor.
+ * \param local_data Local data to associate with \a tps.
+ */
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data);
+
/*!
* \brief Unreference the specified taskprocessor and its reference count will decrement.
*
*/
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
+/*! \brief Local data parameter */
+struct ast_taskprocessor_local {
+ /*! Local data, associated with the taskprocessor. */
+ void *local_data;
+ /*! Data pointer passed with this task. */
+ void *data;
+};
+
+/*!
+ * \brief Push a task into the specified taskprocessor queue and signal the
+ * taskprocessor thread.
+ *
+ * The callback receives a \ref ast_taskprocessor_local struct, which contains
+ * both the provided \a datap pointer, and any local data set on the
+ * taskprocessor with ast_taskprocessor_set_local().
+ *
+ * \param tps The taskprocessor structure
+ * \param task_exe The task handling function to push into the taskprocessor queue
+ * \param datap The data to be used by the task handling function
+ * \retval 0 success
+ * \retval -1 failure
+ * \since 12.0.0
+ */
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
+ int (*task_exe)(struct ast_taskprocessor_local *local), void *datap);
+
/*!
* \brief Pop a task off the taskprocessor and execute it.
*
ast_free((char *)generic_list->device_name);
}
-static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg);
static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor)
{
struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list),
return 0;
}
-static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
/* Wow, it's cool that we've picked up on a state change, but we really want
* the actual work to be done in the core's taskprocessor execution thread
return 0;
}
-static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_cc_agent *agent = userdata;
enum ast_device_state new_state;
* \param topic The topic this message was published for
* \param message The message
*/
-static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
* \param topic The topic this message was published for
* \param message The message
*/
-static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
* \param message The message - hopefully a bridge one!
*/
static void handle_bridge_leave_message(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_bridge_blob *update = stasis_message_data(message);
struct ast_bridge_snapshot *bridge = update->bridge;
* \param message The message - hopefully a bridge one!
*/
static void handle_bridge_enter_message(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_bridge_blob *update = stasis_message_data(message);
struct ast_bridge_snapshot *bridge = update->bridge;
* \param message The message about who got parked
* */
static void handle_parked_call_message(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_parked_call_payload *payload = stasis_message_data(message);
struct ast_channel_snapshot *channel = payload->parkee;
}
static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct stasis_cache_update *update = stasis_message_data(message);
static void cel_bridge_enter_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
static void cel_bridge_leave_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
static void cel_parking_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_parked_call_payload *parked_payload = stasis_message_data(message);
}
static void cel_dial_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *blob = stasis_message_data(message);
static void cel_generic_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
static void cel_blind_transfer_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *obj = stasis_message_data(message);
static void cel_attended_transfer_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_attended_transfer_message *xfer = stasis_message_data(message);
static void cel_pickup_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
static void cel_local_cb(
void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
return 1;
}
-static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
enum ast_device_state aggregate_state;
char *device;
/*! \brief Handler for channel snapshot cache clears */
static void endpoint_cache_clear(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_endpoint *endpoint = data;
}
static void endpoint_default(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_endpoint *endpoint = data;
{{ "restart", "gracefully", NULL }},
};
-static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
+static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
static void acl_change_stasis_subscribe(void)
{
}
static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
}
static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_json_payload *payload = stasis_message_data(message);
#ifdef TEST_FRAMEWORK
static void test_suite_event_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_test_suite_message_payload *payload;
}
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
if (stasis_message_type(message) != ast_named_acl_change_type()) {
return;
};
static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, bridge_event_string, NULL, ast_free);
}
static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_merge_message *merge_msg = stasis_message_data(message);
}
static void channel_enter_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
static const char *swap_name = "SwapUniqueid: ";
}
static void channel_leave_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
};
static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
}
static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
}
static void channel_hangup_request_cb(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
}
static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
struct ast_channel_snapshot *spyer;
}
static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
RAII_VAR(struct ast_str *, spyee_channel_string, NULL, ast_free);
}
static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
}
static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
}
static void channel_hangup_handler_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
struct ast_channel_blob *payload = stasis_message_data(message);
}
static void channel_fax_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
RAII_VAR(struct ast_str *, event_buffer, ast_str_create(256), ast_free);
}
static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
struct ast_json *blob = payload->blob;
}
static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
}
static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
}
static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
* \brief Callback processing messages for channel dialing
*/
static void channel_dial_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
const char *dialstatus;
}
static void channel_hold_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
const char *musicclass;
}
static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
}
static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
- /* XXX This looks wrong. Nothing should post or forward to a caching
- * topic directly. Maybe ast_endpoint_topic() would be correct? I'd have
- * to dig to make sure I don't break anything, though.
- */
- stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message);
+ stasis_publish(ast_manager_get_topic(), message);
}
int manager_endpoints_init(void)
/*! \brief Generic MWI event callback used for one-off events from voicemail modules */
static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_mwi_blob *payload = stasis_message_data(message);
}
static void mwi_update_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_mwi_state *mwi_state;
ao2_iterator_destroy(&iter);
}
-static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_device_state_message *dev_state;
struct ast_hint *hint;
return res;
}
-static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_presence_state_message *presence_state = stasis_message_data(msg);
struct ast_hint *hint;
}
static void format_update_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
ast_sounds_reindex();
}
* \param message Message to send.
*/
static void subscription_invoke(struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
/* Notify that the final message has been received */
}
/* Since sub is mostly immutable, no need to lock sub */
- sub->callback(sub->data, sub, topic, message);
+ sub->callback(sub->data, sub, message);
/* Notify that the final message has been processed */
if (stasis_subscription_final_message(sub, message)) {
if (!sub->mailbox) {
return NULL;
}
+ ast_taskprocessor_set_local(sub->mailbox, sub);
+ /* Taskprocessor has a reference */
+ ao2_ref(sub, +1);
}
ao2_ref(topic, +1);
return internal_stasis_subscribe(topic, callback, data, 1);
}
+static int sub_cleanup(void *data)
+{
+ struct stasis_subscription *sub = data;
+ ao2_cleanup(sub);
+ return 0;
+}
+
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
/* The subscription may be the last ref to this topic. Hold
/* Now let everyone know about the unsubscribe */
send_subscription_unsubscribe(topic, sub);
+ /* When all that's done, remove the ref the mailbox has on the sub */
+ if (sub->mailbox) {
+ ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
+ }
+
/* Unsubscribing unrefs the subscription */
ao2_cleanup(sub);
return NULL;
return ast_vector_remove_elem_unordered(topic->subscribers, sub);
}
-/*!
- * \internal
- * \brief Information needed to dispatch a message to a subscription
- */
-struct dispatch {
- /*! Topic message was published to */
- struct stasis_topic *topic;
- /*! The message itself */
- struct stasis_message *message;
- /*! Subscription receiving the message */
- struct stasis_subscription *sub;
-};
-
-static void dispatch_dtor(struct dispatch *dispatch)
-{
- ao2_cleanup(dispatch->topic);
- ao2_cleanup(dispatch->message);
- ao2_cleanup(dispatch->sub);
-
- ast_free(dispatch);
-}
-
-static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
-{
- struct dispatch *dispatch;
-
- ast_assert(topic != NULL);
- ast_assert(message != NULL);
- ast_assert(sub != NULL);
-
- dispatch = ast_malloc(sizeof(*dispatch));
- if (!dispatch) {
- return NULL;
- }
-
- dispatch->topic = topic;
- ao2_ref(topic, +1);
-
- dispatch->message = message;
- ao2_ref(message, +1);
-
- dispatch->sub = sub;
- ao2_ref(sub, +1);
-
- return dispatch;
-}
-
/*!
* \brief Dispatch a message to a subscriber
* \param data \ref dispatch object
* \return 0
*/
-static int dispatch_exec(void *data)
+static int dispatch_exec(struct ast_taskprocessor_local *local)
{
- struct dispatch *dispatch = data;
+ struct stasis_subscription *sub = local->local_data;
+ struct stasis_message *message = local->data;
- subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
- dispatch_dtor(dispatch);
+ subscription_invoke(sub, message);
+ ao2_cleanup(message);
return 0;
}
static void dispatch_message(struct stasis_subscription *sub,
- struct stasis_topic *publisher_topic, struct stasis_message *message)
+ struct stasis_message *message)
{
if (sub->mailbox) {
- struct dispatch *dispatch;
-
- dispatch = dispatch_create(publisher_topic, message, sub);
- if (!dispatch) {
+ ao2_bump(message);
+ if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
+ /* Push failed; ugh. */
ast_log(LOG_DEBUG, "Dropping dispatch\n");
- return;
- }
-
- if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) != 0) {
- /* Push failed; just delete the dispatch.
- */
- ast_log(LOG_DEBUG, "Dropping dispatch\n");
- dispatch_dtor(dispatch);
+ ao2_cleanup(message);
}
} else {
/* Dispatch directly */
- subscription_invoke(sub, publisher_topic, message);
+ subscription_invoke(sub, message);
}
}
-void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
+void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message)
{
size_t i;
/* The topic may be unref'ed by the subscription invocation.
SCOPED_AO2LOCK(lock, topic);
ast_assert(topic != NULL);
- ast_assert(publisher_topic != NULL);
ast_assert(message != NULL);
for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
- struct stasis_subscription *sub = ast_vector_get(topic->subscribers, i);
+ struct stasis_subscription *sub =
+ ast_vector_get(topic->subscribers, i);
ast_assert(sub != NULL);
- dispatch_message(sub, publisher_topic, message);
+ dispatch_message(sub, message);
}
}
-void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
-{
- stasis_forward_message(topic, topic, message);
-}
-
/*!
* \brief Forwarding information
*
stasis_publish(topic, msg);
/* Now we have to dispatch to the subscription itself */
- dispatch_message(sub, topic, msg);
+ dispatch_message(sub, msg);
}
struct topic_pool_entry {
static void stasis_cache_update_dtor(void *obj)
{
struct stasis_cache_update *update = obj;
- ao2_cleanup(update->topic);
- update->topic = NULL;
ao2_cleanup(update->old_snapshot);
update->old_snapshot = NULL;
ao2_cleanup(update->new_snapshot);
update->type = NULL;
}
-static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
+static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
{
RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
- ast_assert(topic != NULL);
ast_assert(old_snapshot != NULL || new_snapshot != NULL);
update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
return NULL;
}
- ao2_ref(topic, +1);
- update->topic = topic;
if (old_snapshot) {
ao2_ref(old_snapshot, +1);
update->old_snapshot = old_snapshot;
}
static void caching_topic_exec(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
struct stasis_caching_topic *caching_topic = data;
if (clear_id) {
old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
if (old_snapshot) {
- update = update_create(topic, old_snapshot, NULL);
+ update = update_create(old_snapshot, NULL);
stasis_publish(caching_topic->topic, update);
return;
}
old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
- update = update_create(topic, old_snapshot, message);
+ update = update_create(old_snapshot, message);
if (update == NULL) {
return;
}
static void router_dispatch(void *data,
struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct stasis_message_router *router = data;
struct stasis_message_route route;
if (find_route(router, message, &route) == 0) {
- route.callback(route.data, sub, topic, message);
+ route.callback(route.data, sub, message);
}
if (stasis_subscription_final_message(sub, message)) {
}
static void guarantee_handler(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
/* Wait for our particular message */
if (data == message) {
*/
struct tps_task {
/*! \brief The execute() task callback function pointer */
- int (*execute)(void *datap);
+ union {
+ int (*execute)(void *datap);
+ int (*execute_local)(struct ast_taskprocessor_local *local);
+ } callback;
/*! \brief The data pointer for the task execute() function */
void *datap;
/*! \brief AST_LIST_ENTRY overhead */
AST_LIST_ENTRY(tps_task) list;
+ unsigned int wants_local:1;
};
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
const char *name;
/*! \brief Taskprocessor statistics */
struct tps_taskprocessor_stats *stats;
+ void *local_data;
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
/*! \brief Taskprocessor queue */
static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
{
struct tps_task *t;
- if ((t = ast_calloc(1, sizeof(*t)))) {
- t->execute = task_exe;
- t->datap = datap;
+ if (!task_exe) {
+ ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ return NULL;
+ }
+
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "failed to allocate task!\n");
+ return NULL;
+ }
+
+ t->callback.execute = task_exe;
+ t->datap = datap;
+
+ return t;
+}
+
+static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
+{
+ struct tps_task *t;
+ if (!task_exe) {
+ ast_log(LOG_ERROR, "task_exe is NULL!\n");
+ return NULL;
+ }
+
+ t = ast_calloc(1, sizeof(*t));
+ if (!t) {
+ ast_log(LOG_ERROR, "failed to allocate task!\n");
+ return NULL;
}
+
+ t->callback.execute_local = task_exe;
+ t->datap = datap;
+ t->wants_local = 1;
+
return t;
}
return __allocate_taskprocessor(name, listener);
}
+void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
+ void *local_data)
+{
+ SCOPED_AO2LOCK(lock, tps);
+ tps->local_data = local_data;
+}
+
/* decrement the taskprocessor reference count and unlink from the container if necessary */
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
{
}
/* push the task into the taskprocessor queue */
-int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
{
- struct tps_task *t;
int previous_size;
int was_empty;
- if (!tps || !task_exe) {
- ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
+ if (!tps) {
+ ast_log(LOG_ERROR, "tps is NULL!\n");
return -1;
}
- if (!(t = tps_task_alloc(task_exe, datap))) {
- ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
+
+ if (!t) {
+ ast_log(LOG_ERROR, "t is NULL!\n");
return -1;
}
+
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
return 0;
}
+int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
+{
+ return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
+}
+
+int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
+{
+ return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
+}
+
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
+ struct ast_taskprocessor_local local;
struct tps_task *t;
int size;
}
tps->executing = 1;
+
+ if (t->wants_local) {
+ local.local_data = tps->local_data;
+ local.data = t->datap;
+ }
ao2_unlock(tps);
- t->execute(t->datap);
+ if (t->wants_local) {
+ t->callback.execute_local(&local);
+ } else {
+ t->callback.execute(t->datap);
+ }
tps_task_free(t);
ao2_lock(tps);
cap_slin = ast_format_cap_destroy(cap_slin);
}
-static void park_announce_update_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void park_announce_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct park_announce_subscription_data *pa_data = data;
char *dial_string = pa_data->dial_string;
}
}
-static void parker_update_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void parker_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
if (stasis_subscription_final_message(sub, message)) {
ast_free(data);
);
}
-static void parking_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void parking_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
if (stasis_message_type(message) == ast_parked_call_type()) {
struct ast_parked_call_payload *parked_call_message = stasis_message_data(message);
STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_end_type);
static void agi_channel_manager_event(void *data,
- struct stasis_subscription *sub, struct stasis_topic *topic,
+ struct stasis_subscription *sub,
struct stasis_message *message)
{
const char *type = data;
* \param message The message itself.
*/
static void statsmaker(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
RAII_VAR(struct ast_str *, metric, NULL, ast_free);
* \param message The message itself.
*/
static void updates(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
/* Since this came from a message router, we know the type of the
* message. We can cast the data without checking its type.
* \param message The message itself.
*/
static void default_route(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
if (stasis_subscription_final_message(sub, message)) {
/* Much like with the regular subscription, you may need to
const char* collection_name);
static void aji_publish_mwi(struct aji_client *client, const char *mailbox,
const char *context, const char *oldmsgs, const char *newmsgs);
-static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
-static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
+static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg);
+static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg);
static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node,
const char *event_type, unsigned int cachable);
/* No transports in this version */
* \param data void pointer to ast_client structure
* \return void
*/
-static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
const char *mailbox;
const char *context;
* \param data void pointer to ast_client structure
* \return void
*/
-static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct aji_client *client = data;
struct ast_device_state_message *dev_state;
{
struct stasis_message *msg = obj;
struct aji_client *client = arg;
- aji_devstate_cb(client, device_state_sub, NULL, msg);
+ aji_devstate_cb(client, device_state_sub, msg);
return 0;
}
};
static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg);
+ struct stasis_message *msg);
static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char *mailbox, struct mwi_subscription *mwi_sub)
{
}
static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *msg)
+ struct stasis_message *msg)
{
struct mwi_subscription *mwi_sub = userdata;
}
static void refer_progress_bridge(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct refer_progress *progress = data;
struct ast_bridge_blob *enter_blob;
}
static void security_stasis_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct ast_json_payload *payload = stasis_message_data(message);
* the initial lazy binding will still work as expected.
*/
static void message_sink_cb(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct stasis_message_sink *sink = data;
* \param data void pointer to ast_client structure
* \return void
*/
-static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_xmpp_client *client = data;
const char *mailbox, *context;
* \param data void pointer to ast_client structure
* \return void
*/
-static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
+static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_xmpp_client *client = data;
struct ast_device_state_message *dev_state;
{
struct stasis_message *msg = obj;
struct ast_xmpp_client *client = arg;
- xmpp_pubsub_devstate_cb(client, client->device_state_sub, NULL, msg);
+ xmpp_pubsub_devstate_cb(client, client->device_state_sub, msg);
return 0;
}
}
static void sub_default_handler(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct app *app = data;
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
static void sub_channel_update_handler(void *data,
struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
struct app *app = data;
static void sub_bridge_update_handler(void *data,
struct stasis_subscription *sub,
- struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
}
static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
struct app *app = data;
struct ast_bridge_merge_message *merge;
}
/* Forward the message to the app */
- stasis_forward_message(app->topic, topic, message);
+ stasis_publish(app->topic, message);
}
struct app *app_create(const char *name, stasis_app_cb handler, void *data)
return consumer;
}
-static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;
RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
}
}
-static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;
return consumer;
}
-static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
+static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;
RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
/* Check for new snapshot messages */
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
actual_update = stasis_message_data(consumer->messages_rxed[0]);
- ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, NULL == actual_update->old_snapshot);
ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
actual_update = stasis_message_data(consumer->messages_rxed[1]);
- ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, NULL == actual_update->old_snapshot);
ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
ast_test_validate(test, 3 == actual_len);
actual_update = stasis_message_data(consumer->messages_rxed[2]);
- ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
ast_test_validate(test, 4 == actual_len);
actual_update = stasis_message_data(consumer->messages_rxed[3]);
- ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
ast_test_validate(test, NULL == actual_update->new_snapshot);
ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
}
static void noop(void *data, struct stasis_subscription *sub,
- struct stasis_topic *topic, struct stasis_message *message)
+ struct stasis_message *message)
{
/* no-op */
}
int task_complete;
};
+static void task_data_dtor(void *obj)
+{
+ struct task_data *task_data = obj;
+
+ ast_mutex_destroy(&task_data->lock);
+ ast_cond_destroy(&task_data->cond);
+}
+
+/*! \brief Create a task_data object */
+static struct task_data *task_data_create(void)
+{
+ struct task_data *task_data =
+ ao2_alloc(sizeof(*task_data), task_data_dtor);
+
+ if (!task_data) {
+ return NULL;
+ }
+
+ ast_cond_init(&task_data->cond, NULL);
+ ast_mutex_init(&task_data->lock);
+ task_data->task_complete = 0;
+
+ return task_data;
+}
+
/*!
* \brief Queued task for baseline test.
*
return 0;
}
+/*!
+ * \brief Wait for a task to execute.
+ */
+static int task_wait(struct task_data *task_data)
+{
+ struct timeval start = ast_tvnow();
+ struct timespec end;
+ SCOPED_MUTEX(lock, &task_data->lock);
+
+ end.tv_sec = start.tv_sec + 30;
+ end.tv_nsec = start.tv_usec * 1000;
+
+ while (!task_data->task_complete) {
+ int res;
+ res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
+ &end);
+ if (res == ETIMEDOUT) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
/*!
* \brief Baseline test for default taskprocessor
*
*/
AST_TEST_DEFINE(default_taskprocessor)
{
- struct ast_taskprocessor *tps;
- struct task_data task_data;
- struct timeval start;
- struct timespec ts;
- enum ast_test_result_state res = AST_TEST_PASS;
- int timedwait_res;
+ RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+ RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
+ int res;
switch (cmd) {
case TEST_INIT:
return AST_TEST_FAIL;
}
- start = ast_tvnow();
-
- ts.tv_sec = start.tv_sec + 30;
- ts.tv_nsec = start.tv_usec * 1000;
-
- ast_cond_init(&task_data.cond, NULL);
- ast_mutex_init(&task_data.lock);
- task_data.task_complete = 0;
-
- ast_taskprocessor_push(tps, task, &task_data);
- ast_mutex_lock(&task_data.lock);
- while (!task_data.task_complete) {
- timedwait_res = ast_cond_timedwait(&task_data.cond, &task_data.lock, &ts);
- if (timedwait_res == ETIMEDOUT) {
- break;
- }
+ task_data = task_data_create();
+ if (!task_data) {
+ ast_test_status_update(test, "Unable to create task_data\n");
+ return AST_TEST_FAIL;
}
- ast_mutex_unlock(&task_data.lock);
- if (!task_data.task_complete) {
+ ast_taskprocessor_push(tps, task, task_data);
+
+ res = task_wait(task_data);
+ if (res != 0) {
ast_test_status_update(test, "Queued task did not execute!\n");
- res = AST_TEST_FAIL;
- goto test_end;
+ return AST_TEST_FAIL;
}
-test_end:
- tps = ast_taskprocessor_unreference(tps);
- ast_mutex_destroy(&task_data.lock);
- ast_cond_destroy(&task_data.cond);
- return res;
+ return AST_TEST_PASS;
}
#define NUM_TASKS 20000
return AST_TEST_PASS;
}
+static int local_task_exe(struct ast_taskprocessor_local *local)
+{
+ int *local_data = local->local_data;
+ struct task_data *task_data = local->data;
+
+ *local_data = 1;
+ task(task_data);
+
+ return 0;
+}
+
+AST_TEST_DEFINE(taskprocessor_push_local)
+{
+ RAII_VAR(struct ast_taskprocessor *, tps, NULL,
+ ast_taskprocessor_unreference);
+ struct task_data *task_data;
+ int local_data;
+ int res;
+
+ switch (cmd) {
+ case TEST_INIT:
+ info->name = __func__;
+ info->category = "/main/taskprocessor/";
+ info->summary = "Test of pushing local data";
+ info->description =
+ "Ensures that local data is passed along.";
+ return AST_TEST_NOT_RUN;
+ case TEST_EXECUTE:
+ break;
+ }
+
+
+ tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
+ if (!tps) {
+ ast_test_status_update(test, "Unable to create test taskprocessor\n");
+ return AST_TEST_FAIL;
+ }
+
+
+ task_data = task_data_create();
+ if (!task_data) {
+ ast_test_status_update(test, "Unable to create task_data\n");
+ return AST_TEST_FAIL;
+ }
+
+ local_data = 0;
+ ast_taskprocessor_set_local(tps, &local_data);
+
+ ast_taskprocessor_push_local(tps, local_task_exe, task_data);
+
+ res = task_wait(task_data);
+ if (res != 0) {
+ ast_test_status_update(test, "Queued task did not execute!\n");
+ return AST_TEST_FAIL;
+ }
+
+ if (local_data != 1) {
+ ast_test_status_update(test,
+ "Queued task did not set local_data!\n");
+ return AST_TEST_FAIL;
+ }
+
+ return AST_TEST_PASS;
+}
+
static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
ast_test_unregister(default_taskprocessor_load);
ast_test_unregister(taskprocessor_listener);
ast_test_unregister(taskprocessor_shutdown);
+ ast_test_unregister(taskprocessor_push_local);
return 0;
}
ast_test_register(default_taskprocessor_load);
ast_test_register(taskprocessor_listener);
ast_test_register(taskprocessor_shutdown);
+ ast_test_register(taskprocessor_push_local);
return AST_MODULE_LOAD_SUCCESS;
}