size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags);
int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags);
-ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size_t len);
+ssize_t applet_append_line(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len);
static inline struct appctx *appctx_new_here(struct applet *applet, struct sedesc *sedesc)
{
*/
int add_to_logformat_list(char *start, char *end, int type, struct list *list_format, char **err);
-ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t ofs, size_t len);
+ssize_t syslog_applet_append_event(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len);
/*
* Parse the log_format string and fill a linked list.
size_t ring_max_payload(const struct ring *ring);
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
- ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len));
+ ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len));
/* returns the ring storage's area */
static inline void *ring_area(const struct ring *ring)
#include <haproxy/stream.h>
#include <haproxy/task.h>
#include <haproxy/trace.h>
+#include <haproxy/vecpair.h>
#include <haproxy/xref.h>
unsigned int nb_applets = 0;
return ret;
}
-/* Atomically append a line to applet <ctx>'s output, appending a trailing 'LF'.
- * The line is read from <buf> at offset <ofs> relative to the buffer's origin,
- * for <len> bytes. It returns the number of bytes consumed from the input
- * buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will
- * never be able to (too large msg). The input buffer is not modified. The
- * caller is responsible for making sure that there are at least ofs+len bytes
- * in the input buffer.
+/* Atomically append a line to applet <ctx>'s output, appending a trailing LF.
+ * The line is read from vectors <v1> and <v2> at offset <ofs> relative to the
+ * area's origin, for <len> bytes. It returns the number of bytes consumed from
+ * the input vectors on success, -1 if it temporarily cannot (buffer full), -2
+ * if it will never be able to (too large msg). The vectors are not modified.
+ * The caller is responsible for making sure that there are at least ofs+len
+ * bytes in the input vectors.
*/
-ssize_t applet_append_line(void *ctx, const struct buffer *buf, size_t ofs, size_t len)
+ssize_t applet_append_line(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)
{
struct appctx *appctx = ctx;
}
chunk_reset(&trash);
- b_getblk_ofs(buf, trash.area, len, ofs);
+ vp_peek_ofs(v1, v2, ofs, trash.area, len);
trash.data += len;
trash.area[trash.data++] = '\n';
if (applet_putchk(appctx, &trash) == -1)
#include <haproxy/time.h>
#include <haproxy/hash.h>
#include <haproxy/tools.h>
+#include <haproxy/vecpair.h>
/* global recv logs counter */
int cum_log_messages;
};
/* Atomically append an event to applet >ctx>'s output, prepending it with its
- * size in decimal followed by a space.
- * The line is read from <buf> at offset <ofs> relative to the buffer's origin,
- * for <len> bytes. It returns the number of bytes consumed from the input
- * buffer on success, -1 if it temporarily cannot (buffer full), -2 if it will
- * never be able to (too large msg). The input buffer is not modified. The
- * caller is responsible for making sure that there are at least ofs+len bytes
- * in the input buffer.
+ * size in decimal followed by a space. The line is read from vectors <v1> and
+ * <v2> at offset <ofs> relative to the area's origin, for <len> bytes. It
+ * returns the number of bytes consumed from the input vectors on success, -1
+ * if it temporarily cannot (buffer full), -2 if it will never be able to (too
+ * large msg). The input vectors are not modified. The caller is responsible for
+ * making sure that there are at least ofs+len bytes in the input buffer.
*/
-ssize_t syslog_applet_append_event(void *ctx, const struct buffer *buf, size_t ofs, size_t len)
+ssize_t syslog_applet_append_event(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)
{
struct appctx *appctx = ctx;
char *p;
return -2;
/* try to transfer it or report full */
- trash.data += b_getblk_ofs(buf, trash.area + trash.data, len, ofs);
+ trash.data += vp_peek_ofs(v1, v2, ofs, trash.area, len);
if (applet_putchk(appctx, &trash) == -1)
return -1;
HA_RWLOCK_WRLOCK(RING_LOCK, &ring->lock);
if (ofs != ~0) {
/* reader was still attached */
- if (ofs < ring_head(ring))
- ofs += ring_size(ring) - ring_head(ring);
- else
- ofs -= ring_head(ring);
+ char *area = ring_area(ring);
BUG_ON(ofs >= ring_size(ring));
LIST_DEL_INIT(&appctx->wait_entry);
- HA_ATOMIC_DEC(b_peek(&ring->storage->buf, ofs));
+ HA_ATOMIC_DEC(area + ofs);
}
HA_ATOMIC_DEC(&ring->readers_count);
HA_RWLOCK_WRUNLOCK(RING_LOCK, &ring->lock);
* if it needs to pause, 1 once finished.
*/
int ring_dispatch_messages(struct ring *ring, void *ctx, size_t *ofs_ptr, size_t *last_ofs_ptr, uint flags,
- ssize_t (*msg_handler)(void *ctx, const struct buffer *buf, size_t ofs, size_t len))
+ ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
{
struct buffer *buf = &ring->storage->buf;
+ size_t head_ofs, tail_ofs;
+ size_t ring_size;
+ char *ring_area;
+ struct ist v1, v2;
uint64_t msg_len;
- ssize_t copied;
size_t len, cnt;
- size_t ofs; /* absolute offset from the buffer's origin */
- size_t pos; /* relative position from head (0..data-1) */
+ ssize_t copied;
int ret;
+ ring_area = b_orig(buf);
+ ring_size = b_size(buf);
+
HA_RWLOCK_RDLOCK(RING_LOCK, &ring->lock);
+ head_ofs = b_head_ofs(buf);
+ tail_ofs = b_tail_ofs(buf);
+
/* explanation for the initialization below: it would be better to do
* this in the parsing function but this would occasionally result in
* dropped events because we'd take a reference on the oldest message
* value cannot be produced after initialization.
*/
if (unlikely(*ofs_ptr == ~0)) {
- /* going to the end means looking at tail-1 */
- *ofs_ptr = b_peek_ofs(buf, (flags & RING_WF_SEEK_NEW) ? b_data(buf) - 1 : 0);
- HA_ATOMIC_INC(b_orig(buf) + *ofs_ptr);
+ if (flags & RING_WF_SEEK_NEW) {
+ /* going to the end means looking at tail-1 */
+ head_ofs = tail_ofs + ring_size - 1;
+ if (head_ofs >= ring_size)
+ head_ofs -= ring_size;
+ }
+
+ /* make ctx->ofs relative to the beginning of the buffer now */
+ *ofs_ptr = head_ofs;
+
+ /* and reserve our slot here */
+ HA_ATOMIC_INC(ring_area + head_ofs);
}
- ofs = *ofs_ptr;
- BUG_ON(ofs >= buf->size);
- HA_ATOMIC_DEC(b_orig(buf) + ofs);
+ /* we have the guarantee we can restart from our own head */
+ head_ofs = *ofs_ptr;
+ BUG_ON(head_ofs >= ring_size);
+
+ HA_ATOMIC_DEC(ring_area + head_ofs);
- /* in this loop, ofs always points to the counter byte that precedes
+ /* in this loop, head_ofs always points to the counter byte that precedes
* the message so that we can take our reference there if we have to
- * stop before the end (ret=0).
+ * stop before the end (ret=0). The reference is relative to the ring's
+ * origin, while pos is relative to the ring's head.
*/
ret = 1;
- while (1) {
- /* relative position in the buffer */
- pos = b_rel_ofs(buf, ofs);
+ vp_ring_to_data(&v1, &v2, ring_area, ring_size, head_ofs, tail_ofs);
- if (pos + 1 >= b_data(buf)) {
+ while (1) {
+ if (vp_size(v1, v2) <= 1) {
/* no more data */
break;
}
cnt = 1;
- len = b_peek_varint(buf, pos + cnt, &msg_len);
+ len = vp_peek_varint_ofs(v1, v2, cnt, &msg_len);
if (!len)
break;
cnt += len;
- BUG_ON(msg_len + pos + cnt + 1 > b_data(buf));
- copied = msg_handler(ctx, buf, ofs + cnt, msg_len);
+ BUG_ON(msg_len + cnt + 1 > vp_size(v1, v2));
+
+ copied = msg_handler(ctx, v1, v2, cnt, msg_len);
if (copied == -2) {
/* too large a message to ever fit, let's skip it */
goto skip;
break;
}
skip:
- ofs = b_add_ofs(buf, ofs, cnt + msg_len);
+ vp_skip(&v1, &v2, cnt + msg_len);
}
- HA_ATOMIC_INC(b_orig(buf) + ofs);
+ vp_data_to_ring(v1, v2, ring_area, ring_size, &head_ofs, &tail_ofs);
+
+ HA_ATOMIC_INC(ring_area + head_ofs);
if (last_ofs_ptr)
- *last_ofs_ptr = b_tail_ofs(buf);
- *ofs_ptr = ofs;
+ *last_ofs_ptr = tail_ofs;
+ *ofs_ptr = head_ofs;
HA_RWLOCK_RDUNLOCK(RING_LOCK, &ring->lock);
return ret;
}