]> git.ipfire.org Git - thirdparty/curl.git/commitdiff
mqtt: use conn/easy meta hash
authorStefan Eissing <stefan@eissing.org>
Tue, 29 Apr 2025 08:49:46 +0000 (10:49 +0200)
committerDaniel Stenberg <daniel@haxx.se>
Tue, 29 Apr 2025 12:25:25 +0000 (14:25 +0200)
Remove mqtt structs from the unions at connectdata and
easy handle requests. Use meta hash at easy/connnection.

Make mqtt structs private to mqtt.c

Closes #17221

lib/mqtt.c
lib/mqtt.h
lib/request.h
lib/urldata.h

index b22b086ca3c59c366925441ac6441a4f36b16507..0d736de2907362de85b270edacc036d867d52b74 100644 (file)
 #define MQTT_SUBACK_LEN 3
 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
 
+/* meta key for storing protocol meta at easy handle */
+#define CURL_META_MQTT_EASY   "meta:proto:mqtt:easy"
+/* meta key for storing protocol meta at connection */
+#define CURL_META_MQTT_CONN   "meta:proto:mqtt:conn"
+
+enum mqttstate {
+  MQTT_FIRST,             /* 0 */
+  MQTT_REMAINING_LENGTH,  /* 1 */
+  MQTT_CONNACK,           /* 2 */
+  MQTT_SUBACK,            /* 3 */
+  MQTT_SUBACK_COMING,     /* 4 - the SUBACK remainder */
+  MQTT_PUBWAIT,    /* 5 - wait for publish */
+  MQTT_PUB_REMAIN,  /* 6 - wait for the remainder of the publish */
+
+  MQTT_NOSTATE /* 7 - never used an actual state */
+};
+
+struct mqtt_conn {
+  enum mqttstate state;
+  enum mqttstate nextstate; /* switch to this after remaining length is
+                               done */
+  unsigned int packetid;
+};
+
+/* protocol-specific transfer-related data */
+struct MQTT {
+  struct dynbuf sendbuf;
+  /* when receiving */
+  struct dynbuf recvbuf;
+  size_t npacket; /* byte counter */
+  size_t remaining_length;
+  unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
+  struct curltime lastTime; /* last time we sent or received data */
+  unsigned char firstbyte;
+  BIT(pingsent); /* 1 while we wait for ping response */
+};
+
+
 /*
  * Forward declarations.
  */
@@ -103,30 +141,56 @@ const struct Curl_handler Curl_handler_mqtt = {
   PROTOPT_NONE                        /* flags */
 };
 
+static void mqtt_easy_dtor(void *key, size_t klen, void *entry)
+{
+  struct MQTT *mq = entry;
+  (void)key;
+  (void)klen;
+  Curl_dyn_free(&mq->sendbuf);
+  Curl_dyn_free(&mq->recvbuf);
+  free(mq);
+}
+
+static void mqtt_conn_dtor(void *key, size_t klen, void *entry)
+{
+  (void)key;
+  (void)klen;
+  free(entry);
+}
+
 static CURLcode mqtt_setup_conn(struct Curl_easy *data,
                                 struct connectdata *conn)
 {
-  /* allocate the HTTP-specific struct for the Curl_easy, only to survive
-     during this request */
+  /* setup MQTT specific meta data at easy handle and connection */
+  struct mqtt_conn *mqtt;
   struct MQTT *mq;
-  (void)conn;
-  DEBUGASSERT(data->req.p.mqtt == NULL);
+
+  mqtt = calloc(1, sizeof(*mqtt));
+  if(!mqtt ||
+     Curl_conn_meta_set(conn, CURL_META_MQTT_CONN, mqtt, mqtt_conn_dtor))
+    return CURLE_OUT_OF_MEMORY;
 
   mq = calloc(1, sizeof(struct MQTT));
   if(!mq)
     return CURLE_OUT_OF_MEMORY;
   Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
   Curl_dyn_init(&mq->sendbuf, DYN_MQTT_SEND);
-  data->req.p.mqtt = mq;
+  if(Curl_meta_set(data, CURL_META_MQTT_EASY, mq, mqtt_easy_dtor))
+    return CURLE_OUT_OF_MEMORY;
   return CURLE_OK;
 }
 
 static CURLcode mqtt_send(struct Curl_easy *data,
                           const char *buf, size_t len)
 {
-  struct MQTT *mq = data->req.p.mqtt;
   size_t n;
-  CURLcode result = Curl_xfer_send(data, buf, len, FALSE, &n);
+  CURLcode result;
+  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
+
+  if(!mq)
+    return CURLE_FAILED_INIT;
+
+  result = Curl_xfer_send(data, buf, len, FALSE, &n);
   if(result)
     return result;
   mq->lastTime = Curl_now();
@@ -349,20 +413,19 @@ end:
 
 static CURLcode mqtt_disconnect(struct Curl_easy *data)
 {
-  CURLcode result = CURLE_OK;
-  struct MQTT *mq = data->req.p.mqtt;
-  result = mqtt_send(data, "\xe0\x00", 2);
-  Curl_dyn_free(&mq->sendbuf);
-  Curl_dyn_free(&mq->recvbuf);
-  return result;
+  return mqtt_send(data, "\xe0\x00", 2);
 }
 
 static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
 {
-  struct MQTT *mq = data->req.p.mqtt;
-  size_t rlen = Curl_dyn_len(&mq->recvbuf);
+  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
+  size_t rlen;
   CURLcode result;
 
+  if(!mq)
+    return CURLE_FAILED_INIT;
+  rlen = Curl_dyn_len(&mq->recvbuf);
+
   if(rlen < nbytes) {
     unsigned char readbuf[1024];
     ssize_t nread;
@@ -381,20 +444,27 @@ static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
 
 static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes)
 {
-  struct MQTT *mq = data->req.p.mqtt;
-  size_t rlen = Curl_dyn_len(&mq->recvbuf);
-  if(rlen <= nbytes)
-    Curl_dyn_reset(&mq->recvbuf);
-  else
-    Curl_dyn_tail(&mq->recvbuf, rlen - nbytes);
+  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
+  DEBUGASSERT(mq);
+  if(mq) {
+    size_t rlen = Curl_dyn_len(&mq->recvbuf);
+    if(rlen <= nbytes)
+      Curl_dyn_reset(&mq->recvbuf);
+    else
+      Curl_dyn_tail(&mq->recvbuf, rlen - nbytes);
+  }
 }
 
 static CURLcode mqtt_verify_connack(struct Curl_easy *data)
 {
-  struct MQTT *mq = data->req.p.mqtt;
+  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
   CURLcode result;
   char *ptr;
 
+  DEBUGASSERT(mq);
+  if(!mq)
+    return CURLE_FAILED_INIT;
+
   result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN);
   if(result)
     goto fail;
@@ -444,12 +514,16 @@ static CURLcode mqtt_subscribe(struct Curl_easy *data)
   char encodedsize[4];
   size_t n;
   struct connectdata *conn = data->conn;
+  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+
+  if(!mqtt)
+    return CURLE_FAILED_INIT;
 
   result = mqtt_get_topic(data, &topic, &topiclen);
   if(result)
     goto fail;
 
-  conn->proto.mqtt.packetid++;
+  mqtt->packetid++;
 
   packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
                                + 2 bytes topic length + QoS byte */
@@ -464,8 +538,8 @@ static CURLcode mqtt_subscribe(struct Curl_easy *data)
 
   packet[0] = MQTT_MSG_SUBSCRIBE;
   memcpy(&packet[1], encodedsize, n);
-  packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
-  packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
+  packet[1 + n] = (mqtt->packetid >> 8) & 0xff;
+  packet[2 + n] = mqtt->packetid & 0xff;
   packet[3 + n] = (topiclen >> 8) & 0xff;
   packet[4 + n ] = topiclen & 0xff;
   memcpy(&packet[5 + n], topic, topiclen);
@@ -484,12 +558,15 @@ fail:
  */
 static CURLcode mqtt_verify_suback(struct Curl_easy *data)
 {
-  struct MQTT *mq = data->req.p.mqtt;
+  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
   struct connectdata *conn = data->conn;
-  struct mqtt_conn *mqtt = &conn->proto.mqtt;
+  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
   CURLcode result;
   char *ptr;
 
+  if(!mqtt || !mq)
+    return CURLE_FAILED_INIT;
+
   result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN);
   if(result)
     goto fail;
@@ -606,7 +683,10 @@ static void mqstate(struct Curl_easy *data,
                     enum mqttstate nextstate) /* used if state == FIRST */
 {
   struct connectdata *conn = data->conn;
-  struct mqtt_conn *mqtt = &conn->proto.mqtt;
+  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+  DEBUGASSERT(mqtt);
+  if(!mqtt)
+    return;
 #ifdef DEBUGBUILD
   infof(data, "%s (from %s) (next is %s)",
         statenames[state],
@@ -625,10 +705,14 @@ static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
   struct connectdata *conn = data->conn;
   ssize_t nread;
   size_t remlen;
-  struct mqtt_conn *mqtt = &conn->proto.mqtt;
-  struct MQTT *mq = data->req.p.mqtt;
+  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
   unsigned char packet;
 
+  DEBUGASSERT(mqtt);
+  if(!mqtt || !mq)
+    return CURLE_FAILED_INIT;
+
   switch(mqtt->state) {
 MQTT_SUBACK_COMING:
   case MQTT_SUBACK_COMING:
@@ -717,10 +801,12 @@ end:
 
 static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
 {
-  struct MQTT *mq = data->req.p.mqtt;
+  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
   CURLcode result = CURLE_OK;
   *done = FALSE; /* unconditionally */
 
+  if(!mq)
+    return CURLE_FAILED_INIT;
   mq->lastTime = Curl_now();
   mq->pingsent = FALSE;
 
@@ -736,21 +822,26 @@ static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
 static CURLcode mqtt_done(struct Curl_easy *data,
                           CURLcode status, bool premature)
 {
-  struct MQTT *mq = data->req.p.mqtt;
+  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
   (void)status;
   (void)premature;
-  Curl_dyn_free(&mq->sendbuf);
-  Curl_dyn_free(&mq->recvbuf);
+  if(mq) {
+    Curl_dyn_free(&mq->sendbuf);
+    Curl_dyn_free(&mq->recvbuf);
+  }
   return CURLE_OK;
 }
 
 /* we ping regularly to avoid being disconnected by the server */
 static CURLcode mqtt_ping(struct Curl_easy *data)
 {
+  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
   CURLcode result = CURLE_OK;
   struct connectdata *conn = data->conn;
-  struct mqtt_conn *mqtt = &conn->proto.mqtt;
-  struct MQTT *mq = data->req.p.mqtt;
+  struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+
+  if(!mqtt || !mq)
+    return CURLE_FAILED_INIT;
 
   if(mqtt->state == MQTT_FIRST &&
      !mq->pingsent &&
@@ -775,11 +866,14 @@ static CURLcode mqtt_ping(struct Curl_easy *data)
 
 static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
 {
+  struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
   CURLcode result = CURLE_OK;
-  struct mqtt_conn *mqtt = &data->conn->proto.mqtt;
-  struct MQTT *mq = data->req.p.mqtt;
   ssize_t nread;
   unsigned char recvbyte;
+  struct mqtt_conn *mqtt = Curl_conn_meta_get(data->conn, CURL_META_MQTT_CONN);
+
+  if(!mqtt || !mq)
+    return CURLE_FAILED_INIT;
 
   *done = FALSE;
 
index c824edd373ae733aebf162108046e908288dd381..8fb8a33c022a870def1fcbad63f70b8f1e6e0a31 100644 (file)
 extern const struct Curl_handler Curl_handler_mqtt;
 #endif
 
-enum mqttstate {
-  MQTT_FIRST,             /* 0 */
-  MQTT_REMAINING_LENGTH,  /* 1 */
-  MQTT_CONNACK,           /* 2 */
-  MQTT_SUBACK,            /* 3 */
-  MQTT_SUBACK_COMING,     /* 4 - the SUBACK remainder */
-  MQTT_PUBWAIT,    /* 5 - wait for publish */
-  MQTT_PUB_REMAIN,  /* 6 - wait for the remainder of the publish */
-
-  MQTT_NOSTATE /* 7 - never used an actual state */
-};
-
-struct mqtt_conn {
-  enum mqttstate state;
-  enum mqttstate nextstate; /* switch to this after remaining length is
-                               done */
-  unsigned int packetid;
-};
-
-/* protocol-specific transfer-related data */
-struct MQTT {
-  struct dynbuf sendbuf;
-  /* when receiving */
-  struct dynbuf recvbuf;
-  size_t npacket; /* byte counter */
-  size_t remaining_length;
-  unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
-  struct curltime lastTime; /* last time we sent or received data */
-  unsigned char firstbyte;
-  BIT(pingsent); /* 1 while we wait for ping response */
-};
-
 #endif /* HEADER_CURL_MQTT_H */
index 0020fa9b563bb35f7daf7fb46a7564970049e178..c6380a24d312dc4ae0d5267874b37430edf9b7b9 100644 (file)
@@ -106,7 +106,6 @@ struct SingleRequest {
     struct FTP *ftp;
     struct IMAP *imap;
     struct ldapreqinfo *ldap;
-    struct MQTT *mqtt;
     struct POP3 *pop3;
     struct RTSP *rtsp;
     struct smb_request *smb;
index e4f807597bab1384836dce17653454bb4f693524..e3224ebd16743a7297d5435f925fd25032f2a38e 100644 (file)
@@ -894,9 +894,6 @@ struct connectdata {
 #endif
 #ifdef USE_OPENLDAP
     struct ldapconninfo *ldapc;
-#endif
-#ifndef CURL_DISABLE_MQTT
-    struct mqtt_conn mqtt;
 #endif
     unsigned int unused:1; /* avoids empty union */
   } proto;