#include <string.h>
#include <errno.h>
#include <fcntl.h>
+#include <limits.h>
#include <assert.h>
+#include <poll.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <sys/types.h>
int epoll_fd;
bool cgroup_failed;
pthread_t pthread;
+ char *buf;
};
#define LOOPS_DEFAULT 1000000
static bool threaded;
static bool nonblocking;
+static unsigned int write_size = sizeof(int); /* bytes moved per ping-pong write; set with -s/--write-size, defaults to one int */
static char *cgrp_names[2];
static struct cgroup *cgrps[2];
OPT_BOOLEAN('n', "nonblocking", &nonblocking, "Use non-blocking operations"),
OPT_INTEGER('l', "loop", &loops, "Specify number of loops"),
OPT_BOOLEAN('T', "threaded", &threaded, "Specify threads/process based task setup"),
+ OPT_UINTEGER('s', "write-size", &write_size,
+ "Bytes per ping-pong write (default 4-bytes). Use larger values to exercise the pipe page-allocation path."),
OPT_CALLBACK('G', "cgroups", NULL, "SEND,RECV",
"Put sender and receivers in given cgroups",
parse_two_cgroups),
free(cgrp_names[nr]);
}
+/*
+ * Park the caller in poll() with no timeout until @fd reports POLLOUT.
+ * The non-blocking write path uses this so that a full pipe is waited on
+ * rather than retried in a tight EWOULDBLOCK loop.
+ *
+ * The poll() return value is ignored; the caller simply retries its
+ * write() afterwards.
+ */
+static inline void wait_writable(int fd)
+{
+	struct pollfd wait_fd;
+
+	wait_fd.fd = fd;
+	wait_fd.events = POLLOUT;
+	wait_fd.revents = 0;
+
+	poll(&wait_fd, 1, -1);
+}
+
+/*
+ * Push one complete write_size-byte payload from td->buf into
+ * td->pipe_write.
+ *
+ * write() may accept fewer bytes than asked for, so keep going from where
+ * the previous call stopped. EINTR is retried right away. In non-blocking
+ * mode EWOULDBLOCK means the pipe is still full because the peer has not
+ * drained it yet; sleep in wait_writable() and then retry.
+ *
+ * Returns write_size once every byte has been written, or the negative
+ * write() result on any other error.
+ */
+static inline int write_pipe(struct thread_data *td)
+{
+	unsigned int sent = 0;
+
+	while (sent < write_size) {
+		int n = write(td->pipe_write, td->buf + sent, write_size - sent);
+
+		/* Success, possibly partial: account for it and move on. */
+		if (n >= 0) {
+			sent += n;
+			continue;
+		}
+
+		if (errno == EINTR)
+			continue;
+
+		/* Only a full pipe in non-blocking mode is worth waiting for. */
+		if (!nonblocking || errno != EWOULDBLOCK)
+			return n;
+
+		wait_writable(td->pipe_write);
+	}
+	return sent;
+}
+
static inline int read_pipe(struct thread_data *td)
{
- int ret, m;
-retry:
- if (nonblocking) {
- ret = epoll_wait(td->epoll_fd, &td->epoll_ev, 1, -1);
- if (ret < 0)
+ unsigned int done = 0; /* read_pipe(): gather one write_size-byte payload into td->buf; done counts the bytes read so far */
+ int ret;
+
+ while (done < write_size) { /* a single read() may return less than requested, so loop until the payload is complete */
+ if (nonblocking) {
+ ret = epoll_wait(td->epoll_fd, &td->epoll_ev, 1, -1); /* non-blocking mode: wait in epoll_wait() (no timeout) before every read() attempt */
+ if (ret < 0) {
+ if (errno == EINTR)
+ continue; /* epoll_wait() was interrupted: wait again */
+ return ret; /* any other epoll_wait() failure goes back to the caller as a negative value */
+ }
+ }
+ ret = read(td->pipe_read, td->buf + done, write_size - done); /* continue filling td->buf where the previous read() stopped */
+ if (ret < 0) {
+ if (errno == EINTR)
+ continue; /* read() was interrupted: retry */
+ if (nonblocking && errno == EWOULDBLOCK)
+ continue; /* nothing to read yet: loop back to epoll_wait() */
return ret;
+ }
+ if (ret == 0)
+ return done; /* read() returned 0 bytes: stop and hand back the partial count */
+ done += ret;
}
- ret = read(td->pipe_read, &m, sizeof(int));
- if (nonblocking && ret < 0 && errno == EWOULDBLOCK)
- goto retry;
- return ret;
+ return done; /* loop ended, so done has reached write_size */
}
static void *worker_thread(void *__tdata)
{
struct thread_data *td = __tdata;
- int i, ret, m = 0;
+ int i, ret; /* the local int payload is gone: write_pipe()/read_pipe() move data through td->buf */
ret = enter_cgroup(td->nr);
if (ret < 0) {
}
for (i = 0; i < loops; i++) {
- ret = write(td->pipe_write, &m, sizeof(int));
- BUG_ON(ret != sizeof(int));
+ ret = write_pipe(td); /* send one write_size-byte payload, looping over short writes */
+ BUG_ON(ret != (int)write_size); /* anything but a full payload is fatal; the cast is safe because bench_sched_pipe() rejects write_size > INT_MAX */
ret = read_pipe(td);
- BUG_ON(ret != sizeof(int));
+ BUG_ON(ret != (int)write_size); /* read_pipe() returns less than write_size on a 0-byte read and a negative value on error */
}
return NULL;
}
+/*
+ * Resize the pipe behind write end @wfd so that it can hold write_size
+ * bytes. Prints the reason to stderr on failure.
+ *
+ * Returns 0 on success, -1 on failure.
+ */
+static int resize_one_pipe(int wfd)
+{
+	int cap = fcntl(wfd, F_SETPIPE_SZ, write_size);
+
+	if (cap < 0) {
+		/*
+		 * fcntl(2): an unprivileged request above pipe-max-size fails
+		 * with EPERM. Any other errno is reported verbatim so the
+		 * user is not pointed at the wrong knob.
+		 */
+		if (errno == EPERM)
+			fprintf(stderr,
+				"--write-size %u exceeds /proc/sys/fs/pipe-max-size\n",
+				write_size);
+		else
+			fprintf(stderr,
+				"failed to resize pipe for --write-size %u: %s\n",
+				write_size, strerror(errno));
+		return -1;
+	}
+
+	/* On success fcntl() returns the new capacity; it must cover a payload. */
+	if ((unsigned int)cap < write_size) {
+		fprintf(stderr,
+			"pipe capacity %d is smaller than --write-size %u\n",
+			cap, write_size);
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * On a custom write_size, resize both pipes so a single payload fits.
+ *
+ * @wfd1, @wfd2: write ends of the two ping-pong pipes.
+ *
+ * The default int-sized payload is left alone. Otherwise each pipe is
+ * resized in turn and the first failure stops the process, so the
+ * diagnostic always describes the call that actually failed.
+ *
+ * Returns 0 on success (or when nothing had to be done), -1 on failure.
+ */
+static int resize_pipes(int wfd1, int wfd2)
+{
+	if (write_size <= sizeof(int))
+		return 0;
+
+	if (resize_one_pipe(wfd1) < 0 || resize_one_pipe(wfd2) < 0)
+		return -1;
+
+	return 0;
+}
+
int bench_sched_pipe(int argc, const char **argv)
{
struct thread_data threads[2] = {};
argc = parse_options(argc, argv, options, bench_sched_pipe_usage, 0);
+ /*
+ * The error paths below return early without closing the pipes or
+ * freeing the cgroup state. That is fine: bench_sched_pipe() runs
+ * once and the process exits right after it returns, so these are
+ * not real leaks.
+ */
+ if (write_size == 0 || write_size > INT_MAX) {
+ fprintf(stderr, "--write-size must be in 1..%d\n", INT_MAX);
+ return -1;
+ }
+
if (nonblocking)
flags |= O_NONBLOCK;
BUG_ON(pipe2(pipe_1, flags));
BUG_ON(pipe2(pipe_2, flags));
+ if (resize_pipes(pipe_1[1], pipe_2[1]) < 0)
+ return -1;
+
+ for (t = 0; t < nr_threads; t++) {
+ threads[t].buf = calloc(1, write_size);
+ BUG_ON(!threads[t].buf);
+ }
+
gettimeofday(&start, NULL);
for (t = 0; t < nr_threads; t++) {
gettimeofday(&stop, NULL);
timersub(&stop, &start, &diff);
+ for (t = 0; t < nr_threads; t++)
+ free(threads[t].buf);
+
exit_cgroup(0);
exit_cgroup(1);