}
}
+/** pick a message from the queue, lock and unlock, true if a message */
+static int pick_msg_from_queue(struct dt_msg_queue* mq, void** buf,
+ size_t* len)
+{
+ lock_basic_lock(&mq->lock);
+ if(mq->first) {
+ struct dt_msg_entry* entry = mq->first;
+ mq->first = entry->next;
+ if(!entry->next) mq->last = NULL;
+ mq->cursize -= entry->len;
+ lock_basic_unlock(&mq->lock);
+
+ *buf = entry->buf;
+ *len = entry->len;
+ free(entry);
+ return 1;
+ }
+ lock_basic_unlock(&mq->lock);
+ return 0;
+}
+
+/** find message in queue, false if no message, true if message to send */
+static int dtio_find_in_queue(struct dt_io_thread* dtio,
+ struct dt_msg_queue* mq)
+{
+ void* buf=NULL;
+ size_t len=0;
+ if(pick_msg_from_queue(mq, &buf, &len)) {
+ dtio->cur_msg = buf;
+ dtio->cur_msg_len = len;
+ dtio->cur_msg_done = 0;
+ return 1;
+ }
+ return 0;
+}
+
/** find a new message to write, search message queues, false if none */
static int dtio_find_msg(struct dt_io_thread* dtio)
{
+ struct dt_io_list_item* item = dtio->io_list;
+ while(item) {
+ if(dtio_find_in_queue(dtio, item->queue))
+ return 1;
+ item = item->next;
+ }
+ return 0;
}
/** write more of the current messsage. false if incomplete, true if
* the message is done */
-static int dtio_write_more(struct dt_io_thread* dtio)
+static int dtio_write_more(int fd, struct dt_io_thread* dtio)
{
+ return 0;
}
/** callback for the dnstap events, to write to the output */
/* write it */
if(dtio->cur_msg_done < dtio->cur_msg_len) {
- if(!dtio_write_more(dtio))
+ if(!dtio_write_more(fd, dtio))
return;
}