size_t ring_max_payload(const struct ring *ring);
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
- ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len));
+ ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len),
+ size_t *processed);
/* returns the ring storage's usable area */
static inline void *ring_area(const struct ring *ring)
* the last known tail pointer will be copied there so that the caller may use
* this to detect new data have arrived since we left the function. Returns 0
* if it needs to pause, 1 once finished.
+ *
+ * If <processed> is not NULL, it will be set to the number of messages
+ * processed by the function (even when the function returns 0)
*/
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
- ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
+ ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len),
+ size_t *processed)
{
size_t head_ofs, tail_ofs, prev_ofs;
size_t ring_size;
struct ist v1, v2;
uint64_t msg_len;
size_t len, cnt;
+ size_t msg_count = 0;
ssize_t copied;
uint8_t readers;
int ret;
break;
}
skip:
+ msg_count += 1;
vp_skip(&v1, &v2, cnt + msg_len);
}
if (last_ofs_ptr)
*last_ofs_ptr = tail_ofs;
*ofs_ptr = head_ofs;
+ if (processed)
+ *processed = msg_count;
return ret;
}
MT_LIST_DELETE(&appctx->wait_entry);
- ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line);
+ ret = ring_dispatch_messages(ring, appctx, &ctx->ofs, &last_ofs, ctx->flags, applet_append_line, NULL);
if (ret && (ctx->flags & RING_WF_WAIT_MODE)) {
/* we've drained everything and are configured to wait for more
MT_LIST_DELETE(&appctx->wait_entry);
- ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, msg_handler);
+ ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0,
+ msg_handler, NULL);
if (ret) {
/* let's be woken up once new data arrive */