*.orig
tests/CI/output
.develvars
+.devcontainer/
+.claude/
--- /dev/null
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2026, Aurora Innovation AB
+ *
+ * Daniel Donoghue <daniel.donoghue@aurorainnovation.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis broadcast dialplan application
+ *
+ * \author Daniel Donoghue <daniel.donoghue@aurorainnovation.com>
+ */
+
+/*** MODULEINFO
+ <depend type="module">res_stasis</depend>
+ <depend type="module">res_stasis_broadcast</depend>
+ <support_level>extended</support_level>
+ ***/
+
+#include "asterisk.h"
+
+#include "asterisk/app.h"
+#include "asterisk/module.h"
+#include "asterisk/pbx.h"
+#include "asterisk/stasis_app_broadcast.h"
+#include "asterisk/stasis_app_impl.h"
+
+/*** DOCUMENTATION
+ <application name="StasisBroadcast" language="en_US">
+ <since>
+ <version>20.17.0</version>
+ <version>22.7.0</version>
+ <version>23.1.0</version>
+ </since>
+ <synopsis>Broadcast a channel to multiple ARI applications for claiming,
+ then hand control to the winning application.</synopsis>
+ <syntax>
+ <parameter name="timeout">
+ <para>Timeout in milliseconds to wait for a claim.</para>
+ <para>Valid range: 0 to 60000ms</para>
+ <para>Default: 500ms</para>
+ </parameter>
+ <parameter name="app_filter">
+ <para>Regular expression to filter which ARI applications
+ receive the broadcast. Only applications with names matching
+ the regex will be notified.</para>
+ <para>Because arguments are comma-delimited, commas cannot
+ appear in the regex pattern. Use character classes
+ (e.g. <literal>[,]</literal>) if a literal comma is
+ needed, or omit the filter and handle selection in the
+ ARI application.</para>
+ <para>Default: all connected applications</para>
+ </parameter>
+ <parameter name="args">
+ <para>Optional colon-delimited arguments passed to the winning
+ application via the <literal>StasisStart</literal> event. These
+ are equivalent to the extra arguments in <literal>Stasis()</literal>.</para>
+ <para>Example: <literal>sales:priority-high</literal></para>
+ </parameter>
+ <parameter name="notify_claimed">
+ <para>Whether to send a <literal>CallClaimed</literal> event to
+ ARI applications when a channel is claimed.</para>
+ <para>When enabled, the <literal>CallClaimed</literal> event is
+ sent only to applications that matched the
+ <replaceable>app_filter</replaceable> (or all applications if no
+ filter was set).</para>
+ <para>Disabled by default to minimise WebSocket traffic under
+ high load. Losing claimants already receive a
+ <literal>409</literal> HTTP response.</para>
+ <para>Default: no</para>
+ </parameter>
+ </syntax>
+ <description>
+ <para>Broadcasts the incoming channel to all connected ARI applications
+ (or a filtered subset) via a <literal>CallBroadcast</literal> event.
+ ARI applications can respond with a claim request. The first application
+ to claim the channel wins, and subsequent claims are rejected.</para>
+ <para>If an application claims the channel within the timeout, the channel
+ is automatically placed under Stasis control with the winning application,
+ exactly as if <literal>Stasis(winner_app)</literal> had been called.
+ The winning application receives a <literal>StasisStart</literal> event
+ and has full channel control until it calls <literal>continue</literal>
+ or the channel hangs up.</para>
+ <para>If no application claims the channel within the timeout, control
+ returns to the dialplan immediately, allowing fallback handling.</para>
+ <para>This application will set the following channel variables:</para>
+ <variablelist>
+ <variable name="STASISSTATUS">
+ <value name="SUCCESS">
+ An application claimed the channel and the Stasis
+ session completed without failures.
+ </value>
+ <value name="FAILED">
+ An application claimed the channel but a failure
+ occurred when executing the Stasis application.
+ </value>
+ <value name="TIMEOUT">
+ No application claimed the channel within the
+ timeout period.
+ </value>
+ </variable>
+ </variablelist>
+ <example>
+ ; Broadcast with default timeout (500ms) to all apps
+ ; Channel automatically enters Stasis with the winner
+ exten => _X.,1,StasisBroadcast()
+ same => n,GotoIf($["${STASISSTATUS}"="TIMEOUT"]?no_route)
+ same => n,Hangup()
+ same => n(no_route),Playback(sorry-no-agent)
+ same => n,Hangup()
+ </example>
+ <example>
+ ; Broadcast with custom timeout, app filter, and args for the winner
+ exten => _X.,1,StasisBroadcast(2000,^ivr-.*,sales:priority-high)
+ same => n,GotoIf($["${STASISSTATUS}"="TIMEOUT"]?no_route)
+ same => n,Hangup()
+ same => n(no_route),Playback(sorry-no-agent)
+ same => n,Hangup()
+ </example>
+ </description>
+ </application>
+ ***/
+
+/*! \brief Dialplan application name */
+static const char *app = "StasisBroadcast";
+
+/*! \brief Default timeout in milliseconds */
+#define DEFAULT_TIMEOUT_MS 500
+
+/*! \brief Maximum timeout in milliseconds */
+#define MAX_TIMEOUT_MS 60000
+
+/*! \brief Maximum number of Stasis arguments */
+#define MAX_STASIS_ARGS 128
+
+/*! \brief StasisBroadcast dialplan application callback */
+static int stasis_broadcast_exec(struct ast_channel *chan, const char *data)
+{
+ char *parse = NULL;
+ int timeout_ms = DEFAULT_TIMEOUT_MS;
+ const char *app_filter = NULL;
+ const char *stasis_args_raw = NULL;
+ unsigned int flags = STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED;
+ char *winner = NULL;
+ int result = 0;
+ int stasis_argc = 0;
+ char *stasis_argv[MAX_STASIS_ARGS];
+
+ AST_DECLARE_APP_ARGS(args,
+ AST_APP_ARG(timeout);
+ AST_APP_ARG(app_filter);
+ AST_APP_ARG(stasis_args);
+ AST_APP_ARG(notify_claimed);
+ );
+
+ ast_assert(chan != NULL);
+
+ /* Initialize channel variable */
+ pbx_builtin_setvar_helper(chan, "STASISSTATUS", "");
+
+ /* Parse positional arguments if provided */
+ if (!ast_strlen_zero(data)) {
+ parse = ast_strdupa(data);
+ AST_STANDARD_APP_ARGS(args, parse);
+
+ if (!ast_strlen_zero(args.timeout)) {
+ if (sscanf(args.timeout, "%d", &timeout_ms) != 1
+ || timeout_ms < 0 || timeout_ms > MAX_TIMEOUT_MS) {
+ ast_log(LOG_WARNING,
+ "Channel %s: invalid timeout value '%s' (must be 0-%dms), using default %dms\n",
+ ast_channel_name(chan), args.timeout, MAX_TIMEOUT_MS, DEFAULT_TIMEOUT_MS);
+ timeout_ms = DEFAULT_TIMEOUT_MS;
+ }
+ }
+
+ if (!ast_strlen_zero(args.app_filter)) {
+ app_filter = args.app_filter;
+ }
+
+ if (!ast_strlen_zero(args.stasis_args)) {
+ stasis_args_raw = args.stasis_args;
+ }
+
+ if (!ast_strlen_zero(args.notify_claimed) && ast_true(args.notify_claimed)) {
+ flags &= ~STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED;
+ }
+ }
+
+ /*
+ * Parse colon-delimited Stasis arguments. stasis_argv[] holds
+ * pointers into the stack-allocated args_copy buffer. This is
+ * safe because stasis_app_exec is called within this same
+ * function scope so the stack frame remains alive.
+ */
+ if (!ast_strlen_zero(stasis_args_raw)) {
+ char *args_copy = ast_strdupa(stasis_args_raw);
+ char *arg;
+
+ while ((arg = strsep(&args_copy, ":")) != NULL && stasis_argc < MAX_STASIS_ARGS) {
+ stasis_argv[stasis_argc++] = arg;
+ }
+ }
+
+ ast_debug(3, "Broadcasting channel %s (timeout=%dms, filter=%s, args=%d)\n",
+ ast_channel_name(chan), timeout_ms, app_filter ? app_filter : "none",
+ stasis_argc);
+
+ /* Start the broadcast */
+ result = stasis_app_broadcast_channel(chan, timeout_ms, app_filter, flags);
+ if (result) {
+ ast_log(LOG_ERROR, "Failed to broadcast channel %s: %s\n",
+ ast_channel_name(chan),
+ result == AST_OPTIONAL_API_UNAVAILABLE ? "res_stasis_broadcast not loaded" : "internal error");
+ pbx_builtin_setvar_helper(chan, "STASISSTATUS", "FAILED");
+ return 0;
+ }
+
+ /* Wait for a claim. A late claim can arrive between the timeout
+ * expiring and our cleanup call, so always check for a winner
+ * regardless of the wait result. */
+ stasis_app_broadcast_wait(chan, timeout_ms);
+ winner = stasis_app_broadcast_winner(ast_channel_uniqueid(chan));
+
+ if (winner) {
+ int ret;
+
+ ast_debug(3, "Channel %s claimed by %s, entering Stasis\n",
+ ast_channel_name(chan), winner);
+
+ /* Defer cleanup until after Stasis so concurrent claimants can still
+ * find the context (with claimed=1) and receive 409 Conflict instead
+ * of 404 Not Found. */
+ ret = stasis_app_exec(chan, winner, stasis_argc, stasis_argv);
+ ast_free(winner);
+
+ /* Clean up now that the Stasis session has ended */
+ stasis_app_broadcast_cleanup(ast_channel_uniqueid(chan));
+
+ if (ret) {
+ pbx_builtin_setvar_helper(chan, "STASISSTATUS", "FAILED");
+ if (ast_check_hangup(chan)) {
+ return -1;
+ }
+ } else {
+ pbx_builtin_setvar_helper(chan, "STASISSTATUS", "SUCCESS");
+ }
+ } else {
+ /* No winner: clean up immediately, nothing to race against */
+ stasis_app_broadcast_cleanup(ast_channel_uniqueid(chan));
+ ast_log(LOG_WARNING, "Channel %s: not claimed within %dms timeout\n",
+ ast_channel_name(chan), timeout_ms);
+ pbx_builtin_setvar_helper(chan, "STASISSTATUS", "TIMEOUT");
+ }
+
+ return 0;
+}
+
+static int load_module(void)
+{
+ return ast_register_application_xml(app, stasis_broadcast_exec);
+}
+
+static int unload_module(void)
+{
+ return ast_unregister_application(app);
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_DEFAULT,
+ "Stasis application broadcast",
+ .support_level = AST_MODULE_SUPPORT_EXTENDED,
+ .load = load_module,
+ .unload = unload_module,
+ .requires = "res_stasis,res_stasis_broadcast",
+);
--- /dev/null
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2026, Aurora Innovation AB
+ *
+ * Daniel Donoghue <daniel.donoghue@aurorainnovation.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+#ifndef _ASTERISK_STASIS_APP_BROADCAST_H
+#define _ASTERISK_STASIS_APP_BROADCAST_H
+
+/*! \file
+ *
+ * \brief Stasis Application Broadcast API
+ *
+ * \author Daniel Donoghue <daniel.donoghue@aurorainnovation.com>
+ *
+ * This module provides the infrastructure for broadcasting incoming channels
+ * to multiple ARI applications and handling first-claim winner logic.
+ */
+
+#include "asterisk/channel.h"
+#include "asterisk/optional_api.h"
+
+/*! \brief Suppress CallClaimed event for this broadcast */
+#define STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED (1 << 0)
+
+/*!
+ * \brief Start a broadcast for a channel
+ *
+ * \since 20
+ *
+ * Broadcasts a channel to all ARI applications (or filtered applications)
+ * allowing them to claim the channel. Only the first claim will succeed.
+ *
+ * When a channel is claimed, a CallClaimed event is sent only to applications
+ * that matched the \a app_filter (or all apps if no filter was set). This can
+ * be suppressed entirely with #STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED.
+ *
+ * \param chan The channel to broadcast
+ * \param timeout_ms Timeout in milliseconds to wait for a claim
+ * \param app_filter Optional regex filter for application names (NULL for all)
+ * \param flags Combination of STASIS_BROADCAST_FLAG_* values
+ *
+ * \retval 0 on success
+ * \retval -1 on error
+ * \retval AST_OPTIONAL_API_UNAVAILABLE if res_stasis_broadcast is not loaded
+ */
+AST_OPTIONAL_API(int, stasis_app_broadcast_channel,
+ (struct ast_channel *chan, int timeout_ms, const char *app_filter,
+ unsigned int flags),
+ { return AST_OPTIONAL_API_UNAVAILABLE; });
+
+/*!
+ * \brief Attempt to claim a broadcast channel
+ *
+ * \since 20
+ *
+ * Atomically attempts to claim a channel that is in broadcast state.
+ * Only the first claim for a given channel will succeed.
+ *
+ * \param channel_id The unique ID of the channel
+ * \param app_name The name of the application claiming the channel
+ *
+ * \retval 0 if claim successful
+ * \retval -1 if channel not found
+ * \retval -2 if already claimed by another application
+ * \retval AST_OPTIONAL_API_UNAVAILABLE if res_stasis_broadcast is not loaded
+ */
+AST_OPTIONAL_API(int, stasis_app_claim_channel,
+ (const char *channel_id, const char *app_name),
+ { return AST_OPTIONAL_API_UNAVAILABLE; });
+
+/*!
+ * \brief Get the winner app name for a broadcast channel
+ *
+ * \since 20
+ *
+ * \param channel_id The unique ID of the channel
+ *
+ * \return A copy of the winner app name (caller must free with ast_free),
+ * or NULL if not claimed or not found
+ * \retval NULL if res_stasis_broadcast is not loaded
+ */
+AST_OPTIONAL_API(char *, stasis_app_broadcast_winner,
+ (const char *channel_id),
+ { return NULL; });
+
+/*!
+ * \brief Wait for a broadcast channel to be claimed
+ *
+ * \since 20
+ *
+ * Blocks until the channel is claimed or the timeout expires.
+ *
+ * \param chan The channel
+ * \param timeout_ms Maximum time to wait in milliseconds
+ *
+ * \retval 0 if claimed within timeout
+ * \retval -1 if timeout expired or error
+ * \retval AST_OPTIONAL_API_UNAVAILABLE if res_stasis_broadcast is not loaded
+ */
+AST_OPTIONAL_API(int, stasis_app_broadcast_wait,
+ (struct ast_channel *chan, int timeout_ms),
+ { return AST_OPTIONAL_API_UNAVAILABLE; });
+
+/*!
+ * \brief Clean up broadcast context for a channel
+ *
+ * \since 20
+ *
+ * Removes the broadcast context when the channel is done or leaving the
+ * broadcast state.
+ *
+ * \param channel_id The unique ID of the channel
+ */
+AST_OPTIONAL_API(void, stasis_app_broadcast_cleanup,
+ (const char *channel_id),
+ { return; });
+
+#endif /* _ASTERISK_STASIS_APP_BROADCAST_H */
return ast_ari_validate_bridge_video_source_changed;
}
+int ast_ari_validate_call_broadcast(struct ast_json *json)
+{
+ int res = 1;
+ struct ast_json_iter *iter;
+ int has_type = 0;
+ int has_application = 0;
+ int has_timestamp = 0;
+ int has_channel = 0;
+
+ for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
+ if (strcmp("asterisk_id", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast field asterisk_id failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("type", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_type = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast field type failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("application", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_application = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast field application failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("timestamp", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_timestamp = 1;
+ prop_is_valid = ast_ari_validate_date(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast field timestamp failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("called", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast field called failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("caller", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast field caller failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("channel", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_channel = 1;
+ prop_is_valid = ast_ari_validate_channel(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast field channel failed validation\n");
+ res = 0;
+ }
+ } else
+ {
+ ast_log(LOG_ERROR,
+ "ARI CallBroadcast has undocumented field %s\n",
+ ast_json_object_iter_key(iter));
+ res = 0;
+ }
+ }
+
+ if (!has_type) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast missing required field type\n");
+ res = 0;
+ }
+
+ if (!has_application) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast missing required field application\n");
+ res = 0;
+ }
+
+ if (!has_timestamp) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast missing required field timestamp\n");
+ res = 0;
+ }
+
+ if (!has_channel) {
+ ast_log(LOG_ERROR, "ARI CallBroadcast missing required field channel\n");
+ res = 0;
+ }
+
+ return res;
+}
+
+ari_validator ast_ari_validate_call_broadcast_fn(void)
+{
+ return ast_ari_validate_call_broadcast;
+}
+
+int ast_ari_validate_call_claimed(struct ast_json *json)
+{
+ int res = 1;
+ struct ast_json_iter *iter;
+ int has_type = 0;
+ int has_application = 0;
+ int has_timestamp = 0;
+ int has_channel = 0;
+ int has_winner_app = 0;
+
+ for (iter = ast_json_object_iter(json); iter; iter = ast_json_object_iter_next(json, iter)) {
+ if (strcmp("asterisk_id", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallClaimed field asterisk_id failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("type", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_type = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallClaimed field type failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("application", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_application = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallClaimed field application failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("timestamp", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_timestamp = 1;
+ prop_is_valid = ast_ari_validate_date(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallClaimed field timestamp failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("channel", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_channel = 1;
+ prop_is_valid = ast_ari_validate_channel(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallClaimed field channel failed validation\n");
+ res = 0;
+ }
+ } else
+ if (strcmp("winner_app", ast_json_object_iter_key(iter)) == 0) {
+ int prop_is_valid;
+ has_winner_app = 1;
+ prop_is_valid = ast_ari_validate_string(
+ ast_json_object_iter_value(iter));
+ if (!prop_is_valid) {
+ ast_log(LOG_ERROR, "ARI CallClaimed field winner_app failed validation\n");
+ res = 0;
+ }
+ } else
+ {
+ ast_log(LOG_ERROR,
+ "ARI CallClaimed has undocumented field %s\n",
+ ast_json_object_iter_key(iter));
+ res = 0;
+ }
+ }
+
+ if (!has_type) {
+ ast_log(LOG_ERROR, "ARI CallClaimed missing required field type\n");
+ res = 0;
+ }
+
+ if (!has_application) {
+ ast_log(LOG_ERROR, "ARI CallClaimed missing required field application\n");
+ res = 0;
+ }
+
+ if (!has_timestamp) {
+ ast_log(LOG_ERROR, "ARI CallClaimed missing required field timestamp\n");
+ res = 0;
+ }
+
+ if (!has_channel) {
+ ast_log(LOG_ERROR, "ARI CallClaimed missing required field channel\n");
+ res = 0;
+ }
+
+ if (!has_winner_app) {
+ ast_log(LOG_ERROR, "ARI CallClaimed missing required field winner_app\n");
+ res = 0;
+ }
+
+ return res;
+}
+
+ari_validator ast_ari_validate_call_claimed_fn(void)
+{
+ return ast_ari_validate_call_claimed;
+}
+
int ast_ari_validate_channel_caller_id(struct ast_json *json)
{
int res = 1;
if (strcmp("BridgeVideoSourceChanged", discriminator) == 0) {
return ast_ari_validate_bridge_video_source_changed(json);
} else
+ if (strcmp("CallBroadcast", discriminator) == 0) {
+ return ast_ari_validate_call_broadcast(json);
+ } else
+ if (strcmp("CallClaimed", discriminator) == 0) {
+ return ast_ari_validate_call_claimed(json);
+ } else
if (strcmp("ChannelCallerId", discriminator) == 0) {
return ast_ari_validate_channel_caller_id(json);
} else
if (strcmp("BridgeVideoSourceChanged", discriminator) == 0) {
return ast_ari_validate_bridge_video_source_changed(json);
} else
+ if (strcmp("CallBroadcast", discriminator) == 0) {
+ return ast_ari_validate_call_broadcast(json);
+ } else
+ if (strcmp("CallClaimed", discriminator) == 0) {
+ return ast_ari_validate_call_claimed(json);
+ } else
if (strcmp("ChannelCallerId", discriminator) == 0) {
return ast_ari_validate_channel_caller_id(json);
} else
*/
ari_validator ast_ari_validate_bridge_video_source_changed_fn(void);
+/*!
+ * \brief Validator for CallBroadcast.
+ *
+ * Notification that a channel is being broadcast to ARI applications for claiming.
+ *
+ * \param json JSON object to validate.
+ * \retval True (non-zero) if valid.
+ * \retval False (zero) if invalid.
+ */
+int ast_ari_validate_call_broadcast(struct ast_json *json);
+
+/*!
+ * \brief Function pointer to ast_ari_validate_call_broadcast().
+ */
+ari_validator ast_ari_validate_call_broadcast_fn(void);
+
+/*!
+ * \brief Validator for CallClaimed.
+ *
+ * Notification that a broadcast channel has been successfully claimed by an ARI application.
+ *
+ * \param json JSON object to validate.
+ * \retval True (non-zero) if valid.
+ * \retval False (zero) if invalid.
+ */
+int ast_ari_validate_call_claimed(struct ast_json *json);
+
+/*!
+ * \brief Function pointer to ast_ari_validate_call_claimed().
+ */
+ari_validator ast_ari_validate_call_claimed_fn(void);
+
/*!
* \brief Validator for ChannelCallerId.
*
* - timestamp: Date (required)
* - bridge: Bridge (required)
* - old_video_source_id: string
+ * CallBroadcast
+ * - asterisk_id: string
+ * - type: string (required)
+ * - application: string (required)
+ * - timestamp: Date (required)
+ * - called: string
+ * - caller: string
+ * - channel: Channel (required)
+ * CallClaimed
+ * - asterisk_id: string
+ * - type: string (required)
+ * - application: string (required)
+ * - timestamp: Date (required)
+ * - channel: Channel (required)
+ * - winner_app: string (required)
* ChannelCallerId
* - asterisk_id: string
* - type: string (required)
#include "resource_events.h"
#include "internal.h"
#include "asterisk/stasis_app.h"
+#include "asterisk/stasis_app_broadcast.h"
void ast_ari_events_user_event(struct ast_variable *headers,
struct ast_ari_events_user_event_args *args,
"Error processing request");
}
}
+
+void ast_ari_events_claim_channel(struct ast_variable *headers,
+ struct ast_ari_events_claim_channel_args *args,
+ struct ast_ari_response *response)
+{
+ int res;
+
+ if (ast_strlen_zero(args->channel_id)) {
+ ast_ari_response_error(response, 400, "Bad Request",
+ "channelId parameter is required");
+ return;
+ }
+
+ if (ast_strlen_zero(args->application)) {
+ ast_ari_response_error(response, 400, "Bad Request",
+ "application parameter is required");
+ return;
+ }
+
+ res = stasis_app_claim_channel(args->channel_id, args->application);
+
+ switch (res) {
+ case 0:
+ /* Success */
+ ast_ari_response_no_content(response);
+ break;
+ case -1:
+ /* Channel not found */
+ ast_ari_response_error(response, 404, "Not Found",
+ "Channel not found or not in broadcast state");
+ break;
+ case -2:
+ /* Already claimed */
+ ast_ari_response_error(response, 409, "Conflict",
+ "Channel has already been claimed by another application");
+ break;
+ case AST_OPTIONAL_API_UNAVAILABLE:
+ /* Module not loaded */
+ ast_ari_response_error(response, 501, "Not Implemented",
+ "Broadcast functionality not available (res_stasis_broadcast not loaded)");
+ break;
+ default:
+ ast_ari_response_error(response, 500, "Internal Server Error",
+ "Failed to claim channel");
+ break;
+ }
+}
* \param[out] response HTTP response
*/
void ast_ari_events_user_event(struct ast_variable *headers, struct ast_ari_events_user_event_args *args, struct ast_ari_response *response);
+/*! Argument struct for ast_ari_events_claim_channel() */
+struct ast_ari_events_claim_channel_args {
+ /*! The ID of the channel to claim */
+ const char *channel_id;
+ /*! The name of the application claiming the channel */
+ const char *application;
+};
+/*!
+ * \brief Body parsing function for /events/claim.
+ * \param body The JSON body from which to parse parameters.
+ * \param[out] args The args structure to parse into.
+ * \retval zero on success
+ * \retval non-zero on failure
+ */
+int ast_ari_events_claim_channel_parse_body(
+ struct ast_json *body,
+ struct ast_ari_events_claim_channel_args *args);
+
+/*!
+ * \brief Claim a broadcast channel for this application.
+ *
+ * Atomically claims a channel that is in broadcast state. Only the first claim succeeds.
+ *
+ * \param headers HTTP headers
+ * \param args Swagger parameters
+ * \param[out] response HTTP response
+ */
+void ast_ari_events_claim_channel(struct ast_variable *headers, struct ast_ari_events_claim_channel_args *args, struct ast_ari_response *response);
#endif /* _ASTERISK_RESOURCE_EVENTS_H */
ast_free(args.source);
return;
}
+int ast_ari_events_claim_channel_parse_body(
+ struct ast_json *body,
+ struct ast_ari_events_claim_channel_args *args)
+{
+ struct ast_json *field;
+ /* Parse query parameters out of it */
+ field = ast_json_object_get(body, "channelId");
+ if (field) {
+ args->channel_id = ast_json_string_get(field);
+ }
+ field = ast_json_object_get(body, "application");
+ if (field) {
+ args->application = ast_json_string_get(field);
+ }
+ return 0;
+}
+
+/*!
+ * \brief Parameter parsing callback for /events/claim.
+ * \param ser TCP/TLS session object
+ * \param get_params GET parameters in the HTTP request.
+ * \param path_vars Path variables extracted from the request.
+ * \param headers HTTP headers.
+ * \param body
+ * \param[out] response Response to the HTTP request.
+ */
+static void ast_ari_events_claim_channel_cb(
+ struct ast_tcptls_session_instance *ser,
+ struct ast_variable *get_params, struct ast_variable *path_vars,
+ struct ast_variable *headers, struct ast_json *body, struct ast_ari_response *response)
+{
+ struct ast_ari_events_claim_channel_args args = {};
+ struct ast_variable *i;
+#if defined(AST_DEVMODE)
+ int is_valid;
+ int code;
+#endif /* AST_DEVMODE */
+
+ for (i = get_params; i; i = i->next) {
+ if (strcmp(i->name, "channelId") == 0) {
+ args.channel_id = (i->value);
+ } else
+ if (strcmp(i->name, "application") == 0) {
+ args.application = (i->value);
+ } else
+ {}
+ }
+ if (ast_ari_events_claim_channel_parse_body(body, &args)) {
+ ast_ari_response_alloc_failed(response);
+ goto fin;
+ }
+ ast_ari_events_claim_channel(headers, &args, response);
+#if defined(AST_DEVMODE)
+ code = response->response_code;
+
+ switch (code) {
+ case 0: /* Implementation is still a stub, or the code wasn't set */
+ is_valid = response->message == NULL;
+ break;
+ case 500: /* Internal Server Error */
+ case 501: /* Not Implemented */
+ case 404: /* Channel not found or not in broadcast state. */
+ case 409: /* Channel has already been claimed by another application. */
+ is_valid = 1;
+ break;
+ default:
+ if (200 <= code && code <= 299) {
+ is_valid = ast_ari_validate_void(
+ response->message);
+ } else {
+ ast_log(LOG_ERROR, "Invalid error response %d for /events/claim\n", code);
+ is_valid = 0;
+ }
+ }
+
+ if (!is_valid) {
+ ast_log(LOG_ERROR, "Response validation failed for /events/claim\n");
+ ast_ari_response_error(response, 500,
+ "Internal Server Error", "Response validation failed");
+ }
+#endif /* AST_DEVMODE */
+
+fin: __attribute__((unused))
+ return;
+}
/*! \brief REST handler for /api-docs/events.json */
static struct stasis_rest_handlers events_user_eventName = {
.children = { &events_user_eventName, }
};
/*! \brief REST handler for /api-docs/events.json */
+static struct stasis_rest_handlers events_claim = {
+ .path_segment = "claim",
+ .callbacks = {
+ [AST_HTTP_POST] = ast_ari_events_claim_channel_cb,
+ },
+ .num_children = 0,
+ .children = { }
+};
+/*! \brief REST handler for /api-docs/events.json */
static struct stasis_rest_handlers events = {
.path_segment = "events",
.callbacks = {
},
- .num_children = 1,
- .children = { &events_user, }
+ .num_children = 2,
+ .children = { &events_user,&events_claim, }
};
static int unload_module(void)
--- /dev/null
+/*
+ * Asterisk -- An open source telephony toolkit.
+ *
+ * Copyright (C) 2026, Aurora Innovation AB
+ *
+ * Daniel Donoghue <daniel.donoghue@aurorainnovation.com>
+ *
+ * See http://www.asterisk.org for more information about
+ * the Asterisk project. Please do not directly contact
+ * any of the maintainers of this project for assistance;
+ * the project provides a web site, mailing lists and IRC
+ * channels for your use.
+ *
+ * This program is free software, distributed under the terms of
+ * the GNU General Public License Version 2. See the LICENSE file
+ * at the top of the source tree.
+ */
+
+/*! \file
+ *
+ * \brief Stasis application broadcast resource
+ *
+ * \author Daniel Donoghue <daniel.donoghue@aurorainnovation.com>
+ */
+
+/*** MODULEINFO
+ <depend type="module">res_stasis</depend>
+ <depend type="module">res_ari</depend>
+ <support_level>extended</support_level>
+ ***/
+
+#include "asterisk.h"
+
+#include <errno.h>
+#include <regex.h>
+
+#include "asterisk/astobj2.h"
+#include "asterisk/channel.h"
+#include "asterisk/http.h"
+#include "asterisk/json.h"
+#include "asterisk/lock.h"
+#include "asterisk/module.h"
+#include "asterisk/pbx.h"
+#include "asterisk/stasis_app.h"
+#include "asterisk/stasis_app_impl.h"
+#include "asterisk/stasis_channels.h"
+#include "asterisk/time.h"
+#include "asterisk/utils.h"
+
+#define AST_API_MODULE /* Mark this as the module providing the API */
+#include "asterisk/stasis_app_broadcast.h"
+
+#define BROADCAST_BUCKETS 37
+
+/*! \brief Maximum length for app_filter regex pattern */
+#define MAX_REGEX_LENGTH 256
+
+/*! \brief Maximum depth for regex group nesting */
+#define MAX_GROUP_DEPTH 10
+
+/*! \brief Maximum number of nested quantifiers in regex */
+#define MAX_NESTED_QUANTIFIERS 3
+
+/*! \brief Maximum value for brace quantifier bounds {m,n} */
+#define MAX_QUANTIFIER_BOUND 100
+
+/*! \brief Maximum alternations allowed in deeply nested groups */
+#define MAX_ALTERNATIONS 20
+
+/*! \brief Group depth threshold for alternation limits */
+#define ALTERNATION_DEPTH_THRESHOLD 2
+
+/*! \brief Maximum broadcast timeout in milliseconds (24 hours) */
+#define MAX_BROADCAST_TIMEOUT_MS (24 * 60 * 60 * 1000)
+
+/*! \brief Interval in ms between hangup checks while waiting for a claim */
+#define BROADCAST_POLL_INTERVAL_MS 200
+
+/*! \brief Broadcast context stored on channel */
+struct stasis_broadcast_ctx {
+ /*! The unique ID of the channel */
+ char channel_id[AST_MAX_PUBLIC_UNIQUEID];
+ /*! Name of the winning application (dynamically allocated, NULL until claimed) */
+ char *winner_app;
+ /*! Regex pattern used to filter broadcast recipients */
+ char app_filter[MAX_REGEX_LENGTH + 1];
+ /*! Compiled regex for app_filter (valid only when filter_compiled is set) */
+ regex_t compiled_filter;
+ /*! Flag indicating if channel was claimed */
+ unsigned int claimed:1;
+ /*! Whether compiled_filter is valid and must be freed */
+ unsigned int filter_compiled:1;
+ /*! Set when the PBX thread retrieves the winner; prevents late claims */
+ unsigned int finished:1;
+ /*! Broadcast behaviour flags (STASIS_BROADCAST_FLAG_*) */
+ unsigned int flags;
+ /*! Reference to the global container (prevents use-after-free during module unload) */
+ struct ao2_container *container;
+ /*! Condition variable for claim notification */
+ ast_cond_t cond;
+};
+
+/*! \brief Container for all active broadcast contexts */
+static struct ao2_container *broadcast_contexts;
+
+/*! \brief Destructor for broadcast datastore
+ *
+ * Called when the channel is destroyed. Ensures the broadcast context
+ * is unlinked from the global container even if the caller never
+ * reached stasis_app_broadcast_cleanup (e.g. abnormal channel teardown).
+ */
+static void broadcast_datastore_destroy(void *data)
+{
+ struct stasis_broadcast_ctx *ctx = data;
+
+ if (ctx->container) {
+ ao2_unlink(ctx->container, ctx);
+ }
+ ao2_cleanup(ctx);
+}
+
+/*! \brief Datastore information for broadcast context */
+static const struct ast_datastore_info broadcast_datastore_info = {
+ .type = "stasis_broadcast_context",
+ .destroy = broadcast_datastore_destroy,
+};
+
+AO2_STRING_FIELD_HASH_FN(stasis_broadcast_ctx, channel_id)
+AO2_STRING_FIELD_CMP_FN(stasis_broadcast_ctx, channel_id)
+
+/*! \brief Destructor for broadcast context */
+static void broadcast_ctx_destructor(void *obj)
+{
+ struct stasis_broadcast_ctx *ctx = obj;
+ ast_free(ctx->winner_app);
+ if (ctx->filter_compiled) {
+ regfree(&ctx->compiled_filter);
+ }
+ ao2_cleanup(ctx->container);
+ ast_cond_destroy(&ctx->cond);
+}
+
+static int validate_regex_pattern(const char *pattern);
+
+/*! \brief Create a new broadcast context
+ *
+ * Validates and compiles the app_filter regex if provided. On regex
+ * failure the context is still created but broadcasts will be sent
+ * to all applications (i.e. no filtering).
+ */
+static struct stasis_broadcast_ctx *broadcast_ctx_create(
+ const char *channel_id, const char *app_filter, unsigned int flags)
+{
+ struct stasis_broadcast_ctx *ctx;
+
+ ctx = ao2_alloc(sizeof(*ctx), broadcast_ctx_destructor);
+ if (!ctx) {
+ return NULL;
+ }
+
+ /* ao2_alloc zeroes the struct; only set non-zero fields explicitly */
+ ast_copy_string(ctx->channel_id, channel_id, sizeof(ctx->channel_id));
+ ctx->flags = flags;
+ ctx->container = ao2_bump(broadcast_contexts);
+ ast_cond_init(&ctx->cond, NULL);
+
+ /* Validate and compile app_filter regex if provided */
+ if (!ast_strlen_zero(app_filter)) {
+ ast_copy_string(ctx->app_filter, app_filter, sizeof(ctx->app_filter));
+ if (validate_regex_pattern(app_filter) != 0) {
+ ast_log(LOG_WARNING,
+ "Channel %s: rejecting app_filter regex as potentially dangerous: %s\n",
+ channel_id, app_filter);
+ } else if (regcomp(&ctx->compiled_filter, app_filter,
+ REG_EXTENDED | REG_NOSUB) != 0) {
+ ast_log(LOG_WARNING,
+ "Channel %s: failed to compile app_filter regex '%s'\n",
+ channel_id, app_filter);
+ } else {
+ ctx->filter_compiled = 1;
+ }
+
+ if (!ctx->filter_compiled) {
+ ast_log(LOG_WARNING,
+ "Channel %s: proceeding without application filtering due to invalid regex\n",
+ channel_id);
+ }
+ }
+
+ ast_debug(1, "Created broadcast context for channel %s (filter: %s, flags: 0x%x)\n",
+ ctx->channel_id,
+ ctx->filter_compiled ? ctx->app_filter : "none",
+ ctx->flags);
+
+ return ctx;
+}
+
+/*!
+ * \brief Validate a regex pattern for safety
+ *
+ * Checks that the regex pattern is within length limits and doesn't contain
+ * patterns that could cause excessive backtracking or denial of service.
+ *
+ * \param pattern The regex pattern to validate
+ * \return 0 if valid, -1 if invalid
+ */
+static int validate_regex_pattern(const char *pattern)
+{
+ size_t len;
+ int group_depth = 0;
+ int quantified_groups = 0;
+ int in_class = 0; /* Inside [...] */
+ /* Track alternations per group depth. Index 0 is outside groups and unused. */
+ int alternations_per_depth[MAX_GROUP_DEPTH + 1] = { 0 };
+ const char *p;
+
+ if (ast_strlen_zero(pattern)) {
+ return 0; /* Empty pattern is valid (will be skipped) */
+ }
+
+ /* Check maximum length to prevent excessive regex compilation time */
+ len = strlen(pattern);
+ if (len > MAX_REGEX_LENGTH) {
+ ast_debug(3, "Regex pattern exceeds maximum length of %d characters (got %zu)\n",
+ MAX_REGEX_LENGTH, len);
+ return -1;
+ }
+
+ /*
+ * Check for potentially dangerous patterns that could cause
+ * excessive regex compilation or matching time. Look for:
+ * - Excessive group nesting depth
+ * - Too many quantified groups (groups followed by +, *, or ?)
+ *
+ * Note: This is a heuristic approach that catches common dangerous
+ * patterns. Combined with the length limit, it provides reasonable
+ * protection against ReDoS while allowing legitimate regex usage.
+ */
+ for (p = pattern; *p; p++) {
+ /* Handle character classes: enter on unescaped '[' and exit on unescaped ']' */
+ if (!in_class && *p == '[' && (p == pattern || *(p - 1) != '\\')) {
+ in_class = 1;
+ /* In POSIX ERE, ']' immediately after '[' or '[^' is a
+ * literal, not the end of the class. Advance past the
+ * optional negation caret and the literal ']' so the
+ * main loop does not leave in_class prematurely. */
+ if (*(p + 1) == '^') {
+ p++;
+ }
+ if (*(p + 1) == ']') {
+ p++;
+ }
+ continue;
+ } else if (in_class) {
+ if (*p == '\\') {
+ /* Skip the next escaped character inside character class */
+ if (*(p + 1)) {
+ p++;
+ }
+ continue;
+ }
+ if (*p == ']') {
+ in_class = 0;
+ }
+ /* Ignore everything inside character classes for heuristics */
+ continue;
+ }
+ switch (*p) {
+ case '(':
+ group_depth++;
+ if (group_depth > MAX_GROUP_DEPTH) {
+ ast_debug(3, "Regex pattern has too many nested groups (max %d)\n",
+ MAX_GROUP_DEPTH);
+ return -1;
+ }
+ /* Reset alternation counter for newly entered group depth */
+ alternations_per_depth[group_depth] = 0;
+ break;
+ case ')':
+ if (group_depth > 0) {
+ /* Clear alternations count for this depth before leaving */
+ alternations_per_depth[group_depth] = 0;
+ group_depth--;
+ }
+ break;
+ case '+':
+ case '*':
+ case '?':
+ /*
+ * Count quantified groups - patterns like (...)+ or (...)*
+ * Too many of these can cause slow matching on certain inputs.
+ */
+ if (p > pattern && *(p - 1) == ')') {
+ quantified_groups++;
+ }
+ break;
+ case '{': {
+ /* Parse POSIX quantifier {m}, {m,}, {m,n} with overflow and bound checks */
+ const char *q = p + 1;
+ long m = 0, n = -1; /* n=-1 means open upper bound */
+ int valid = 0;
+ int digit;
+ int overflow = 0;
+
+ if (*q >= '0' && *q <= '9') {
+ /* Parse m safely */
+ while (*q >= '0' && *q <= '9') {
+ digit = (*q - '0');
+ if (m > (LONG_MAX - digit) / 10) { /* overflow on next step */
+ overflow = 1;
+ break;
+ }
+ m = (m * 10) + digit;
+ if (m > MAX_QUANTIFIER_BOUND) { /* early bound exceed */
+ overflow = 1;
+ break;
+ }
+ q++;
+ }
+ if (!overflow && *q == ',') {
+ q++;
+ if (*q >= '0' && *q <= '9') {
+ long nn = 0;
+ while (*q >= '0' && *q <= '9') {
+ digit = (*q - '0');
+ if (nn > (LONG_MAX - digit) / 10) {
+ overflow = 1;
+ break;
+ }
+ nn = (nn * 10) + digit;
+ if (nn > MAX_QUANTIFIER_BOUND) {
+ overflow = 1;
+ break;
+ }
+ q++;
+ }
+ n = nn;
+ } else {
+ n = -1; /* open upper bound */
+ }
+ } else if (!overflow) {
+ n = m; /* {m} */
+ }
+ if (!overflow && *q == '}') {
+ valid = 1;
+ }
+ }
+ if (overflow) {
+ ast_debug(3, "Regex quantifier overflow or exceeds max bound (max %d)\n", MAX_QUANTIFIER_BOUND);
+ return -1;
+ }
+ if (valid) {
+ /* Additional bounds check (defensive) */
+ if (m > MAX_QUANTIFIER_BOUND || (n != -1 && n > MAX_QUANTIFIER_BOUND)) {
+ ast_debug(3, "Regex quantifier bounds too large (max %d)\n", MAX_QUANTIFIER_BOUND);
+ return -1;
+ }
+ if (p > pattern && *(p - 1) == ')') {
+ quantified_groups++;
+ }
+ p = q; /* q currently points to '}' */
+ }
+ break;
+ }
+ case '|':
+ if (group_depth > 0) {
+ alternations_per_depth[group_depth]++;
+ if (group_depth > ALTERNATION_DEPTH_THRESHOLD &&
+ alternations_per_depth[group_depth] > MAX_ALTERNATIONS) {
+ ast_debug(3,
+ "Regex has too many alternations in deep group (depth %d, count %d, max %d)\n",
+ group_depth,
+ alternations_per_depth[group_depth],
+ MAX_ALTERNATIONS);
+ return -1;
+ }
+ }
+ break;
+ case '\\':
+ /*
+ * Skip the next character entirely from heuristic processing.
+ * This ensures escaped characters (metacharacters in BRE or literals
+ * in ERE like \(, \), \+, \*, \?, etc.) do not affect group depth
+ * or quantified group counts.
+ */
+ if (*(p + 1)) {
+ p++;
+ }
+ /* Continue to next loop iteration without evaluating the escaped char */
+ continue;
+ }
+ }
+
+ /*
+ * Reject patterns with too many quantified groups, as these are
+ * often indicators of potentially slow patterns that could be
+ * exploited for denial of service.
+ */
+ if (quantified_groups > MAX_NESTED_QUANTIFIERS) {
+ ast_debug(3, "Regex pattern has too many quantified groups (max %d)\n",
+ MAX_NESTED_QUANTIFIERS);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*! \brief Create and send broadcast event to all applications
+ *
+ * Uses the compiled regex cached in \a ctx for application filtering.
+ */
+static int send_broadcast_event(struct ast_channel *chan,
+ struct stasis_broadcast_ctx *ctx)
+{
+ RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
+ RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
+ struct ao2_iterator iter;
+ char *app_name;
+ const char *caller = NULL;
+ const char *called = NULL;
+
+ /* Get snapshot and caller/called info under a single channel lock */
+ ast_channel_lock(chan);
+ snapshot = ao2_bump(ast_channel_snapshot(chan));
+ caller = ast_strdupa(S_OR(ast_channel_caller(chan)->id.number.str, ""));
+ called = ast_strdupa(S_OR(ast_channel_exten(chan), ""));
+ ast_channel_unlock(chan);
+
+ /* Build the broadcast event. Channel variables configured in
+ * ari.conf "channelvars" are already included in the channel
+ * snapshot produced by ast_channel_snapshot_to_json(). */
+ event = ast_json_pack("{s: s, s: o, s: o, s: s?, s: s?}",
+ "type", "CallBroadcast",
+ "timestamp", ast_json_timeval(ast_tvnow(), NULL),
+ "channel", ast_channel_snapshot_to_json(snapshot, NULL),
+ "caller", caller,
+ "called", called);
+
+ if (!event) {
+ ast_log(LOG_ERROR, "Channel %s: failed to create broadcast event\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
+ /* Get all registered applications */
+ apps = stasis_app_get_all();
+ if (!apps) {
+ ast_log(LOG_ERROR, "Channel %s: failed to get stasis applications\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
+ ast_debug(2, "Broadcasting to %d registered Stasis applications\n",
+ ao2_container_count(apps));
+
+ /*
+ * Broadcast to all matching applications.
+ *
+ * We collect matching apps into a plain array, Fisher-Yates shuffle it,
+ * then call stasis_app_send() for each. stasis_app_send() writes
+ * directly to each app's WebSocket socket synchronously on the calling
+ * thread. The shuffle ensures no single ARI application is consistently
+ * first to receive the event — every app gets a fair chance to claim the
+ * channel regardless of its position in the ao2 hash container.
+ */
+ {
+ int app_count;
+ char **matching_arr;
+ int n = 0;
+ int i;
+
+ app_count = ao2_container_count(apps);
+ if (app_count == 0) {
+ ast_debug(2, "Channel %s: no Stasis applications registered\n",
+ ast_channel_uniqueid(chan));
+ return 0;
+ }
+
+ matching_arr = ast_malloc(app_count * sizeof(*matching_arr));
+ if (!matching_arr) {
+ ast_log(LOG_ERROR, "Channel %s: failed to allocate matching apps array\n",
+ ast_channel_name(chan));
+ return -1;
+ }
+
+ /* First pass: collect all matching app names (transfer refs to array) */
+ iter = ao2_iterator_init(apps, 0);
+ while ((app_name = ao2_iterator_next(&iter)) && n < app_count) {
+ if (ctx->filter_compiled &&
+ regexec(&ctx->compiled_filter, app_name, 0, NULL, 0) == REG_NOMATCH) {
+ ast_debug(3, "App '%s' does not match filter, skipping\n", app_name);
+ ao2_ref(app_name, -1);
+ continue;
+ }
+ matching_arr[n++] = app_name; /* ref transferred to array */
+ }
+ ao2_iterator_destroy(&iter);
+
+ ast_debug(2, "Broadcasting channel %s to %d matching applications\n",
+ ast_channel_uniqueid(chan), n);
+
+ /* Fisher-Yates shuffle: randomise delivery order so no app is
+ * consistently first to receive the broadcast event. */
+ for (i = n - 1; i > 0; i--) {
+ int j = ast_random() % (i + 1);
+ char *tmp = matching_arr[i];
+ matching_arr[i] = matching_arr[j];
+ matching_arr[j] = tmp;
+ }
+
+ /*
+ * Second pass: send to each matching app. A deep copy of the event
+ * is required for each call because stasis_app_send() mutates the
+ * message in-place (adds "asterisk_id" via ast_json_object_set).
+ */
+ for (i = 0; i < n; i++) {
+ char *match_name = matching_arr[i];
+ struct ast_json *event_copy;
+
+ ast_debug(3, "Sending broadcast to app '%s'\n", match_name);
+
+ event_copy = ast_json_deep_copy(event);
+ if (!event_copy) {
+ ast_log(LOG_ERROR,
+ "Channel %s: failed to deep-copy event for app '%s'\n",
+ ast_channel_uniqueid(chan), match_name);
+ ao2_ref(match_name, -1);
+ continue;
+ }
+
+ stasis_app_send(match_name, event_copy);
+ ast_json_unref(event_copy);
+ ao2_ref(match_name, -1);
+ }
+
+ ast_free(matching_arr);
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief Start a broadcast for a channel
+ * \param chan The channel to broadcast
+ * \param timeout_ms Timeout in milliseconds
+ * \param app_filter Optional regex filter for applications
+ * \return 0 on success, -1 on error
+ */
+int AST_OPTIONAL_API_NAME(stasis_app_broadcast_channel)(struct ast_channel *chan, int timeout_ms,
+ const char *app_filter, unsigned int flags)
+{
+ RAII_VAR(struct stasis_broadcast_ctx *, ctx, NULL, ao2_cleanup);
+ struct ast_datastore *datastore;
+
+ if (!chan) {
+ return -1;
+ }
+
+ if (!broadcast_contexts) {
+ return -1;
+ }
+
+ /* Remove any previous broadcast datastore from a prior attempt.
+ * This supports failover scenarios where StasisBroadcast() is
+ * called multiple times for the same channel. The datastore
+ * destructor unlinks the old context from the container. */
+ {
+ struct ast_datastore *old_ds;
+ ast_channel_lock(chan);
+ old_ds = ast_channel_datastore_find(chan, &broadcast_datastore_info, NULL);
+ if (old_ds) {
+ ast_channel_datastore_remove(chan, old_ds);
+ }
+ ast_channel_unlock(chan);
+ if (old_ds) {
+ ast_datastore_free(old_ds);
+ }
+ }
+
+ /* Create broadcast context (validates and compiles app_filter regex) */
+ ctx = broadcast_ctx_create(ast_channel_uniqueid(chan), app_filter, flags);
+ if (!ctx) {
+ ast_log(LOG_ERROR, "Channel %s: failed to create broadcast context\n",
+ ast_channel_uniqueid(chan));
+ return -1;
+ }
+
+ /* Store context in container */
+ ao2_link(broadcast_contexts, ctx);
+
+ /* Create and attach datastore to channel */
+ datastore = ast_datastore_alloc(&broadcast_datastore_info, ast_channel_uniqueid(chan));
+ if (!datastore) {
+ ast_log(LOG_ERROR, "Channel %s: failed to allocate broadcast datastore\n",
+ ast_channel_uniqueid(chan));
+ ao2_unlink(broadcast_contexts, ctx);
+ return -1;
+ }
+
+ datastore->data = ao2_bump(ctx);
+ ast_channel_lock(chan);
+ if (ast_channel_datastore_add(chan, datastore)) {
+ ast_channel_unlock(chan);
+ ast_log(LOG_ERROR, "Channel %s: failed to attach broadcast datastore\n",
+ ast_channel_uniqueid(chan));
+ ast_datastore_free(datastore);
+ ao2_unlink(broadcast_contexts, ctx);
+ return -1;
+ }
+ ast_channel_unlock(chan);
+
+ ast_debug(1, "Starting broadcast for channel %s (timeout: %dms, filter: %s)\n",
+ ast_channel_uniqueid(chan), timeout_ms, app_filter ? app_filter : "none");
+
+ /* Send broadcast event to all matching applications */
+ if (send_broadcast_event(chan, ctx) != 0) {
+ ast_log(LOG_ERROR, "Channel %s: failed to send broadcast event\n",
+ ast_channel_uniqueid(chan));
+ ast_channel_lock(chan);
+ ast_channel_datastore_remove(chan, datastore);
+ ast_channel_unlock(chan);
+ ast_datastore_free(datastore);
+ ao2_unlink(broadcast_contexts, ctx);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief Attempt to claim a broadcast channel
+ * \param channel_id The unique ID of the channel
+ * \param app_name The name of the application claiming the channel
+ * \return 0 if claim successful, -1 if channel not found, -2 if already claimed
+ */
+int AST_OPTIONAL_API_NAME(stasis_app_claim_channel)(const char *channel_id, const char *app_name)
+{
+ RAII_VAR(struct stasis_broadcast_ctx *, ctx, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(channel_id) || ast_strlen_zero(app_name)) {
+ return -1;
+ }
+
+ if (!broadcast_contexts) {
+ return -1;
+ }
+
+ /* Find broadcast context */
+ ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
+ if (!ctx) {
+ ast_debug(1, "No broadcast context found for channel %s\n", channel_id);
+ return -1;
+ }
+
+ /* Atomically check and set claimed flag.
+ * Check claimed before finished: if the channel was claimed and then the
+ * broadcast finished, a late claim should return -2 (409 Conflict) rather
+ * than -1 (404) so callers can distinguish "already taken" from "not found". */
+ ao2_lock(ctx);
+ if (ctx->claimed) {
+ ast_debug(1, "Channel %s already claimed by %s (attempt by %s denied)\n",
+ channel_id, ctx->winner_app ? ctx->winner_app : "(unknown)", app_name);
+ ao2_unlock(ctx);
+ return -2;
+ }
+ if (ctx->finished) {
+ ast_debug(1, "Channel %s broadcast already finished (late claim by %s rejected)\n",
+ channel_id, app_name);
+ ao2_unlock(ctx);
+ return -1;
+ }
+ ctx->winner_app = ast_strdup(app_name);
+ if (!ctx->winner_app) {
+ ast_log(LOG_ERROR,
+ "Failed to allocate winner app name for channel %s\n",
+ channel_id);
+ ao2_unlock(ctx);
+ return -1;
+ }
+ ctx->claimed = 1;
+ ast_verb(3, "Channel %s claimed by application %s\n",
+ channel_id, app_name);
+ /* Signal waiting thread that channel was claimed */
+ ast_cond_signal(&ctx->cond);
+ ao2_unlock(ctx);
+
+ /* Send CallClaimed event to matching apps */
+ if (!(ctx->flags & STASIS_BROADCAST_FLAG_SUPPRESS_CLAIMED)) {
+ RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+ RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
+ RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
+ struct ao2_iterator iter;
+ char *app_name_iter;
+
+ snapshot = ast_channel_snapshot_get_latest(channel_id);
+ if (snapshot) {
+ event = ast_json_pack("{s: s, s: o, s: o, s: s}",
+ "type", "CallClaimed",
+ "timestamp", ast_json_timeval(ast_tvnow(), NULL),
+ "channel", ast_channel_snapshot_to_json(snapshot, NULL),
+ "winner_app", app_name);
+ }
+ if (event) {
+ apps = stasis_app_get_all();
+ }
+ if (apps) {
+ iter = ao2_iterator_init(apps, 0);
+ while ((app_name_iter = ao2_iterator_next(&iter))) {
+ struct ast_json *event_copy;
+
+ /* Only send to apps that matched the original broadcast filter */
+ if (ctx->filter_compiled &&
+ regexec(&ctx->compiled_filter, app_name_iter,
+ 0, NULL, 0) == REG_NOMATCH) {
+ ao2_ref(app_name_iter, -1);
+ continue;
+ }
+
+ event_copy = ast_json_deep_copy(event);
+ if (!event_copy) {
+ ao2_ref(app_name_iter, -1);
+ continue;
+ }
+
+ stasis_app_send(app_name_iter, event_copy);
+ ast_json_unref(event_copy);
+ ao2_ref(app_name_iter, -1);
+ }
+ ao2_iterator_destroy(&iter);
+ }
+ }
+
+ return 0;
+}
+
+/*!
+ * \brief Get the winner app name for a broadcast channel
+ * \param channel_id The unique ID of the channel
+ * \return A copy of the winner app name (caller must free with ast_free),
+ * or NULL if not claimed or not found
+ */
+char *AST_OPTIONAL_API_NAME(stasis_app_broadcast_winner)(const char *channel_id)
+{
+ RAII_VAR(struct stasis_broadcast_ctx *, ctx, NULL, ao2_cleanup);
+ char *winner = NULL;
+
+ if (ast_strlen_zero(channel_id)) {
+ return NULL;
+ }
+
+ ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
+ if (!ctx) {
+ return NULL;
+ }
+
+ ao2_lock(ctx);
+ if (ctx->claimed) {
+ winner = ast_strdup(ctx->winner_app);
+ }
+ /* Mark the broadcast as finished so no new claims can succeed.
+ * This closes the race window between reading the winner and
+ * the subsequent broadcast_cleanup call. */
+ ctx->finished = 1;
+ ao2_unlock(ctx);
+
+ return winner;
+}
+
+/*!
+ * \brief Wait for a broadcast channel to be claimed
+ *
+ * Blocks until the channel is claimed, the timeout expires, or the
+ * channel hangs up. The hangup check runs every
+ * #BROADCAST_POLL_INTERVAL_MS so that a dead channel does not tie up
+ * a PBX thread for the full timeout period.
+ *
+ * \param chan The channel
+ * \param timeout_ms Maximum time to wait in milliseconds
+ * \return 0 if claimed within timeout, -1 otherwise
+ */
+int AST_OPTIONAL_API_NAME(stasis_app_broadcast_wait)(struct ast_channel *chan, int timeout_ms)
+{
+ RAII_VAR(struct stasis_broadcast_ctx *, ctx, NULL, ao2_cleanup);
+ const char *channel_id;
+ struct timeval deadline;
+ int result = -1;
+
+ if (!chan) {
+ return -1;
+ }
+
+ channel_id = ast_channel_uniqueid(chan);
+
+ if (!broadcast_contexts) {
+ return -1;
+ }
+
+ ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY);
+ if (!ctx) {
+ ast_log(LOG_WARNING, "No broadcast context for channel %s\n", channel_id);
+ return -1;
+ }
+
+ /* Cap excessive timeouts to prevent arithmetic overflow */
+ if (timeout_ms < 0) {
+ timeout_ms = 0;
+ } else if (timeout_ms > MAX_BROADCAST_TIMEOUT_MS) {
+ timeout_ms = MAX_BROADCAST_TIMEOUT_MS;
+ }
+
+ /* Calculate absolute deadline */
+ deadline = ast_tvadd(ast_tvnow(),
+ ast_tv(timeout_ms / 1000, (timeout_ms % 1000) * 1000));
+
+ ao2_lock(ctx);
+ while (!ctx->claimed) {
+ struct timeval now;
+ struct timespec poll_spec;
+ long remaining_ms;
+ long poll_ms;
+ int wait_result;
+
+ /* Check for hangup so we don't block on a dead channel */
+ if (ast_check_hangup(chan)) {
+ ast_debug(3, "Channel %s hung up during broadcast wait\n",
+ channel_id);
+ break;
+ }
+
+ /* Check if we've passed the overall deadline */
+ now = ast_tvnow();
+ remaining_ms = ast_tvdiff_ms(deadline, now);
+ if (remaining_ms <= 0) {
+ ast_debug(3, "Broadcast timeout for channel %s after %dms\n",
+ channel_id, timeout_ms);
+ break;
+ }
+
+ /* Sleep for the shorter of the remaining time and the poll interval */
+ poll_ms = remaining_ms;
+ if (poll_ms > BROADCAST_POLL_INTERVAL_MS) {
+ poll_ms = BROADCAST_POLL_INTERVAL_MS;
+ }
+
+ poll_spec.tv_sec = now.tv_sec + (poll_ms / 1000);
+ poll_spec.tv_nsec = (long)(now.tv_usec) * 1000L
+ + (long)(poll_ms % 1000) * 1000000L;
+ while (poll_spec.tv_nsec >= 1000000000) {
+ poll_spec.tv_sec++;
+ poll_spec.tv_nsec -= 1000000000;
+ }
+
+ wait_result = ast_cond_timedwait(&ctx->cond, ao2_object_get_lockaddr(ctx), &poll_spec);
+ if (wait_result != 0 && wait_result != ETIMEDOUT) {
+ ast_log(LOG_WARNING,
+ "Channel %s: unexpected error waiting for claim: %s (%d)\n",
+ channel_id, strerror(wait_result), wait_result);
+ break;
+ }
+ /* Loop back: re-check claimed, then hangup, then deadline */
+ }
+
+ if (ctx->claimed) {
+ ast_debug(1, "Channel %s claimed by %s\n",
+ channel_id, ctx->winner_app);
+ result = 0;
+ }
+ ao2_unlock(ctx);
+
+ return result;
+}
+
+/*!
+ * \brief Clean up broadcast context for a channel
+ *
+ * This is the normal-path cleanup called by the dialplan application
+ * after the broadcast completes. The channel datastore destructor
+ * (broadcast_datastore_destroy) also unlinks the context as a safety
+ * net for abnormal teardown; ao2_unlink is idempotent so the double
+ * call is harmless.
+ *
+ * \param channel_id The unique ID of the channel
+ */
+void AST_OPTIONAL_API_NAME(stasis_app_broadcast_cleanup)(const char *channel_id)
+{
+ RAII_VAR(struct stasis_broadcast_ctx *, ctx, NULL, ao2_cleanup);
+
+ if (ast_strlen_zero(channel_id) || !broadcast_contexts) {
+ return;
+ }
+
+ ctx = ao2_find(broadcast_contexts, channel_id, OBJ_SEARCH_KEY | OBJ_UNLINK);
+ if (ctx) {
+ ast_debug(3, "Cleaning up broadcast context for %s\n", channel_id);
+ }
+}
+
+static int load_module(void)
+{
+ broadcast_contexts = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
+ BROADCAST_BUCKETS, stasis_broadcast_ctx_hash_fn, NULL, stasis_broadcast_ctx_cmp_fn);
+
+ if (!broadcast_contexts) {
+ return AST_MODULE_LOAD_DECLINE;
+ }
+
+ ast_debug(1, "Stasis broadcast module loaded\n");
+ return AST_MODULE_LOAD_SUCCESS;
+}
+
+static int unload_module(void)
+{
+ /* NULL the global pointer before releasing the reference so that
+ * concurrent lookups see NULL (safe) rather than a freed pointer. */
+ {
+ struct ao2_container *old_contexts = broadcast_contexts;
+ broadcast_contexts = NULL;
+ ao2_cleanup(old_contexts);
+ }
+
+ ast_debug(1, "Stasis broadcast module unloaded\n");
+ return 0;
+}
+
+AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER,
+ "Stasis application broadcast",
+ .support_level = AST_MODULE_SUPPORT_EXTENDED,
+ .load = load_module,
+ .unload = unload_module,
+ .requires = "res_stasis,res_ari,http",
+ .load_pri = AST_MODPRI_APP_DEPEND - 1,
+);
]
}
]
+ },
+ {
+ "path": "/events/claim",
+ "description": "Broadcast channel claim operations",
+ "operations": [
+ {
+ "httpMethod": "POST",
+ "since": [
+ "20.17.0",
+ "22.7.0",
+ "23.1.0"
+ ],
+ "summary": "Claim a broadcast channel for this application.",
+ "notes": "Atomically claims a channel that is in broadcast state. Only the first claim succeeds.",
+ "nickname": "claimChannel",
+ "responseClass": "void",
+ "parameters": [
+ {
+ "name": "channelId",
+ "description": "The ID of the channel to claim",
+ "paramType": "query",
+ "required": true,
+ "allowMultiple": false,
+ "dataType": "string"
+ },
+ {
+ "name": "application",
+ "description": "The name of the application claiming the channel",
+ "paramType": "query",
+ "required": true,
+ "allowMultiple": false,
+ "dataType": "string"
+ }
+ ],
+ "errorResponses": [
+ {
+ "code": 404,
+ "reason": "Channel not found or not in broadcast state."
+ },
+ {
+ "code": 409,
+ "reason": "Channel has already been claimed by another application."
+ }
+ ]
+ }
+ ]
}
],
"models": {
"ChannelConnectedLine",
"PeerStatusChange",
"ChannelTransfer",
- "RESTResponse"
+ "RESTResponse",
+ "CallBroadcast",
+ "CallClaimed"
]
},
"ContactInfo": {
"description": "Response message body"
}
}
+ },
+ "CallBroadcast": {
+ "id": "CallBroadcast",
+ "description": "Notification that a channel is being broadcast to ARI applications for claiming.",
+ "properties": {
+ "channel": {
+ "required": true,
+ "type": "Channel",
+ "description": "The channel being broadcast."
+ },
+ "caller": {
+ "required": false,
+ "type": "string",
+ "description": "The caller ID number."
+ },
+ "called": {
+ "required": false,
+ "type": "string",
+ "description": "The called number."
+ }
+ }
+ },
+ "CallClaimed": {
+ "id": "CallClaimed",
+ "description": "Notification that a broadcast channel has been successfully claimed by an ARI application.",
+ "properties": {
+ "channel": {
+ "required": true,
+ "type": "Channel",
+ "description": "The channel that was claimed."
+ },
+ "winner_app": {
+ "required": true,
+ "type": "string",
+ "description": "The name of the ARI application that claimed the channel."
+ }
+ }
}
}
}