return SWITCH_STATUS_GENERR;
}
+static void mod_amqp_command_response(mod_amqp_command_profile_t *profile, char *command, switch_stream_handle_t stream,
+ char *fs_resp_exchange, char *fs_resp_key, switch_status_t status)
+{
+ char *json_output = NULL;
+ amqp_basic_properties_t props;
+ cJSON *message = NULL;
+
+ if (! profile->conn_active) {
+ /* No connection, so we can not send the message. */
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] not active\n", profile->name);
+ return;
+ }
+
+ /* Construct the api response */
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Preparing api command response: [%s]\n", (char *)stream.data);
+ message = cJSON_CreateObject();
+
+ cJSON_AddItemToObject(message, "output", cJSON_CreateString((const char *) stream.data));
+ cJSON_AddItemToObject(message, "command", cJSON_CreateString(command));
+ cJSON_AddItemToObject(message, "status", cJSON_CreateNumber((double) status));
+
+ json_output = cJSON_Print(message);
+ cJSON_Delete(message);
+
+ memset(&props, 0, sizeof(amqp_basic_properties_t));
+
+ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG;
+ props.content_type = amqp_cstring_bytes("text/json");
+
+ status = amqp_basic_publish(
+ profile->conn_active->state,
+ 1,
+ amqp_cstring_bytes(fs_resp_exchange),
+ amqp_cstring_bytes(fs_resp_key),
+ 0,
+ 0,
+ &props,
+ amqp_cstring_bytes(json_output));
+
+ switch_safe_free(json_output);
+
+ if (status < 0) {
+ const char *errstr = amqp_error_string2(-status);
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_CRIT, "Profile[%s] failed to send event on connection[%s]: %s\n",
+ profile->name, profile->conn_active->name, errstr);
+
+ /* This is bad, we couldn't send the message. Clear up any connection */
+ mod_amqp_connection_close(profile->conn_active);
+ profile->conn_active = NULL;
+ return;
+ }
+
+ return;
+}
void * SWITCH_THREAD_FUNC mod_amqp_command_thread(switch_thread_t *thread, void *data)
{
COMMAND_FORMAT_UNKNOWN,
COMMAND_FORMAT_PLAINTEXT
} commandFormat = COMMAND_FORMAT_PLAINTEXT;
+ char *fs_resp_exchange = NULL, *fs_resp_key = NULL;
amqp_maybe_release_buffers(profile->conn_active->state);
}
}
+ if (envelope.message.properties.headers.num_entries) {
+ int x = 0;
+
+ for ( x = 0; x < envelope.message.properties.headers.num_entries; x++) {
+ char *header_key = (char *)envelope.message.properties.headers.entries[x].key.bytes;
+ char *header_value = (char *)envelope.message.properties.headers.entries[x].value.value.bytes.bytes;
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "AMQP message custom header key[%s] value[%s]\n", header_key, header_value);
+
+ if ( !strncmp(header_key, "x-fs-api-resp-exchange", 22)) {
+ fs_resp_exchange = header_value;
+ } else if (!strncmp(header_key, "x-fs-api-resp-key", 17)) {
+ fs_resp_key = header_value;
+ } else {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Ignoring unrecognized event header [%s]\n", header_key);
+ }
+ }
+ }
+
if (commandFormat == COMMAND_FORMAT_PLAINTEXT) {
switch_stream_handle_t stream = { 0 }; /* Collects the command output */
SWITCH_STANDARD_STREAM(stream);
- if (switch_console_execute(command, 0, &stream) != SWITCH_STATUS_SUCCESS) {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Remote command failed:\n%s\n", (char *) stream.data);
+ if ( fs_resp_exchange && fs_resp_key ) {
+ switch_status_t status = switch_console_execute(command, 0, &stream);
+ mod_amqp_command_response(profile, command, stream, fs_resp_exchange, fs_resp_key, status);
} else {
- switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Remote command succeeded:\n%s\n", (char *) stream.data);
+ if (switch_console_execute(command, 0, &stream) != SWITCH_STATUS_SUCCESS) {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "Remote command failed:\n%s\n", (char *) stream.data);
+ } else {
+ switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_DEBUG, "Remote command succeeded:\n%s\n", (char *) stream.data);
+ }
}
switch_safe_free(stream.data);
}