#define MQTT_SUBACK_LEN 3
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
+/* meta key for storing protocol meta at easy handle */
+#define CURL_META_MQTT_EASY "meta:proto:mqtt:easy"
+/* meta key for storing protocol meta at connection */
+#define CURL_META_MQTT_CONN "meta:proto:mqtt:conn"
+
+enum mqttstate {
+ MQTT_FIRST, /* 0 */
+ MQTT_REMAINING_LENGTH, /* 1 */
+ MQTT_CONNACK, /* 2 */
+ MQTT_SUBACK, /* 3 */
+ MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */
+ MQTT_PUBWAIT, /* 5 - wait for publish */
+ MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */
+
+ MQTT_NOSTATE /* 7 - never used an actual state */
+};
+
+struct mqtt_conn {
+ enum mqttstate state;
+ enum mqttstate nextstate; /* switch to this after remaining length is
+ done */
+ unsigned int packetid;
+};
+
+/* protocol-specific transfer-related data */
+struct MQTT {
+ struct dynbuf sendbuf;
+ /* when receiving */
+ struct dynbuf recvbuf;
+ size_t npacket; /* byte counter */
+ size_t remaining_length;
+ unsigned char pkt_hd[4]; /* for decoding the arriving packet length */
+ struct curltime lastTime; /* last time we sent or received data */
+ unsigned char firstbyte;
+ BIT(pingsent); /* 1 while we wait for ping response */
+};
+
+
/*
* Forward declarations.
*/
PROTOPT_NONE /* flags */
};
+static void mqtt_easy_dtor(void *key, size_t klen, void *entry)
+{
+ struct MQTT *mq = entry;
+ (void)key;
+ (void)klen;
+ Curl_dyn_free(&mq->sendbuf);
+ Curl_dyn_free(&mq->recvbuf);
+ free(mq);
+}
+
+static void mqtt_conn_dtor(void *key, size_t klen, void *entry)
+{
+ (void)key;
+ (void)klen;
+ free(entry);
+}
+
static CURLcode mqtt_setup_conn(struct Curl_easy *data,
struct connectdata *conn)
{
- /* allocate the HTTP-specific struct for the Curl_easy, only to survive
- during this request */
+ /* setup MQTT specific meta data at easy handle and connection */
+ struct mqtt_conn *mqtt;
struct MQTT *mq;
- (void)conn;
- DEBUGASSERT(data->req.p.mqtt == NULL);
+
+ mqtt = calloc(1, sizeof(*mqtt));
+ if(!mqtt ||
+ Curl_conn_meta_set(conn, CURL_META_MQTT_CONN, mqtt, mqtt_conn_dtor))
+ return CURLE_OUT_OF_MEMORY;
mq = calloc(1, sizeof(struct MQTT));
if(!mq)
return CURLE_OUT_OF_MEMORY;
Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
Curl_dyn_init(&mq->sendbuf, DYN_MQTT_SEND);
- data->req.p.mqtt = mq;
+ if(Curl_meta_set(data, CURL_META_MQTT_EASY, mq, mqtt_easy_dtor))
+ return CURLE_OUT_OF_MEMORY;
return CURLE_OK;
}
static CURLcode mqtt_send(struct Curl_easy *data,
const char *buf, size_t len)
{
- struct MQTT *mq = data->req.p.mqtt;
size_t n;
- CURLcode result = Curl_xfer_send(data, buf, len, FALSE, &n);
+ CURLcode result;
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
+
+ if(!mq)
+ return CURLE_FAILED_INIT;
+
+ result = Curl_xfer_send(data, buf, len, FALSE, &n);
if(result)
return result;
mq->lastTime = Curl_now();
static CURLcode mqtt_disconnect(struct Curl_easy *data)
{
- CURLcode result = CURLE_OK;
- struct MQTT *mq = data->req.p.mqtt;
- result = mqtt_send(data, "\xe0\x00", 2);
- Curl_dyn_free(&mq->sendbuf);
- Curl_dyn_free(&mq->recvbuf);
- return result;
+ return mqtt_send(data, "\xe0\x00", 2);
}
static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
{
- struct MQTT *mq = data->req.p.mqtt;
- size_t rlen = Curl_dyn_len(&mq->recvbuf);
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
+ size_t rlen;
CURLcode result;
+ if(!mq)
+ return CURLE_FAILED_INIT;
+ rlen = Curl_dyn_len(&mq->recvbuf);
+
if(rlen < nbytes) {
unsigned char readbuf[1024];
ssize_t nread;
static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes)
{
- struct MQTT *mq = data->req.p.mqtt;
- size_t rlen = Curl_dyn_len(&mq->recvbuf);
- if(rlen <= nbytes)
- Curl_dyn_reset(&mq->recvbuf);
- else
- Curl_dyn_tail(&mq->recvbuf, rlen - nbytes);
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
+ DEBUGASSERT(mq);
+ if(mq) {
+ size_t rlen = Curl_dyn_len(&mq->recvbuf);
+ if(rlen <= nbytes)
+ Curl_dyn_reset(&mq->recvbuf);
+ else
+ Curl_dyn_tail(&mq->recvbuf, rlen - nbytes);
+ }
}
static CURLcode mqtt_verify_connack(struct Curl_easy *data)
{
- struct MQTT *mq = data->req.p.mqtt;
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result;
char *ptr;
+ DEBUGASSERT(mq);
+ if(!mq)
+ return CURLE_FAILED_INIT;
+
result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN);
if(result)
goto fail;
char encodedsize[4];
size_t n;
struct connectdata *conn = data->conn;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+
+ if(!mqtt)
+ return CURLE_FAILED_INIT;
result = mqtt_get_topic(data, &topic, &topiclen);
if(result)
goto fail;
- conn->proto.mqtt.packetid++;
+ mqtt->packetid++;
packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
+ 2 bytes topic length + QoS byte */
packet[0] = MQTT_MSG_SUBSCRIBE;
memcpy(&packet[1], encodedsize, n);
- packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
- packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
+ packet[1 + n] = (mqtt->packetid >> 8) & 0xff;
+ packet[2 + n] = mqtt->packetid & 0xff;
packet[3 + n] = (topiclen >> 8) & 0xff;
packet[4 + n ] = topiclen & 0xff;
memcpy(&packet[5 + n], topic, topiclen);
*/
static CURLcode mqtt_verify_suback(struct Curl_easy *data)
{
- struct MQTT *mq = data->req.p.mqtt;
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
struct connectdata *conn = data->conn;
- struct mqtt_conn *mqtt = &conn->proto.mqtt;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
CURLcode result;
char *ptr;
+ if(!mqtt || !mq)
+ return CURLE_FAILED_INIT;
+
result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN);
if(result)
goto fail;
enum mqttstate nextstate) /* used if state == FIRST */
{
struct connectdata *conn = data->conn;
- struct mqtt_conn *mqtt = &conn->proto.mqtt;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+ DEBUGASSERT(mqtt);
+ if(!mqtt)
+ return;
#ifdef DEBUGBUILD
infof(data, "%s (from %s) (next is %s)",
statenames[state],
struct connectdata *conn = data->conn;
ssize_t nread;
size_t remlen;
- struct mqtt_conn *mqtt = &conn->proto.mqtt;
- struct MQTT *mq = data->req.p.mqtt;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
unsigned char packet;
+ DEBUGASSERT(mqtt);
+ if(!mqtt || !mq)
+ return CURLE_FAILED_INIT;
+
switch(mqtt->state) {
MQTT_SUBACK_COMING:
case MQTT_SUBACK_COMING:
static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
{
- struct MQTT *mq = data->req.p.mqtt;
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result = CURLE_OK;
*done = FALSE; /* unconditionally */
+ if(!mq)
+ return CURLE_FAILED_INIT;
mq->lastTime = Curl_now();
mq->pingsent = FALSE;
static CURLcode mqtt_done(struct Curl_easy *data,
CURLcode status, bool premature)
{
- struct MQTT *mq = data->req.p.mqtt;
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
(void)status;
(void)premature;
- Curl_dyn_free(&mq->sendbuf);
- Curl_dyn_free(&mq->recvbuf);
+ if(mq) {
+ Curl_dyn_free(&mq->sendbuf);
+ Curl_dyn_free(&mq->recvbuf);
+ }
return CURLE_OK;
}
/* we ping regularly to avoid being disconnected by the server */
static CURLcode mqtt_ping(struct Curl_easy *data)
{
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result = CURLE_OK;
struct connectdata *conn = data->conn;
- struct mqtt_conn *mqtt = &conn->proto.mqtt;
- struct MQTT *mq = data->req.p.mqtt;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(conn, CURL_META_MQTT_CONN);
+
+ if(!mqtt || !mq)
+ return CURLE_FAILED_INIT;
if(mqtt->state == MQTT_FIRST &&
!mq->pingsent &&
static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
{
+ struct MQTT *mq = Curl_meta_get(data, CURL_META_MQTT_EASY);
CURLcode result = CURLE_OK;
- struct mqtt_conn *mqtt = &data->conn->proto.mqtt;
- struct MQTT *mq = data->req.p.mqtt;
ssize_t nread;
unsigned char recvbyte;
+ struct mqtt_conn *mqtt = Curl_conn_meta_get(data->conn, CURL_META_MQTT_CONN);
+
+ if(!mqtt || !mq)
+ return CURLE_FAILED_INIT;
*done = FALSE;