]> git.ipfire.org Git - thirdparty/freeswitch.git/commitdiff
FS-9775: Added support for removing finished transactions via latent purging while...
authorShane Bryldt <astaelan@gmail.com>
Mon, 5 Dec 2016 20:43:52 +0000 (20:43 +0000)
committerMike Jerris <mike@jerris.com>
Wed, 25 Jan 2017 20:59:34 +0000 (14:59 -0600)
Also added support to send error message responses and updated the test to confirm, errors still need to be updated to send an error responses

libs/libks/src/dht/ks_dht-int.h
libs/libks/src/dht/ks_dht.c
libs/libks/src/dht/ks_dht.h
libs/libks/src/dht/ks_dht_message.c
libs/libks/src/dht/ks_dht_transaction.c
libs/libks/test/testdht2.c

index 81ed2f558d3544f7a3af95563f7d5b85af8c0f62..15fcb325485c7538ea3ccf94902c7cfbaf54544a 100644 (file)
@@ -8,12 +8,21 @@ KS_BEGIN_EXTERN_C
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
 KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht);
+KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht);
+
 KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr);
+KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
+KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht,
+                                                                                  ks_sockaddr_t *raddr,
+                                                                                  uint8_t *transactionid,
+                                                                                  ks_size_t transactionid_length,
+                                                                                  long long errorcode,
+                                                                                  const char *errorstr);
 
 KS_DECLARE(ks_status_t) ks_dht2_process_query(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
 KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
+KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
 
 KS_DECLARE(ks_status_t) ks_dht2_process_query_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
 KS_DECLARE(ks_status_t) ks_dht2_process_response_ping(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message);
index c82b1c49f17b8b74ccaad326f982d2c45567a58a..d0460d54253b0a80b01735b1616872731805e4f2 100644 (file)
@@ -75,10 +75,13 @@ KS_DECLARE(ks_status_t) ks_dht2_init(ks_dht2_t *dht, const ks_dht2_nodeid_raw_t
        ks_hash_create(&dht->registry_type, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
        ks_dht2_register_type(dht, "q", ks_dht2_process_query);
        ks_dht2_register_type(dht, "r", ks_dht2_process_response);
-       // @todo ks_hash_insert the r/e callbacks into type registry
+       ks_dht2_register_type(dht, "e", ks_dht2_process_error);
 
        ks_hash_create(&dht->registry_query, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
        ks_dht2_register_query(dht, "ping", ks_dht2_process_query_ping);
+
+       ks_hash_create(&dht->registry_error, KS_HASH_MODE_DEFAULT, KS_HASH_FLAG_RWLOCK | KS_HASH_FLAG_DUP_CHECK, dht->pool);
+       // @todo register 301 error for internal get/put CAS hash mismatch retry handler
        
     dht->bind_ipv4 = KS_FALSE;
        dht->bind_ipv6 = KS_FALSE;
@@ -111,7 +114,6 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
        dht->recv_buffer_length = 0;
        for (int32_t i = 0; i < dht->endpoints_size; ++i) {
                ks_dht2_endpoint_t *ep = dht->endpoints[i];
-               //ks_hash_remove(dht->endpoints_hash, ep->addr.host);
                ks_dht2_endpoint_deinit(ep);
                ks_dht2_endpoint_free(ep);
        }
@@ -139,6 +141,10 @@ KS_DECLARE(ks_status_t) ks_dht2_deinit(ks_dht2_t *dht)
                ks_hash_destroy(&dht->registry_query);
                dht->registry_query = NULL;
        }
+       if (dht->registry_error) {
+               ks_hash_destroy(&dht->registry_error);
+               dht->registry_error = NULL;
+       }
 
        ks_dht2_nodeid_deinit(&dht->nodeid);
 
@@ -191,6 +197,18 @@ KS_DECLARE(ks_status_t) ks_dht2_register_query(ks_dht2_t *dht, const char *value
        return ks_hash_insert(dht->registry_query, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_register_error(ks_dht2_t *dht, const char *value, ks_dht2_message_callback_t callback)
+{
+       ks_assert(dht);
+       ks_assert(value);
+       ks_assert(callback);
+
+       return ks_hash_insert(dht->registry_error, (void *)value, (void *)(intptr_t)callback) ? KS_STATUS_SUCCESS : KS_STATUS_FAIL;
+}
+
 /**
  *
  */
@@ -297,52 +315,17 @@ KS_DECLARE(ks_status_t) ks_dht2_pulse(ks_dht2_t *dht, int32_t timeout)
                }
        }
 
+       ks_dht2_idle(dht);
+
        return KS_STATUS_SUCCESS;
 }
 
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht)
 {
-       // @todo lookup standard def for IPV6 max size
-       char ip[48];
-       ks_dht2_endpoint_t *ep;
-       // @todo calculate max IPV6 payload size?
-       char buf[1000];
-       ks_size_t buf_len;
-       
        ks_assert(dht);
-       ks_assert(raddr);
-       ks_assert(message);
-       ks_assert(message->data);
-
-       // @todo blacklist check
-
-       ks_ip_route(ip, sizeof(ip), raddr->host);
-
-       if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
-               ks_sockaddr_t addr;
-               ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
-               if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) {
-                       return KS_STATUS_FAIL;
-               }
-       }
-
-       if (!ep) {
-               ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
-               return KS_STATUS_FAIL;
-       }
-
-       buf_len = ben_encode2(buf, sizeof(buf), message->data);
-
-       ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", raddr->host, raddr->port);
-       ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
-       
-       if (ks_socket_sendto(ep->sock, (void *)buf, &buf_len, raddr) != KS_STATUS_SUCCESS) {
-               ks_log(KS_LOG_DEBUG, "Socket error\n");
-               return KS_STATUS_FAIL;
-       }
 
        return KS_STATUS_SUCCESS;
 }
@@ -350,20 +333,49 @@ KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dh
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht2_maketid(ks_dht2_t *dht)
+KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht)
 {
        ks_assert(dht);
 
+       if (ks_dht2_idle_expirations(dht) != KS_STATUS_SUCCESS) {
+               return KS_STATUS_FAIL;
+       }
+       
        return KS_STATUS_SUCCESS;
 }
 
 /**
  *
  */
-KS_DECLARE(ks_status_t) ks_dht2_idle(ks_dht2_t *dht)
+KS_DECLARE(ks_status_t) ks_dht2_idle_expirations(ks_dht2_t *dht)
 {
+       ks_hash_iterator_t *it = NULL;
+       ks_time_t now = ks_time_now_sec();
+       
        ks_assert(dht);
 
+       // @todo add delay between checking expirations, every 10 seconds?
+
+       ks_hash_write_lock(dht->transactions_hash);
+       for (it = ks_hash_first(dht->transactions_hash, KS_UNLOCKED); it; it = ks_hash_next(&it)) {
+               const void *key = NULL;
+               ks_dht2_transaction_t *value = NULL;
+               ks_bool_t remove = KS_FALSE;
+
+               ks_hash_this(it, &key, NULL, (void **)&value);
+               if (value->finished) {
+                       remove = KS_TRUE;
+               } else if (value->expiration <= now) {
+                       ks_log(KS_LOG_DEBUG, "Transaction has expired without response %d\n", value->transactionid);
+                       remove = KS_TRUE;
+               }
+               if (remove) {
+                       ks_hash_remove(dht->transactions_hash, (char *)key);
+                       ks_pool_free(value->pool, value);
+               }
+       }
+       ks_hash_write_unlock(dht->transactions_hash);
+
        return KS_STATUS_SUCCESS;
 }
 
@@ -411,6 +423,96 @@ KS_DECLARE(ks_status_t) ks_dht2_process(ks_dht2_t *dht, ks_sockaddr_t *raddr)
        return ret;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_send(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+{
+       // @todo lookup standard def for IPV6 max size
+       char ip[48];
+       ks_dht2_endpoint_t *ep;
+       // @todo calculate max IPV6 payload size?
+       char buf[1000];
+       ks_size_t buf_len;
+       
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(message);
+       ks_assert(message->data);
+
+       // @todo blacklist check
+
+       ks_ip_route(ip, sizeof(ip), raddr->host);
+
+       if (!(ep = ks_hash_search(dht->endpoints_hash, ip, KS_UNLOCKED)) && dht->autoroute) {
+               ks_sockaddr_t addr;
+               ks_addr_set(&addr, ip, dht->autoroute_port, raddr->family);
+               if (ks_dht2_bind(dht, &addr, &ep) != KS_STATUS_SUCCESS) {
+                       return KS_STATUS_FAIL;
+               }
+       }
+
+       if (!ep) {
+               ks_log(KS_LOG_DEBUG, "No route available to %s\n", raddr->host);
+               return KS_STATUS_FAIL;
+       }
+
+       buf_len = ben_encode2(buf, sizeof(buf), message->data);
+
+       ks_log(KS_LOG_DEBUG, "Sending message to %s %d\n", raddr->host, raddr->port);
+       ks_log(KS_LOG_DEBUG, "%s\n", ben_print(message->data));
+       
+       if (ks_socket_sendto(ep->sock, (void *)buf, &buf_len, raddr) != KS_STATUS_SUCCESS) {
+               ks_log(KS_LOG_DEBUG, "Socket error\n");
+               return KS_STATUS_FAIL;
+       }
+
+       return KS_STATUS_SUCCESS;
+}
+
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_send_error(ks_dht2_t *dht,
+                                                                                  ks_sockaddr_t *raddr,
+                                                                                  uint8_t *transactionid,
+                                                                                  ks_size_t transactionid_length,
+                                                                                  long long errorcode,
+                                                                                  const char *errorstr)
+{
+       ks_dht2_message_t error;
+       struct bencode *e;
+       ks_status_t ret = KS_STATUS_FAIL;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(transactionid);
+       ks_assert(errorstr);
+
+       if (ks_dht2_message_prealloc(&error, dht->pool) != KS_STATUS_SUCCESS) {
+               return KS_STATUS_FAIL;
+       }
+
+       if (ks_dht2_message_init(&error, KS_TRUE) != KS_STATUS_SUCCESS) {
+               return KS_STATUS_FAIL;
+       }
+
+       if (ks_dht2_message_error(&error, transactionid, transactionid_length, &e) != KS_STATUS_SUCCESS) {
+               goto done;
+       }
+
+       // @note e joins response.data and will be freed with it
+       ben_list_append(e, ben_int(errorcode));
+       ben_list_append(e, ben_blob(errorstr, strlen(errorstr)));
+
+       ks_log(KS_LOG_DEBUG, "Sending message error %d\n", errorcode);
+       ret = ks_dht2_send(dht, raddr, &error);
+
+ done:
+       ks_dht2_message_deinit(&error);
+       return ret;
+}
+
 /**
  *
  */
@@ -497,13 +599,96 @@ KS_DECLARE(ks_status_t) ks_dht2_process_response(ks_dht2_t *dht, ks_sockaddr_t *
        
        if (!transaction) {
                ks_log(KS_LOG_DEBUG, "Message response rejected with unknown transaction id %d\n", transactionid);
+       } else if (!ks_addr_cmp(raddr, &transaction->raddr)) {
+               ks_log(KS_LOG_DEBUG,
+                          "Message response rejected due to spoofing from %s %d, expected %s %d\n",
+                          raddr->host,
+                          raddr->port,
+                          transaction->raddr.host,
+                          transaction->raddr.port);
        } else {
+               // @todo mark transaction for later removal
+               transaction->finished = KS_TRUE;
                ret = transaction->callback(dht, raddr, message);
        }
 
        return ret;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_process_error(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message_t *message)
+{
+       struct bencode *e;
+       struct bencode *ec;
+       struct bencode *es;
+       const char *et;
+       ks_size_t es_len;
+       long long errorcode;
+       char error[KS_DHT_MESSAGE_ERROR_MAX_SIZE];
+       ks_dht2_transaction_t *transaction;
+       uint32_t *tid;
+       uint32_t transactionid;
+       ks_status_t ret = KS_STATUS_FAIL;
+
+       ks_assert(dht);
+       ks_assert(raddr);
+       ks_assert(message);
+
+       // @todo start of ks_dht2_message_parse_error
+       e = ben_dict_get_by_str(message->data, "e");
+       if (!e) {
+               ks_log(KS_LOG_DEBUG, "Message error missing required key 'e'\n");
+               return KS_STATUS_FAIL;
+       }
+       ec = ben_list_get(e, 0);
+       es = ben_list_get(e, 1);
+       es_len = ben_str_len(es);
+       if (es_len >= KS_DHT_MESSAGE_ERROR_MAX_SIZE) {
+               ks_log(KS_LOG_DEBUG, "Message error value has an unexpectedly large size of %d\n", es_len);
+               return KS_STATUS_FAIL;
+       }
+       errorcode = ben_int_val(ec);
+       et = ben_str_val(es);
+       
+       memcpy(error, et, es_len);
+       error[es_len] = '\0';
+       // todo end of ks_dht2_message_parse_error
+
+       message->args = e;
+
+       tid = (uint32_t *)message->transactionid;
+       transactionid = ntohl(*tid);
+
+       transaction = ks_hash_search(dht->transactions_hash, (void *)&transactionid, KS_READLOCKED);
+       ks_hash_read_unlock(dht->transactions_hash);
+       
+       if (!transaction) {
+               ks_log(KS_LOG_DEBUG, "Message error rejected with unknown transaction id %d\n", transactionid);
+       } else if (!ks_addr_cmp(raddr, &transaction->raddr)) {
+               ks_log(KS_LOG_DEBUG,
+                          "Message error rejected due to spoofing from %s %d, expected %s %d\n",
+                          raddr->host,
+                          raddr->port,
+                          transaction->raddr.host,
+                          transaction->raddr.port);
+       } else {
+               // @todo mark transaction for later removal
+               ks_dht2_message_callback_t callback;
+               transaction->finished = KS_TRUE;
+
+               if ((callback = (ks_dht2_message_callback_t)(intptr_t)ks_hash_search(dht->registry_error, error, KS_UNLOCKED))) {
+                       ret = callback(dht, raddr, message);
+               } else {
+                       ks_log(KS_LOG_DEBUG, "Message error received for transaction id %d, error %d: %s\n", transactionid, errorcode, error);
+                       ret = KS_STATUS_SUCCESS;
+               }
+       }
+
+       return ret;
+}
+
 /**
  *
  */
@@ -585,7 +770,7 @@ KS_DECLARE(ks_status_t) ks_dht2_send_query_ping(ks_dht2_t *dht, ks_sockaddr_t *r
                goto done;
        }
 
-       if (ks_dht2_transaction_init(transaction, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) {
+       if (ks_dht2_transaction_init(transaction, raddr, transactionid, ks_dht2_process_response_ping) != KS_STATUS_SUCCESS) {
                goto done;
        }
 
@@ -634,6 +819,7 @@ KS_DECLARE(ks_status_t) ks_dht2_send_response_ping(ks_dht2_t *dht,
 
        ks_assert(dht);
        ks_assert(raddr);
+       ks_assert(transactionid);
 
        if (ks_dht2_message_prealloc(&response, dht->pool) != KS_STATUS_SUCCESS) {
                return KS_STATUS_FAIL;
index b6ed56c3ae502af46ce2ae5186fda0a603cefe7a..8985387ef144664802fba3a41104eefb7f033282 100644 (file)
@@ -16,7 +16,9 @@ KS_BEGIN_EXTERN_C
 #define KS_DHT_MESSAGE_TRANSACTIONID_MAX_SIZE 20
 #define KS_DHT_MESSAGE_TYPE_MAX_SIZE 20
 #define KS_DHT_MESSAGE_QUERY_MAX_SIZE 20
+#define KS_DHT_MESSAGE_ERROR_MAX_SIZE 256
 
+#define KS_DHT_TRANSACTION_EXPIRATION_DELAY 30
 
 typedef struct ks_dht2_s ks_dht2_t;
 typedef struct ks_dht2_nodeid_s ks_dht2_nodeid_t;
@@ -54,9 +56,11 @@ struct ks_dht2_endpoint_s {
 
 struct ks_dht2_transaction_s {
        ks_pool_t *pool;
+       ks_sockaddr_t raddr;
        uint32_t transactionid;
        ks_dht2_message_callback_t callback;
-       // @todo expiration data
+       ks_time_t expiration;
+       ks_bool_t finished;
 };
 
 
@@ -71,6 +75,7 @@ struct ks_dht2_s {
 
        ks_hash_t *registry_type;
        ks_hash_t *registry_query;
+       ks_hash_t *registry_error;
 
        ks_bool_t bind_ipv4;
        ks_bool_t bind_ipv6;
@@ -137,6 +142,10 @@ KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message,
                                                                                                 uint8_t *transactionid,
                                                                                                 ks_size_t transactionid_length,
                                                                                                 struct bencode **args);
+KS_DECLARE(ks_status_t) ks_dht2_message_error(ks_dht2_message_t *message,
+                                                                                         uint8_t *transactionid,
+                                                                                         ks_size_t transactionid_length,
+                                                                                         struct bencode **args);
 
 /**
  *
@@ -150,6 +159,7 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_prealloc(ks_dht2_transaction_t *tras
 KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transaction);
 
 KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction,
+                                                                                                ks_sockaddr_t *raddr,
                                                                                                 uint32_t transactionid,
                                                                                                 ks_dht2_message_callback_t callback);
 KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transaction);
index a84eb60e770df0c60cd32a09010ff3880d4dc870..ed88a0fe638a3d747670184be85557fc3fcb50ce 100644 (file)
@@ -207,6 +207,33 @@ KS_DECLARE(ks_status_t) ks_dht2_message_response(ks_dht2_message_t *message,
        return KS_STATUS_SUCCESS;
 }
 
+/**
+ *
+ */
+KS_DECLARE(ks_status_t) ks_dht2_message_error(ks_dht2_message_t *message,
+                                                                                         uint8_t *transactionid,
+                                                                                         ks_size_t transactionid_length,
+                                                                                         struct bencode **args)
+{
+       struct bencode *e;
+       
+       ks_assert(message);
+       ks_assert(transactionid);
+       
+    ben_dict_set(message->data, ben_blob("t", 1), ben_blob(transactionid, transactionid_length));
+       ben_dict_set(message->data, ben_blob("y", 1), ben_blob("e", 1));
+       
+       // @note r joins message->data and will be freed with it
+       e = ben_list();
+       ben_dict_set(message->data, ben_blob("e", 1), e);
+
+       if (args) {
+               *args = e;
+       }
+
+       return KS_STATUS_SUCCESS;
+}
+
 
 /* For Emacs:
  * Local Variables:
index 9cfe88343387204196f8cfb49b84512bc11f64f0..3b62f8eaf373ff4c1cdca5b9e46f18f5d84d715a 100644 (file)
@@ -48,15 +48,20 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_free(ks_dht2_transaction_t *transact
  *
  */
 KS_DECLARE(ks_status_t) ks_dht2_transaction_init(ks_dht2_transaction_t *transaction,
+                                                                                                ks_sockaddr_t *raddr,
                                                                                                 uint32_t transactionid,
                                                                                                 ks_dht2_message_callback_t callback)
 {
        ks_assert(transaction);
+       ks_assert(raddr);
        ks_assert(transaction->pool);
        ks_assert(callback);
 
+       transaction->raddr = *raddr;
        transaction->transactionid = transactionid;
        transaction->callback = callback;
+       transaction->expiration = ks_time_now_sec() + KS_DHT_TRANSACTION_EXPIRATION_DELAY;
+       transaction->finished = KS_FALSE;
 
        return KS_STATUS_SUCCESS;
 }
@@ -68,8 +73,11 @@ KS_DECLARE(ks_status_t) ks_dht2_transaction_deinit(ks_dht2_transaction_t *transa
 {
        ks_assert(transaction);
 
+       transaction->raddr = (const ks_sockaddr_t){ 0 };
        transaction->transactionid = 0;
        transaction->callback = NULL;
+       transaction->expiration = 0;
+       transaction->finished = KS_FALSE;
 
        return KS_STATUS_SUCCESS;
 }
index 482449c6895010b9a3f7f6e9fb55587aaa3cfa29..f54b7765118bf8faa515f1af000e75960452f3e1 100644 (file)
@@ -10,6 +10,7 @@ ks_status_t dht_z_callback(ks_dht2_t *dht, ks_sockaddr_t *raddr, ks_dht2_message
 {
        diag("dht_z_callback\n");
        ok(message->transactionid[0] == '4' && message->transactionid[1] == '2');
+       ks_dht2_send_error(dht, raddr, message->transactionid, message->transactionid_length, 201, "Generic test error");
        return KS_STATUS_SUCCESS;
 }
 
@@ -97,6 +98,8 @@ int main() {
   err = ks_dht2_process(dht1, &raddr);
   ok(err == KS_STATUS_SUCCESS);
 
+  err = ks_dht2_pulse(&dht2, 1000);
+  ok(err == KS_STATUS_SUCCESS);
 
   //buflen = strlen(TEST_DHT1_PROCESS_QUERY_PING_BUFFER);
   //memcpy(dht1->recv_buffer, TEST_DHT1_PROCESS_QUERY_PING_BUFFER, buflen);