]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
MQTT: improve receive of ACKs
authorStefan Eissing <stefan@eissing.org>
Mon, 9 Oct 2023 09:36:37 +0000 (11:36 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Mon, 9 Oct 2023 16:34:17 +0000 (18:34 +0200)
- add `mq->recvbuf` to provide buffering of incomplete
  ACK responses
- continue ACK reading until sufficient bytes available
- fixes test failures on low network receives

Closes #12071

lib/dynbuf.h
lib/mqtt.c
lib/mqtt.h

index 6291eabd37de9ac4652e2df5d2f84a4220a99660..31a9130197913c4f4c1aa6410b26d27adc43b8ae 100644 (file)
@@ -89,4 +89,5 @@ int Curl_dyn_vprintf(struct dynbuf *dyn, const char *format, va_list ap_save);
 #define DYN_H1_TRAILER      4096
 #define DYN_PINGPPONG_CMD   (64*1024)
 #define DYN_IMAP_CMD        (64*1024)
+#define DYN_MQTT_RECV       (64*1024)
 #endif
index 5cb2d24110973946e6b861fac7ae3346f74eae81..54f88822c0df225ab115aa8612396ce0947ac14e 100644 (file)
@@ -109,6 +109,7 @@ static CURLcode mqtt_setup_conn(struct Curl_easy *data,
   mq = calloc(1, sizeof(struct MQTT));
   if(!mq)
     return CURLE_OUT_OF_MEMORY;
+  Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
   data->req.p.mqtt = mq;
   return CURLE_OK;
 }
@@ -350,36 +351,66 @@ static CURLcode mqtt_disconnect(struct Curl_easy *data)
   struct MQTT *mq = data->req.p.mqtt;
   result = mqtt_send(data, (char *)"\xe0\x00", 2);
   Curl_safefree(mq->sendleftovers);
+  Curl_dyn_free(&mq->recvbuf);
   return result;
 }
 
-static CURLcode mqtt_verify_connack(struct Curl_easy *data)
+static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
 {
+  struct MQTT *mq = data->req.p.mqtt;
+  size_t rlen = Curl_dyn_len(&mq->recvbuf);
   CURLcode result;
-  struct connectdata *conn = data->conn;
-  curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
-  unsigned char readbuf[MQTT_CONNACK_LEN];
-  ssize_t nread;
 
-  result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
-  if(result)
-    goto fail;
+  if(rlen < nbytes) {
+    unsigned char readbuf[1024];
+    ssize_t nread;
+
+    DEBUGASSERT(nbytes - rlen < sizeof(readbuf));
+    result = Curl_read(data, data->conn->sock[FIRSTSOCKET],
+                       (char *)readbuf, nbytes - rlen, &nread);
+    if(result)
+      return result;
+    DEBUGASSERT(nread >= 0);
+    if(Curl_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread))
+      return CURLE_OUT_OF_MEMORY;
+    rlen = Curl_dyn_len(&mq->recvbuf);
+  }
+  return (rlen >= nbytes)? CURLE_OK : CURLE_AGAIN;
+}
 
-  Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
+static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes)
+{
+  struct MQTT *mq = data->req.p.mqtt;
+  size_t rlen = Curl_dyn_len(&mq->recvbuf);
+  if(rlen <= nbytes)
+    Curl_dyn_reset(&mq->recvbuf);
+  else
+    Curl_dyn_tail(&mq->recvbuf, rlen - nbytes);
+}
 
-  /* fixme */
-  if(nread < MQTT_CONNACK_LEN) {
-    result = CURLE_WEIRD_SERVER_REPLY;
+static CURLcode mqtt_verify_connack(struct Curl_easy *data)
+{
+  struct MQTT *mq = data->req.p.mqtt;
+  CURLcode result;
+  char *ptr;
+
+  result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN);
+  if(result)
     goto fail;
-  }
 
   /* verify CONNACK */
-  if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
+  DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN);
+  ptr = Curl_dyn_ptr(&mq->recvbuf);
+  Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN);
+
+  if(ptr[0] != 0x00 || ptr[1] != 0x00) {
     failf(data, "Expected %02x%02x but got %02x%02x",
-          0x00, 0x00, readbuf[0], readbuf[1]);
+          0x00, 0x00, ptr[0], ptr[1]);
+    Curl_dyn_reset(&mq->recvbuf);
     result = CURLE_WEIRD_SERVER_REPLY;
+    goto fail;
   }
-
+  mqtt_recv_consume(data, MQTT_CONNACK_LEN);
 fail:
   return result;
 }
@@ -452,31 +483,29 @@ fail:
  */
 static CURLcode mqtt_verify_suback(struct Curl_easy *data)
 {
-  CURLcode result;
+  struct MQTT *mq = data->req.p.mqtt;
   struct connectdata *conn = data->conn;
-  curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
-  unsigned char readbuf[MQTT_SUBACK_LEN];
-  ssize_t nread;
   struct mqtt_conn *mqtt = &conn->proto.mqtt;
+  CURLcode result;
+  char *ptr;
 
-  result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
+  result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN);
   if(result)
     goto fail;
 
-  Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
-
-  /* fixme */
-  if(nread < MQTT_SUBACK_LEN) {
+  /* verify SUBACK */
+  DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN);
+  ptr = Curl_dyn_ptr(&mq->recvbuf);
+  Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN);
+
+  if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) ||
+     ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) ||
+     ptr[2] != 0x00) {
+    Curl_dyn_reset(&mq->recvbuf);
     result = CURLE_WEIRD_SERVER_REPLY;
     goto fail;
   }
-
-  /* verify SUBACK */
-  if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
-     readbuf[1] != (mqtt->packetid & 0xff) ||
-     readbuf[2] != 0x00)
-    result = CURLE_WEIRD_SERVER_REPLY;
-
+  mqtt_recv_consume(data, MQTT_SUBACK_LEN);
 fail:
   return result;
 }
@@ -713,6 +742,7 @@ static CURLcode mqtt_done(struct Curl_easy *data,
   (void)status;
   (void)premature;
   Curl_safefree(mq->sendleftovers);
+  Curl_dyn_free(&mq->recvbuf);
   return CURLE_OK;
 }
 
index 63961366fcb09ed40d400fb8632c9632d32dbeae..84f177022ebd3e538fa5bed1c0b445dc847cd797 100644 (file)
@@ -56,6 +56,7 @@ struct MQTT {
   size_t npacket; /* byte counter */
   unsigned char firstbyte;
   size_t remaining_length;
+  struct dynbuf recvbuf;
 };
 
 #endif /* HEADER_CURL_MQTT_H */