/* The last #include file should be: */
#include "memdebug.h"
+/* first byte is command.
+ second byte is for flags. */
#define MQTT_MSG_CONNECT 0x10
/* #define MQTT_MSG_CONNACK 0x20 */
#define MQTT_MSG_PUBLISH 0x30
#define MQTT_MSG_SUBSCRIBE 0x82
#define MQTT_MSG_SUBACK 0x90
#define MQTT_MSG_DISCONNECT 0xe0
+#define MQTT_MSG_PINGREQ 0xC0
+#define MQTT_MSG_PINGRESP 0xD0
#define MQTT_CONNACK_LEN 2
#define MQTT_SUBACK_LEN 3
CURLcode result = Curl_xfer_send(data, buf, len, FALSE, &n);
if(result)
return result;
+ mq->lastTime = Curl_now();
Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
if(len != n) {
size_t nsend = len - n;
goto end;
}
+ /* we received something */
+ mq->lastTime = Curl_now();
+
/* if QoS is set, message contains packet id */
result = Curl_client_write(data, CLIENTWRITE_BODY, buffer, nread);
if(result)
static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
{
+ struct MQTT *mq = data->req.p.mqtt;
CURLcode result = CURLE_OK;
*done = FALSE; /* unconditionally */
+ mq->lastTime = Curl_now();
+ mq->pingsent = FALSE;
+
result = mqtt_connect(data);
if(result) {
failf(data, "Error %d sending MQTT CONNECT request", result);
return CURLE_OK;
}
+/* we ping regularly to avoid being disconnected by the server */
+static CURLcode mqtt_ping(struct Curl_easy *data)
+{
+ CURLcode result = CURLE_OK;
+ struct connectdata *conn = data->conn;
+ struct mqtt_conn *mqtt = &conn->proto.mqtt;
+ struct MQTT *mq = data->req.p.mqtt;
+
+ if(mqtt->state == MQTT_FIRST &&
+ !mq->pingsent &&
+ data->set.upkeep_interval_ms > 0) {
+ struct curltime t = Curl_now();
+ timediff_t diff = Curl_timediff(t, mq->lastTime);
+
+ if(diff > data->set.upkeep_interval_ms) {
+ /* 0xC0 is PINGREQ, and 0x00 is remaining length */
+ unsigned char packet[2] = { 0xC0, 0x00 };
+ size_t packetlen = sizeof(packet);
+
+ result = mqtt_send(data, (char *)packet, packetlen);
+ if(!result) {
+ mq->pingsent = TRUE;
+ }
+ infof(data, "mqtt_ping: sent ping request.");
+ }
+ }
+ return result;
+}
+
static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
{
CURLcode result = CURLE_OK;
return result;
}
+ result = mqtt_ping(data);
+ if(result)
+ return result;
+
infof(data, "mqtt_doing: state [%d]", (int) mqtt->state);
switch(mqtt->state) {
case MQTT_FIRST:
break;
}
Curl_debug(data, CURLINFO_HEADER_IN, (const char *)&mq->firstbyte, 1);
+
+ /* we received something */
+ mq->lastTime = Curl_now();
+
/* remember the first byte */
mq->npacket = 0;
mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
infof(data, "Got DISCONNECT");
*done = TRUE;
}
+
+ /* ping response */
+ if(mq->firstbyte == MQTT_MSG_PINGRESP) {
+ infof(data, "Received ping response.");
+ mq->pingsent = FALSE;
+ mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
+ }
break;
case MQTT_CONNACK:
result = mqtt_verify_connack(data);