#include <sys/un.h>
#endif
+void* fstrm_create_control_frame_start(char* contenttype, size_t* len)
+{
+ uint32_t* control;
+ size_t n;
+ /* start framestream message:
+ * 4byte 0: control indicator.
+ * 4byte bigendian: length of control frame
+ * 4byte bigendian: type START
+ * 4byte bigendian: frame option: content-type
+ * 4byte bigendian: length of string
+ * string of content type (dnstap)
+ */
+ n = 4+4+4+4+4+strlen(contenttype);
+ control = malloc(n);
+ if(!control)
+ return NULL;
+ control[0] = 0;
+ control[1] = htonl(4+4+4+strlen(contenttype));
+ control[2] = htonl(FSTRM_CONTROL_FRAME_START);
+ control[3] = htonl(FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE);
+ control[4] = htonl(strlen(contenttype));
+ memmove(&control[5], contenttype, strlen(contenttype));
+ *len = n;
+ return control;
+}
+
+void* fstrm_create_control_frame_stop(size_t* len)
+{
+ uint32_t* control;
+ size_t n;
+ /* stop framestream message:
+ * 4byte 0: control indicator.
+ * 4byte bigendian: length of control frame
+ * 4byte bigendian: type STOP
+ */
+ n = 4+4+4;
+ control = malloc(n);
+ if(!control)
+ return NULL;
+ control[0] = 0;
+ control[1] = htonl(4);
+ control[2] = htonl(FSTRM_CONTROL_FRAME_STOP);
+ *len = n;
+ return control;
+}
+
struct dt_msg_queue*
dt_msg_queue_create(void)
{
}
}
-/** pick a message from the queue, lock and unlock, true if a message */
-static int pick_msg_from_queue(struct dt_msg_queue* mq, void** buf,
+/** pick a message from the queue, the routine locks and unlocks,
+ * returns true if there is a message */
+static int dt_msg_queue_pop(struct dt_msg_queue* mq, void** buf,
size_t* len)
{
lock_basic_lock(&mq->lock);
{
void* buf=NULL;
size_t len=0;
- if(pick_msg_from_queue(mq, &buf, &len)) {
+ if(dt_msg_queue_pop(mq, &buf, &len)) {
dtio->cur_msg = buf;
dtio->cur_msg_len = len;
dtio->cur_msg_done = 0;
+ dtio->cur_msg_len_done = 0;
return 1;
}
return 0;
dtio->cur_msg = NULL;
dtio->cur_msg_len = 0;
dtio->cur_msg_done = 0;
+ dtio->cur_msg_len_done = 0;
}
/** callback for the dnstap commandpipe, to stop the dnstap IO */
ub_event_base_free(dtio->event_base);
}
+/** setup a start control message */
+static int dtio_control_start_send(struct dt_io_thread* dtio)
+{
+ log_assert(dtio->cur_msg == NULL && dtio->cur_msg_len == 0);
+ dtio->cur_msg = fstrm_create_control_frame_start(DNSTAP_CONTENT_TYPE,
+ &dtio->cur_msg_len);
+ if(!dtio->cur_msg) {
+ return 0;
+ }
+ /* setup to send the control message */
+ /* set that the buffer needs to be sent, but the length
+ * of that buffer is already written, that way the buffer can
+ * start with 0 length and then the length of the control frame in it*/
+ dtio->cur_msg_done = 0;
+ dtio->cur_msg_len_done = 4;
+ return 1;
+}
+
/** open the output file descriptor */
static void dtio_open_output(struct dt_io_thread* dtio)
{
}
dtio->event = ev;
+ /* setup protocol control message to start */
+ if(!dtio_control_start_send(dtio)) {
+ fatal_exit("dnstap io: out of memory");
+ }
}
/** add the output file descriptor event for listening */
size_t cur_msg_len;
/** number of bytes written for the current message */
size_t cur_msg_done;
+ /** number of bytes of the length that have been written,
+ * for the current message length that precedes the frame */
+ size_t cur_msg_len_done;
/** command pipe that stops the pipe if closed. Used to quit
* the program. [0] is read, [1] is written to. */
/** the constant that denotes the control field type that is the
* string for the content type of the stream. */
#define FSTRM_CONTROL_FIELD_TYPE_CONTENT_TYPE 0x01
+/** the content type for DNSTAP frame streams */
+#define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
-/* routine to send START message, with content type. */
-/* routine to send a frame. */
-/* routine to send STOP message. */
+/**
+ * routine for START message, with content type.
+ * This creates an FSTRM control frame of type START.
+ * @param contenttype: a zero delimited string with the content type.
+ * eg. DNSTAP_CONTENT_TYPE, "protobuf:dnstap.Dnstap"
+ * @param len: if a buffer is returned this is the length of that buffer.
+ * @return NULL on malloc failure. Returns a malloced buffer with the
+ * protocol message. The buffer starts with the 4byte 0 that indicates
+ * a control frame. The buffer should be sent without preceding it with
+ * the 'len' variable, but straight the 0 start zeroes, and then the
+ * length of the control frame itself is embedded next in the buffer,
+ * with the control frame after that in the buffer.
+ */
+void* fstrm_create_control_frame_start(char* contenttype, size_t* len);
+/**
+ * routine for STOP message.
+ * This creates an FSTRM control frame of type STOP.
+ * @param len: if a buffer is returned this is the length of that buffer.
+ * @return NULL on malloc failure. Returns a malloced buffer with the
+ * protocol message. The buffer starts with the 4byte 0 that indicates
+ * a control frame. The buffer should be sent without preceding it with
+ * the 'len' variable, but straight the 0 start zeroes, and then the
+ * length of the control frame itself is embedded next in the buffer,
+ * with the control frame after that in the buffer.
+ */
+void* fstrm_create_control_frame_stop(size_t* len);
/**
* Create new (empty) worker message queue. Limit set to default on max.