From 8ad0243e1f382e48beb44df5f671bc03b9ae3e54 Mon Sep 17 00:00:00 2001 From: Christian Schmitz Date: Sat, 5 Apr 2025 13:28:03 +0200 Subject: [PATCH] mqtt: send ping at upkeep interval Closes #16975 --- docs/internals/MQTT.md | 4 +++ docs/libcurl/curl_easy_upkeep.md | 5 ++- lib/mqtt.c | 56 ++++++++++++++++++++++++++++++++ lib/mqtt.h | 2 ++ 4 files changed, 66 insertions(+), 1 deletion(-) diff --git a/docs/internals/MQTT.md b/docs/internals/MQTT.md index 291fd8ea2c..9b8d0efaae 100644 --- a/docs/internals/MQTT.md +++ b/docs/internals/MQTT.md @@ -26,6 +26,10 @@ Example subscribe: This sends an MQTT SUBSCRIBE packet for the topic `bedroom/temp` and listen in for incoming PUBLISH packets. +You can set the upkeep interval ms option to make curl send MQTT ping requests to the +server at an internal, to prevent the connection to get closed because of idleness. +You might then need to use the progress callback to cancel the operation. + ### Publishing Command usage: diff --git a/docs/libcurl/curl_easy_upkeep.md b/docs/libcurl/curl_easy_upkeep.md index bd3e819e0e..1aa448bee3 100644 --- a/docs/libcurl/curl_easy_upkeep.md +++ b/docs/libcurl/curl_easy_upkeep.md @@ -31,10 +31,13 @@ send some traffic on existing connections in order to keep them alive; this can prevent connections from being closed due to overzealous firewalls, for example. -Currently the only protocol with a connection upkeep mechanism is HTTP/2: when +For HTTP/2 we have an upkeep mechanism: when the connection upkeep interval is exceeded and curl_easy_upkeep(3) is called, an HTTP/2 PING frame is sent on the connection. +For MQTT the upkeep interval defines when to send ping requests to prevent the +server from disconnecting. + This function must be explicitly called in order to perform the upkeep work. The connection upkeep interval is set with CURLOPT_UPKEEP_INTERVAL_MS(3). diff --git a/lib/mqtt.c b/lib/mqtt.c index c9ba51c448..b22b086ca3 100644 --- a/lib/mqtt.c +++ b/lib/mqtt.c @@ -46,12 +46,16 @@ /* The last #include file should be: */ #include "memdebug.h" +/* first byte is command. + second byte is for flags. */ #define MQTT_MSG_CONNECT 0x10 /* #define MQTT_MSG_CONNACK 0x20 */ #define MQTT_MSG_PUBLISH 0x30 #define MQTT_MSG_SUBSCRIBE 0x82 #define MQTT_MSG_SUBACK 0x90 #define MQTT_MSG_DISCONNECT 0xe0 +#define MQTT_MSG_PINGREQ 0xC0 +#define MQTT_MSG_PINGRESP 0xD0 #define MQTT_CONNACK_LEN 2 #define MQTT_SUBACK_LEN 3 @@ -125,6 +129,7 @@ static CURLcode mqtt_send(struct Curl_easy *data, CURLcode result = Curl_xfer_send(data, buf, len, FALSE, &n); if(result) return result; + mq->lastTime = Curl_now(); Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n); if(len != n) { size_t nsend = len - n; @@ -687,6 +692,9 @@ MQTT_SUBACK_COMING: goto end; } + /* we received something */ + mq->lastTime = Curl_now(); + /* if QoS is set, message contains packet id */ result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread); if(result) @@ -709,9 +717,13 @@ end: static CURLcode mqtt_do(struct Curl_easy *data, bool *done) { + struct MQTT *mq = data->req.p.mqtt; CURLcode result = CURLE_OK; *done = FALSE; /* unconditionally */ + mq->lastTime = Curl_now(); + mq->pingsent = FALSE; + result = mqtt_connect(data); if(result) { failf(data, "Error %d sending MQTT CONNECT request", result); @@ -732,6 +744,35 @@ static CURLcode mqtt_done(struct Curl_easy *data, return CURLE_OK; } +/* we ping regularly to avoid being disconnected by the server */ +static CURLcode mqtt_ping(struct Curl_easy *data) +{ + CURLcode result = CURLE_OK; + struct connectdata *conn = data->conn; + struct mqtt_conn *mqtt = &conn->proto.mqtt; + struct MQTT *mq = data->req.p.mqtt; + + if(mqtt->state == MQTT_FIRST && + !mq->pingsent && + data->set.upkeep_interval_ms > 0) { + struct curltime t = Curl_now(); + timediff_t diff = Curl_timediff(t, mq->lastTime); + + if(diff > data->set.upkeep_interval_ms) { + /* 0xC0 is PINGREQ, and 0x00 is remaining length */ + unsigned char packet[2] = { 0xC0, 0x00 }; + size_t packetlen = sizeof(packet); + + result = mqtt_send(data, (char *)packet, packetlen); + if(!result) { + mq->pingsent = TRUE; + } + infof(data, "mqtt_ping: sent ping request."); + } + } + return result; +} + static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) { CURLcode result = CURLE_OK; @@ -750,6 +791,10 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) return result; } + result = mqtt_ping(data); + if(result) + return result; + infof(data, "mqtt_doing: state [%d]", (int) mqtt->state); switch(mqtt->state) { case MQTT_FIRST: @@ -764,6 +809,10 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) break; } Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&mq->firstbyte, 1); + + /* we received something */ + mq->lastTime = Curl_now(); + /* remember the first byte */ mq->npacket = 0; mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE); @@ -794,6 +843,13 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) infof(data, "Got DISCONNECT"); *done = TRUE; } + + /* ping response */ + if(mq->firstbyte == MQTT_MSG_PINGRESP) { + infof(data, "Received ping response."); + mq->pingsent = FALSE; + mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); + } break; case MQTT_CONNACK: result = mqtt_verify_connack(data); diff --git a/lib/mqtt.h b/lib/mqtt.h index b181e3c16d..610aa18929 100644 --- a/lib/mqtt.h +++ b/lib/mqtt.h @@ -55,6 +55,8 @@ struct MQTT { size_t npacket; /* byte counter */ size_t remaining_length; unsigned char pkt_hd[4]; /* for decoding the arriving packet length */ + struct curltime lastTime; /* last time we sent or received data */ + bool pingsent; /* 1 while we wait for ping response */ unsigned char firstbyte; }; -- 2.47.2