From: Stefan Eissing Date: Mon, 9 Oct 2023 09:36:37 +0000 (+0200) Subject: MQTT: improve receive of ACKs X-Git-Tag: curl-8_4_0~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b0f3d71c1fdb7aeb3872c1cf179cf69a457a4f07;p=thirdparty%2Fcurl.git MQTT: improve receive of ACKs - add `mq->recvbuf` to provide buffering of incomplete ACK responses - continue ACK reading until sufficient bytes available - fixes test failures on low network receives Closes #12071 --- diff --git a/lib/dynbuf.h b/lib/dynbuf.h index 6291eabd37..31a9130197 100644 --- a/lib/dynbuf.h +++ b/lib/dynbuf.h @@ -89,4 +89,5 @@ int Curl_dyn_vprintf(struct dynbuf *dyn, const char *format, va_list ap_save); #define DYN_H1_TRAILER 4096 #define DYN_PINGPPONG_CMD (64*1024) #define DYN_IMAP_CMD (64*1024) +#define DYN_MQTT_RECV (64*1024) #endif diff --git a/lib/mqtt.c b/lib/mqtt.c index 5cb2d24110..54f88822c0 100644 --- a/lib/mqtt.c +++ b/lib/mqtt.c @@ -109,6 +109,7 @@ static CURLcode mqtt_setup_conn(struct Curl_easy *data, mq = calloc(1, sizeof(struct MQTT)); if(!mq) return CURLE_OUT_OF_MEMORY; + Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV); data->req.p.mqtt = mq; return CURLE_OK; } @@ -350,36 +351,66 @@ static CURLcode mqtt_disconnect(struct Curl_easy *data) struct MQTT *mq = data->req.p.mqtt; result = mqtt_send(data, (char *)"\xe0\x00", 2); Curl_safefree(mq->sendleftovers); + Curl_dyn_free(&mq->recvbuf); return result; } -static CURLcode mqtt_verify_connack(struct Curl_easy *data) +static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes) { + struct MQTT *mq = data->req.p.mqtt; + size_t rlen = Curl_dyn_len(&mq->recvbuf); CURLcode result; - struct connectdata *conn = data->conn; - curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; - unsigned char readbuf[MQTT_CONNACK_LEN]; - ssize_t nread; - result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread); - if(result) - goto fail; + if(rlen < nbytes) { + unsigned char readbuf[1024]; + ssize_t nread; + + DEBUGASSERT(nbytes - rlen < sizeof(readbuf)); + result = Curl_read(data, data->conn->sock[FIRSTSOCKET], + (char *)readbuf, nbytes - rlen, &nread); + if(result) + return result; + DEBUGASSERT(nread >= 0); + if(Curl_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread)) + return CURLE_OUT_OF_MEMORY; + rlen = Curl_dyn_len(&mq->recvbuf); + } + return (rlen >= nbytes)? CURLE_OK : CURLE_AGAIN; +} - Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread); +static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes) +{ + struct MQTT *mq = data->req.p.mqtt; + size_t rlen = Curl_dyn_len(&mq->recvbuf); + if(rlen <= nbytes) + Curl_dyn_reset(&mq->recvbuf); + else + Curl_dyn_tail(&mq->recvbuf, rlen - nbytes); +} - /* fixme */ - if(nread < MQTT_CONNACK_LEN) { - result = CURLE_WEIRD_SERVER_REPLY; +static CURLcode mqtt_verify_connack(struct Curl_easy *data) +{ + struct MQTT *mq = data->req.p.mqtt; + CURLcode result; + char *ptr; + + result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN); + if(result) goto fail; - } /* verify CONNACK */ - if(readbuf[0] != 0x00 || readbuf[1] != 0x00) { + DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN); + ptr = Curl_dyn_ptr(&mq->recvbuf); + Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN); + + if(ptr[0] != 0x00 || ptr[1] != 0x00) { failf(data, "Expected %02x%02x but got %02x%02x", - 0x00, 0x00, readbuf[0], readbuf[1]); + 0x00, 0x00, ptr[0], ptr[1]); + Curl_dyn_reset(&mq->recvbuf); result = CURLE_WEIRD_SERVER_REPLY; + goto fail; } - + mqtt_recv_consume(data, MQTT_CONNACK_LEN); fail: return result; } @@ -452,31 +483,29 @@ fail: */ static CURLcode mqtt_verify_suback(struct Curl_easy *data) { - CURLcode result; + struct MQTT *mq = data->req.p.mqtt; struct connectdata *conn = data->conn; - curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; - unsigned char readbuf[MQTT_SUBACK_LEN]; - ssize_t nread; struct mqtt_conn *mqtt = &conn->proto.mqtt; + CURLcode result; + char *ptr; - result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread); + result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN); if(result) goto fail; - Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread); - - /* fixme */ - if(nread < MQTT_SUBACK_LEN) { + /* verify SUBACK */ + DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN); + ptr = Curl_dyn_ptr(&mq->recvbuf); + Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN); + + if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) || + ((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) || + ptr[2] != 0x00) { + Curl_dyn_reset(&mq->recvbuf); result = CURLE_WEIRD_SERVER_REPLY; goto fail; } - - /* verify SUBACK */ - if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) || - readbuf[1] != (mqtt->packetid & 0xff) || - readbuf[2] != 0x00) - result = CURLE_WEIRD_SERVER_REPLY; - + mqtt_recv_consume(data, MQTT_SUBACK_LEN); fail: return result; } @@ -713,6 +742,7 @@ static CURLcode mqtt_done(struct Curl_easy *data, (void)status; (void)premature; Curl_safefree(mq->sendleftovers); + Curl_dyn_free(&mq->recvbuf); return CURLE_OK; } diff --git a/lib/mqtt.h b/lib/mqtt.h index 63961366fc..84f177022e 100644 --- a/lib/mqtt.h +++ b/lib/mqtt.h @@ -56,6 +56,7 @@ struct MQTT { size_t npacket; /* byte counter */ unsigned char firstbyte; size_t remaining_length; + struct dynbuf recvbuf; }; #endif /* HEADER_CURL_MQTT_H */