.on_rx_response = distributor,
};
+/*! Number of serializers in pool if one not otherwise known. (Best if prime number) */
+#define DISTRIBUTOR_POOL_SIZE 31
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_taskprocessor *distributor_pool[DISTRIBUTOR_POOL_SIZE];
+
/*!
* \internal
* \brief Record the task's serializer name on the tdata structure.
return dlg;
}
+/*!
+ * \internal
+ * \brief Compute a hash value on a pjlib string
+ * \since 13.10.0
+ *
+ * \param[in] str The pjlib string to add to the hash
+ * \param[in] hash The hash value to add to
+ *
+ * \details
+ * This version of the function is for when you need to compute a
+ * string hash of more than one string.
+ *
+ * This famous hash algorithm was written by Dan Bernstein and is
+ * commonly used.
+ *
+ * \sa http://www.cse.yorku.ca/~oz/hash.html
+ */
+static int pjstr_hash_add(pj_str_t *str, int hash)
+{
+ size_t len;
+ const char *pos;
+
+ len = pj_strlen(str);
+ pos = pj_strbuf(str);
+ while (len--) {
+ hash = hash * 33 ^ *pos++;
+ }
+
+ return hash;
+}
+
+/*!
+ * \internal
+ * \brief Compute a hash value on a pjlib string
+ * \since 13.10.0
+ *
+ * \param[in] str The pjlib string to hash
+ *
+ * This famous hash algorithm was written by Dan Bernstein and is
+ * commonly used.
+ *
+ * http://www.cse.yorku.ca/~oz/hash.html
+ */
+static int pjstr_hash(pj_str_t *str)
+{
+ return pjstr_hash_add(str, 5381);
+}
+
+struct ast_taskprocessor *ast_sip_get_distributor_serializer(pjsip_rx_data *rdata)
+{
+ int hash;
+ pj_str_t *remote_tag;
+ struct ast_taskprocessor *serializer;
+
+ if (!rdata->msg_info.msg) {
+ return NULL;
+ }
+
+ if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
+ remote_tag = &rdata->msg_info.from->tag;
+ } else {
+ remote_tag = &rdata->msg_info.to->tag;
+ }
+
+ /* Compute the hash from the SIP message call-id and remote-tag */
+ hash = pjstr_hash(&rdata->msg_info.cid->id);
+ hash = pjstr_hash_add(remote_tag, hash);
+ hash = abs(hash);
+
+ serializer = ao2_bump(distributor_pool[hash % ARRAY_LEN(distributor_pool)]);
+ if (serializer) {
+ ast_debug(3, "Calculated serializer %s to use for %s\n",
+ ast_taskprocessor_name(serializer), pjsip_rx_data_get_info(rdata));
+ }
+ return serializer;
+}
+
static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
static pjsip_module endpoint_mod = {
ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n",
rdata->msg_info.info);
serializer = find_request_serializer(rdata);
+ if (!serializer) {
+ /*
+ * Pick a serializer for the unmatched response. Maybe
+ * the stack can figure out what it is for, or we really
+ * should just toss it regardless.
+ */
+ serializer = ast_sip_get_distributor_serializer(rdata);
+ }
} else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
|| !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
/* We have a BYE or CANCEL request without a serializer. */
pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
return PJ_TRUE;
+ } else {
+ /* Pick a serializer for the out-of-dialog request. */
+ serializer = ast_sip_get_distributor_serializer(rdata);
}
pjsip_rx_data_clone(rdata, 0, &clone);
ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
pjsip_rx_data_free_cloned(clone);
} else {
- ast_sip_push_task(serializer, distribute, clone);
+ if (ast_sip_push_task(serializer, distribute, clone)) {
+ ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
+ pjsip_rx_data_free_cloned(clone);
+ }
}
ast_taskprocessor_unreference(serializer);
return endpoint;
}
+/*!
+ * \internal
+ * \brief Shutdown the serializers in the distributor pool.
+ * \since 13.10.0
+ *
+ * \return Nothing
+ */
+static void distributor_pool_shutdown(void)
+{
+ int idx;
+
+ for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
+ ast_taskprocessor_unreference(distributor_pool[idx]);
+ distributor_pool[idx] = NULL;
+ }
+}
+
+/*!
+ * \internal
+ * \brief Setup the serializers in the distributor pool.
+ * \since 13.10.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int distributor_pool_setup(void)
+{
+ char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+ int idx;
+
+ for (idx = 0; idx < ARRAY_LEN(distributor_pool); ++idx) {
+ /* Create name with seq number appended. */
+ ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/distributor");
+
+ distributor_pool[idx] = ast_sip_create_serializer_named(tps_name);
+ if (!distributor_pool[idx]) {
+ return -1;
+ }
+ }
+ return 0;
+}
+
int ast_sip_initialize_distributor(void)
{
+ if (distributor_pool_setup()) {
+ ast_sip_destroy_distributor();
+ return -1;
+ }
+
if (create_artificial_endpoint() || create_artificial_auth()) {
+ ast_sip_destroy_distributor();
return -1;
}
if (internal_sip_register_service(&distributor_mod)) {
+ ast_sip_destroy_distributor();
return -1;
}
if (internal_sip_register_service(&endpoint_mod)) {
+ ast_sip_destroy_distributor();
return -1;
}
if (internal_sip_register_service(&auth_mod)) {
+ ast_sip_destroy_distributor();
return -1;
}
void ast_sip_destroy_distributor(void)
{
- internal_sip_unregister_service(&distributor_mod);
- internal_sip_unregister_service(&endpoint_mod);
internal_sip_unregister_service(&auth_mod);
+ internal_sip_unregister_service(&endpoint_mod);
+ internal_sip_unregister_service(&distributor_mod);
ao2_cleanup(artificial_auth);
ao2_cleanup(artificial_endpoint);
+
+ distributor_pool_shutdown();
}