]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
mqtt: send ping at upkeep interval
authorChristian Schmitz <support@monkeybreadsoftware.de>
Sat, 5 Apr 2025 11:28:03 +0000 (13:28 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Wed, 16 Apr 2025 07:36:19 +0000 (09:36 +0200)
Closes #16975

docs/internals/MQTT.md
docs/libcurl/curl_easy_upkeep.md
lib/mqtt.c
lib/mqtt.h

index 291fd8ea2c652d024078718c90fe5fd04fcb994d..9b8d0efaae2dde66a64c55e388f09199341084a3 100644 (file)
@@ -26,6 +26,10 @@ Example subscribe:
 This sends an MQTT SUBSCRIBE packet for the topic `bedroom/temp` and listen in
 for incoming PUBLISH packets.
 
+You can set the upkeep interval ms option to make curl send MQTT ping requests to the
+server at an internal, to prevent the connection to get closed because of idleness.
+You might then need to use the progress callback to cancel the operation.
+
 ### Publishing
 
 Command usage:
index bd3e819e0e68c9dce650fc5d6a23bf399e7f78de..1aa448bee344893b626ba99fadc885c26ac6ee08 100644 (file)
@@ -31,10 +31,13 @@ send some traffic on existing connections in order to keep them alive; this
 can prevent connections from being closed due to overzealous firewalls, for
 example.
 
-Currently the only protocol with a connection upkeep mechanism is HTTP/2: when
+For HTTP/2 we have an upkeep mechanism: when
 the connection upkeep interval is exceeded and curl_easy_upkeep(3)
 is called, an HTTP/2 PING frame is sent on the connection.
 
+For MQTT the upkeep interval defines when to send ping requests to prevent the
+server from disconnecting.
+
 This function must be explicitly called in order to perform the upkeep work.
 The connection upkeep interval is set with
 CURLOPT_UPKEEP_INTERVAL_MS(3).
index c9ba51c448dac55598d11e038629e88ca7f6cc80..b22b086ca3c59c366925441ac6441a4f36b16507 100644 (file)
 /* The last #include file should be: */
 #include "memdebug.h"
 
+/* first byte is command.
+   second byte is for flags. */
 #define MQTT_MSG_CONNECT    0x10
 /* #define MQTT_MSG_CONNACK    0x20 */
 #define MQTT_MSG_PUBLISH    0x30
 #define MQTT_MSG_SUBSCRIBE  0x82
 #define MQTT_MSG_SUBACK     0x90
 #define MQTT_MSG_DISCONNECT 0xe0
+#define MQTT_MSG_PINGREQ    0xC0
+#define MQTT_MSG_PINGRESP   0xD0
 
 #define MQTT_CONNACK_LEN 2
 #define MQTT_SUBACK_LEN 3
@@ -125,6 +129,7 @@ static CURLcode mqtt_send(struct Curl_easy *data,
   CURLcode result = Curl_xfer_send(data, buf, len, FALSE, &n);
   if(result)
     return result;
+  mq->lastTime = Curl_now();
   Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
   if(len != n) {
     size_t nsend = len - n;
@@ -687,6 +692,9 @@ MQTT_SUBACK_COMING:
       goto end;
     }
 
+    /* we received something */
+    mq->lastTime = Curl_now();
+
     /* if QoS is set, message contains packet id */
     result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread);
     if(result)
@@ -709,9 +717,13 @@ end:
 
 static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
 {
+  struct MQTT *mq = data->req.p.mqtt;
   CURLcode result = CURLE_OK;
   *done = FALSE; /* unconditionally */
 
+  mq->lastTime = Curl_now();
+  mq->pingsent = FALSE;
+
   result = mqtt_connect(data);
   if(result) {
     failf(data, "Error %d sending MQTT CONNECT request", result);
@@ -732,6 +744,35 @@ static CURLcode mqtt_done(struct Curl_easy *data,
   return CURLE_OK;
 }
 
+/* we ping regularly to avoid being disconnected by the server */
+static CURLcode mqtt_ping(struct Curl_easy *data)
+{
+  CURLcode result = CURLE_OK;
+  struct connectdata *conn = data->conn;
+  struct mqtt_conn *mqtt = &conn->proto.mqtt;
+  struct MQTT *mq = data->req.p.mqtt;
+
+  if(mqtt->state == MQTT_FIRST &&
+     !mq->pingsent &&
+     data->set.upkeep_interval_ms > 0) {
+    struct curltime t = Curl_now();
+    timediff_t diff = Curl_timediff(t, mq->lastTime);
+
+    if(diff > data->set.upkeep_interval_ms) {
+      /* 0xC0 is PINGREQ, and 0x00 is remaining length */
+      unsigned char packet[2] = { 0xC0, 0x00 };
+      size_t packetlen = sizeof(packet);
+
+      result = mqtt_send(data, (char *)packet, packetlen);
+      if(!result) {
+        mq->pingsent = TRUE;
+      }
+      infof(data, "mqtt_ping: sent ping request.");
+    }
+  }
+  return result;
+}
+
 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
 {
   CURLcode result = CURLE_OK;
@@ -750,6 +791,10 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
       return result;
   }
 
+  result = mqtt_ping(data);
+  if(result)
+    return result;
+
   infof(data, "mqtt_doing: state [%d]", (int) mqtt->state);
   switch(mqtt->state) {
   case MQTT_FIRST:
@@ -764,6 +809,10 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
       break;
     }
     Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&mq->firstbyte, 1);
+
+    /* we received something */
+    mq->lastTime = Curl_now();
+
     /* remember the first byte */
     mq->npacket = 0;
     mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
@@ -794,6 +843,13 @@ static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
       infof(data, "Got DISCONNECT");
       *done = TRUE;
     }
+
+    /* ping response */
+    if(mq->firstbyte == MQTT_MSG_PINGRESP) {
+      infof(data, "Received ping response.");
+      mq->pingsent = FALSE;
+      mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
+    }
     break;
   case MQTT_CONNACK:
     result = mqtt_verify_connack(data);
index b181e3c16df592eeaf59d92953549152db152111..610aa18929f869f5a3d14ff5cbc1463fdc08452f 100644 (file)
@@ -55,6 +55,8 @@ struct MQTT {
   size_t npacket; /* byte counter */
   size_t remaining_length;
   unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
+  struct curltime lastTime; /* last time we sent or received data */
+  bool pingsent; /* 1 while we wait for ping response */
   unsigned char firstbyte;
 };