From: Stefan Eissing Date: Tue, 29 Apr 2025 08:49:46 +0000 (+0200) Subject: mqtt: use conn/easy meta hash X-Git-Tag: curl-8_14_0~175 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=47b2300192564007cc068a692920bf8611e6b3a6;p=thirdparty%2Fcurl.git mqtt: use conn/easy meta hash Remove mqtt structs from the unions at connectdata and easy handle requests. Use meta hash at easy/connnection. Make mqtt structs private to mqtt.c Closes #17221 --- diff --git a/lib/mqtt.c b/lib/mqtt.c index b22b086ca3..0d736de290 100644 --- a/lib/mqtt.c +++ b/lib/mqtt.c @@ -61,6 +61,44 @@ #define MQTT_SUBACK_LEN 3 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ +/* meta key for storing protocol meta at easy handle */ +#define CURL_META_MQTT_EASY "meta:proto:mqtt:easy" +/* meta key for storing protocol meta at connection */ +#define CURL_META_MQTT_CONN "meta:proto:mqtt:conn" + +enum mqttstate { + MQTT_FIRST, /* 0 */ + MQTT_REMAINING_LENGTH, /* 1 */ + MQTT_CONNACK, /* 2 */ + MQTT_SUBACK, /* 3 */ + MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */ + MQTT_PUBWAIT, /* 5 - wait for publish */ + MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */ + + MQTT_NOSTATE /* 7 - never used an actual state */ +}; + +struct mqtt_conn { + enum mqttstate state; + enum mqttstate nextstate; /* switch to this after remaining length is + done */ + unsigned int packetid; +}; + +/* protocol-specific transfer-related data */ +struct MQTT { + struct dynbuf sendbuf; + /* when receiving */ + struct dynbuf recvbuf; + size_t npacket; /* byte counter */ + size_t remaining_length; + unsigned char pkt_hd[4]; /* for decoding the arriving packet length */ + struct curltime lastTime; /* last time we sent or received data */ + unsigned char firstbyte; + BIT(pingsent); /* 1 while we wait for ping response */ +}; + + /* * Forward declarations. */ @@ -103,30 +141,56 @@ const struct Curl_handler Curl_handler_mqtt = { PROTOPT_NONE /* flags */ }; +static void mqtt_easy_dtor(void *key, size_t klen, void *entry) +{ + struct MQTT *mq = entry; + (void)key; + (void)klen; + Curl_dyn_free(&mq->sendbuf); + Curl_dyn_free(&mq->recvbuf); + free(mq); +} + +static void mqtt_conn_dtor(void *key, size_t klen, void *entry) +{ + (void)key; + (void)klen; + free(entry); +} + static CURLcode mqtt_setup_conn(struct Curl_easy *data, struct connectdata *conn) { - /* allocate the HTTP-specific struct for the Curl_easy, only to survive - during this request */ + /* setup MQTT specific meta data at easy handle and connection */ + struct mqtt_conn *mqtt; struct MQTT *mq; - (void)conn; - DEBUGASSERT(data->req.p.mqtt == NULL); + + mqtt = calloc(1, sizeof(*mqtt)); + if(!mqtt || + Curl_conn_meta_set(conn, CURL_META_MQTT_CONN, mqtt, mqtt_conn_dtor)) + return CURLE_OUT_OF_MEMORY; mq = calloc(1, sizeof(struct MQTT)); if(!mq) return CURLE_OUT_OF_MEMORY; Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV); Curl_dyn_init(&mq->sendbuf, DYN_MQTT_SEND); - data->req.p.mqtt = mq; + if(Curl_meta_set(data, CURL_META_MQTT_EASY, mq, mqtt_easy_dtor)) + return CURLE_OUT_OF_MEMORY; return CURLE_OK; } static CURLcode mqtt_send(struct Curl_easy *data, const char *buf, size_t len) { - struct MQTT *mq = data->req.p.mqtt; size_t n; - CURLcode result = Curl_xfer_send(data, buf, len, FALSE, &n); + CURLcode result; + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); + + if(!mq) + return CURLE_FAILED_INIT; + + result = Curl_xfer_send(data, buf, len, FALSE, &n); if(result) return result; mq->lastTime = Curl_now(); @@ -349,20 +413,19 @@ end: static CURLcode mqtt_disconnect(struct Curl_easy *data) { - CURLcode result = CURLE_OK; - struct MQTT *mq = data->req.p.mqtt; - result = mqtt_send(data, "\xe0\x00", 2); - Curl_dyn_free(&mq->sendbuf); - Curl_dyn_free(&mq->recvbuf); - return result; + return mqtt_send(data, "\xe0\x00", 2); } static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes) { - struct MQTT *mq = data->req.p.mqtt; - size_t rlen = Curl_dyn_len(&mq->recvbuf); + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); + size_t rlen; CURLcode result; + if(!mq) + return CURLE_FAILED_INIT; + rlen = Curl_dyn_len(&mq->recvbuf); + if(rlen < nbytes) { unsigned char readbuf[1024]; ssize_t nread; @@ -381,20 +444,27 @@ static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes) static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes) { - struct MQTT *mq = data->req.p.mqtt; - size_t rlen = Curl_dyn_len(&mq->recvbuf); - if(rlen <= nbytes) - Curl_dyn_reset(&mq->recvbuf); - else - Curl_dyn_tail(&mq->recvbuf, rlen - nbytes); + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); + DEBUGASSERT(mq); + if(mq) { + size_t rlen = Curl_dyn_len(&mq->recvbuf); + if(rlen <= nbytes) + Curl_dyn_reset(&mq->recvbuf); + else + Curl_dyn_tail(&mq->recvbuf, rlen - nbytes); + } } static CURLcode mqtt_verify_connack(struct Curl_easy *data) { - struct MQTT *mq = data->req.p.mqtt; + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); CURLcode result; char *ptr; + DEBUGASSERT(mq); + if(!mq) + return CURLE_FAILED_INIT; + result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN); if(result) goto fail; @@ -444,12 +514,16 @@ static CURLcode mqtt_subscribe(struct Curl_easy *data) char encodedsize[4]; size_t n; struct connectdata *conn = data->conn; + struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); + + if(!mqtt) + return CURLE_FAILED_INIT; result = mqtt_get_topic(data, &topic, &topiclen); if(result) goto fail; - conn->proto.mqtt.packetid++; + mqtt->packetid++; packetlen = topiclen + 5; /* packetid + topic (has a two byte length field) + 2 bytes topic length + QoS byte */ @@ -464,8 +538,8 @@ static CURLcode mqtt_subscribe(struct Curl_easy *data) packet[0] = MQTT_MSG_SUBSCRIBE; memcpy(&packet[1], encodedsize, n); - packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff; - packet[2 + n] = conn->proto.mqtt.packetid & 0xff; + packet[1 + n] = (mqtt->packetid >> 8) & 0xff; + packet[2 + n] = mqtt->packetid & 0xff; packet[3 + n] = (topiclen >> 8) & 0xff; packet[4 + n ] = topiclen & 0xff; memcpy(&packet[5 + n], topic, topiclen); @@ -484,12 +558,15 @@ fail: */ static CURLcode mqtt_verify_suback(struct Curl_easy *data) { - struct MQTT *mq = data->req.p.mqtt; + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); struct connectdata *conn = data->conn; - struct mqtt_conn *mqtt = &conn->proto.mqtt; + struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); CURLcode result; char *ptr; + if(!mqtt || !mq) + return CURLE_FAILED_INIT; + result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN); if(result) goto fail; @@ -606,7 +683,10 @@ static void mqstate(struct Curl_easy *data, enum mqttstate nextstate) /* used if state == FIRST */ { struct connectdata *conn = data->conn; - struct mqtt_conn *mqtt = &conn->proto.mqtt; + struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); + DEBUGASSERT(mqtt); + if(!mqtt) + return; #ifdef DEBUGBUILD infof(data, "%s (from %s) (next is %s)", statenames[state], @@ -625,10 +705,14 @@ static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done) struct connectdata *conn = data->conn; ssize_t nread; size_t remlen; - struct mqtt_conn *mqtt = &conn->proto.mqtt; - struct MQTT *mq = data->req.p.mqtt; + struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); unsigned char packet; + DEBUGASSERT(mqtt); + if(!mqtt || !mq) + return CURLE_FAILED_INIT; + switch(mqtt->state) { MQTT_SUBACK_COMING: case MQTT_SUBACK_COMING: @@ -717,10 +801,12 @@ end: static CURLcode mqtt_do(struct Curl_easy *data, bool *done) { - struct MQTT *mq = data->req.p.mqtt; + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); CURLcode result = CURLE_OK; *done = FALSE; /* unconditionally */ + if(!mq) + return CURLE_FAILED_INIT; mq->lastTime = Curl_now(); mq->pingsent = FALSE; @@ -736,21 +822,26 @@ static CURLcode mqtt_do(struct Curl_easy *data, bool *done) static CURLcode mqtt_done(struct Curl_easy *data, CURLcode status, bool premature) { - struct MQTT *mq = data->req.p.mqtt; + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); (void)status; (void)premature; - Curl_dyn_free(&mq->sendbuf); - Curl_dyn_free(&mq->recvbuf); + if(mq) { + Curl_dyn_free(&mq->sendbuf); + Curl_dyn_free(&mq->recvbuf); + } return CURLE_OK; } /* we ping regularly to avoid being disconnected by the server */ static CURLcode mqtt_ping(struct Curl_easy *data) { + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); CURLcode result = CURLE_OK; struct connectdata *conn = data->conn; - struct mqtt_conn *mqtt = &conn->proto.mqtt; - struct MQTT *mq = data->req.p.mqtt; + struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN); + + if(!mqtt || !mq) + return CURLE_FAILED_INIT; if(mqtt->state == MQTT_FIRST && !mq->pingsent && @@ -775,11 +866,14 @@ static CURLcode mqtt_ping(struct Curl_easy *data) static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) { + struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY); CURLcode result = CURLE_OK; - struct mqtt_conn *mqtt = &data->conn->proto.mqtt; - struct MQTT *mq = data->req.p.mqtt; ssize_t nread; unsigned char recvbyte; + struct mqtt_conn *mqtt = Curl_conn_meta_get(data->conn, CURL_META_MQTT_CONN); + + if(!mqtt || !mq) + return CURLE_FAILED_INIT; *done = FALSE; diff --git a/lib/mqtt.h b/lib/mqtt.h index c824edd373..8fb8a33c02 100644 --- a/lib/mqtt.h +++ b/lib/mqtt.h @@ -28,36 +28,4 @@ extern const struct Curl_handler Curl_handler_mqtt; #endif -enum mqttstate { - MQTT_FIRST, /* 0 */ - MQTT_REMAINING_LENGTH, /* 1 */ - MQTT_CONNACK, /* 2 */ - MQTT_SUBACK, /* 3 */ - MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */ - MQTT_PUBWAIT, /* 5 - wait for publish */ - MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */ - - MQTT_NOSTATE /* 7 - never used an actual state */ -}; - -struct mqtt_conn { - enum mqttstate state; - enum mqttstate nextstate; /* switch to this after remaining length is - done */ - unsigned int packetid; -}; - -/* protocol-specific transfer-related data */ -struct MQTT { - struct dynbuf sendbuf; - /* when receiving */ - struct dynbuf recvbuf; - size_t npacket; /* byte counter */ - size_t remaining_length; - unsigned char pkt_hd[4]; /* for decoding the arriving packet length */ - struct curltime lastTime; /* last time we sent or received data */ - unsigned char firstbyte; - BIT(pingsent); /* 1 while we wait for ping response */ -}; - #endif /* HEADER_CURL_MQTT_H */ diff --git a/lib/request.h b/lib/request.h index 0020fa9b56..c6380a24d3 100644 --- a/lib/request.h +++ b/lib/request.h @@ -106,7 +106,6 @@ struct SingleRequest { struct FTP *ftp; struct IMAP *imap; struct ldapreqinfo *ldap; - struct MQTT *mqtt; struct POP3 *pop3; struct RTSP *rtsp; struct smb_request *smb; diff --git a/lib/urldata.h b/lib/urldata.h index e4f807597b..e3224ebd16 100644 --- a/lib/urldata.h +++ b/lib/urldata.h @@ -894,9 +894,6 @@ struct connectdata { #endif #ifdef USE_OPENLDAP struct ldapconninfo *ldapc; -#endif -#ifndef CURL_DISABLE_MQTT - struct mqtt_conn mqtt; #endif unsigned int unused:1; /* avoids empty union */ } proto;