ssize_t wire_buf_end_idx; /**< Data end offset in wire_buf. */
uint64_t last_activity; /**< Time of last IO activity (if any occurs).
* Otherwise session creation time. */
+ size_t write_queue_size; /**< number of buffered write requests (by our code) */
};
static void on_session_close(uv_handle_t *handle)
session->wire_buf_end_idx = 0;
return NULL;
}
-
+
if (session->wire_buf_start_idx > session->wire_buf_end_idx) {
session->sflags.wirebuf_error = true;
session->wire_buf_start_idx = 0;
uint8_t *msg_start = &session->wire_buf[session->wire_buf_start_idx];
ssize_t wirebuf_msg_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
uint16_t msg_size = 0;
-
+
if (!handle) {
session->sflags.wirebuf_error = true;
return NULL;
session->wire_buf_start_idx += pkt_msg_size;
}
session->sflags.wirebuf_error = false;
-
+
wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
if (wirebuf_data_size == 0) {
session_wirebuf_discard(session);
{
return s->last_activity;
}
+
+bool session_write_queue_is_empty(struct session *session)
+{
+ return session->write_queue_size == 0;
+}
+
+int session_write_queue_inc(struct session *session)
+{
+ if (session->write_queue_size == SIZE_MAX) {
+ return kr_error(ERANGE);
+ }
+ session->write_queue_size += 1;
+ return kr_ok();
+}
+
+int session_write_queue_dec(struct session *session)
+{
+ if (session->write_queue_size == 0) {
+ return kr_error(ERANGE);
+ }
+ session->write_queue_size -= 1;
+ return kr_ok();
+}
/** Returns either creation time or time of last IO activity if any occurs. */
/* Used for TCP timeout calculation. */
uint64_t session_last_activity(struct session *s);
+
+/** Check whether the write queue is empty. */
+bool session_write_queue_is_empty(struct session *session);
+/** Increment the write queue size. */
+int session_write_queue_inc(struct session *session);
+/** Decrement the write queue size. */
+int session_write_queue_dec(struct session *session);