]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-11721 add a send queue to buffer msg messages before msrp socket is ready
authorSeven Du <dujinfang@gmail.com>
Fri, 12 Apr 2019 04:04:04 +0000 (12:04 +0800)
committerAndrey Volk <andywolk@gmail.com>
Wed, 17 Jul 2019 19:25:25 +0000 (23:25 +0400)
src/include/switch_msrp.h
src/switch_msrp.c

index c86a1668773d3bc8791e7c85d6513f19bf4035d9..c1e21925374ac46e0e2ce27870b2d0fd269f823d 100644 (file)
@@ -118,6 +118,7 @@ struct switch_msrp_session_s{
        uint8_t frame_data[SWITCH_RTP_MAX_BUF_LEN];
        int running;
        void *user_data;
+       switch_queue_t *send_queue;
 };
 
 SWITCH_DECLARE(switch_status_t) switch_msrp_init(void);
index 29b3bbd6fdcf3716e9e413fa902597f2c93e37f9..1692a554db5db88f5b0bd66ed80c17b92b211229 100644 (file)
@@ -331,7 +331,9 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_destroy()
 {
        switch_status_t st = SWITCH_STATUS_SUCCESS;
        switch_socket_t *sock;
+
        globals.running = 0;
+
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_NOTICE, "destroying thread\n");
 
        sock = globals.msock.sock;
@@ -387,6 +389,14 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_session_destroy(switch_msrp_session_
                switch_yield(20000);
        }
 
+       if ((*ms)->send_queue) {
+               switch_msrp_msg_t *msg = NULL;
+
+               while (switch_queue_trypop((*ms)->send_queue, (void **)&msg) == SWITCH_STATUS_SUCCESS) {
+                       switch_msrp_msg_destroy(&msg);
+               }
+       }
+
        switch_mutex_destroy((*ms)->mutex);
        ms = NULL;
        return SWITCH_STATUS_SUCCESS;
@@ -1467,7 +1477,7 @@ void random_string(char *buf, uint16_t size)
 }
 
 #define MSRP_TRANS_ID_LEN 16
-SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *ms, switch_msrp_msg_t *msrp_msg, const char *file, const char *func, int line)
+static switch_status_t switch_msrp_do_send(switch_msrp_session_t *ms, switch_msrp_msg_t *msrp_msg, const char *file, const char *func, int line)
 {
        char transaction_id[MSRP_TRANS_ID_LEN + 1] = { 0 };
        char buf[MSRP_BUFF_SIZE];
@@ -1482,11 +1492,6 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *
                return SWITCH_STATUS_SUCCESS;
        }
 
-       if (!ms->running) {
-               switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Discard one message\n");
-               return SWITCH_STATUS_SUCCESS;
-       }
-
        if (!from_path) {
                switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "NO FROM PATH\n");
                return SWITCH_STATUS_SUCCESS;
@@ -1524,6 +1529,47 @@ SWITCH_DECLARE(switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *
        return ms->csock ? msrp_socket_send(ms->csock, buf, &len) : SWITCH_STATUS_FALSE;
 }
 
+SWITCH_DECLARE (switch_status_t) switch_msrp_perform_send(switch_msrp_session_t *ms, switch_msrp_msg_t *msrp_msg, const char *file, const char *func, int line)
+{
+       switch_msrp_msg_t *msg = NULL;
+       switch_status_t status = SWITCH_STATUS_SUCCESS;
+
+       if (!ms->running) {
+               if (!ms->send_queue) {
+                       switch_queue_create(&ms->send_queue, 100, ms->pool);
+               }
+
+               switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Buffering one message %" SWITCH_SIZE_T_FMT " bytes\n", msrp_msg->payload_bytes);
+
+               if (globals.debug && msrp_msg->payload) {
+                       switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_WARNING, "MSRP not ready! Buffered one message [%s]\n", msrp_msg->payload);
+               }
+
+               msg = switch_msrp_msg_dup(msrp_msg);
+
+               status = switch_queue_trypush(ms->send_queue, msg);
+
+               if (status != SWITCH_STATUS_SUCCESS) {
+                       switch_log_printf(SWITCH_CHANNEL_ID_LOG, file, func, line, ms->call_id, SWITCH_LOG_ERROR, "MSRP queue FULL! Discard one message %" SWITCH_SIZE_T_FMT " bytes\n", msg->payload_bytes);
+                       switch_msrp_msg_destroy(&msg);
+               }
+
+               return status;
+       }
+
+       if (ms->send_queue) {
+               while (status == SWITCH_STATUS_SUCCESS && switch_queue_trypop(ms->send_queue, (void **)&msg) == SWITCH_STATUS_SUCCESS) {
+                       status = switch_msrp_do_send(ms, msg, file, func, line);
+               }
+
+               switch_queue_term(ms->send_queue);
+               ms->send_queue = NULL;
+       }
+
+       status = switch_msrp_do_send(ms, msrp_msg, file, func, line);
+
+       return status;
+}
 
 SWITCH_DECLARE(switch_msrp_msg_t *) switch_msrp_msg_create()
 {
@@ -1553,8 +1599,10 @@ SWITCH_DECLARE(switch_msrp_msg_t *) switch_msrp_msg_dup(switch_msrp_msg_t *msg)
        new_msg->code_number = msg->code_number;
        new_msg->payload_bytes = msg->payload_bytes;
 
-       if (msg->payload) {
+       if (msg->payload_bytes > 0 && msg->payload) {
+               new_msg->payload = malloc(msg->payload_bytes + 1);
                memcpy(new_msg->payload, msg->payload, msg->payload_bytes);
+               *(new_msg->payload + msg->payload_bytes) = '\0';
        }
 
        return new_msg;
@@ -1562,7 +1610,6 @@ SWITCH_DECLARE(switch_msrp_msg_t *) switch_msrp_msg_dup(switch_msrp_msg_t *msg)
 
 SWITCH_DECLARE(void) switch_msrp_msg_destroy(switch_msrp_msg_t **msg)
 {
-
        switch_msrp_msg_t *msrp_msg = *msg;
        if (msrp_msg->headers) {
                switch_event_destroy(&msrp_msg->headers);