#include "logging.h"
#include "main.h"
#include "random.h"
+#include "stats.h"
static int parse_address(const char* string, struct in6_addr* address6) {
struct in_addr address4;
#include "client.h"
#include "main.h"
#include "logging.h"
-#include "random.h"
#include "server.h"
#include "stats.h"
return 0;
}
-
-static const char ZERO[SOCKET_SEND_BUFFER_SIZE] = { 0 };
-
-int handle_connection_send(struct fireperf_ctx* ctx,
- struct fireperf_stats* stats, int fd) {
- const char* buffer = ZERO;
- ssize_t bytes_sent;
-
- if (!ctx->zero) {
- buffer = fireperf_random_pool_get_slice(ctx, SOCKET_SEND_BUFFER_SIZE);
- }
-
- do {
- bytes_sent = send(fd, buffer, SOCKET_SEND_BUFFER_SIZE, MSG_ZEROCOPY);
- } while (bytes_sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
-
- // Update statistics
- stats->bytes_sent += bytes_sent;
- stats->total_bytes_sent += bytes_sent;
-
- return 0;
-}
-
-int handle_connection_recv(struct fireperf_ctx* ctx,
- struct fireperf_stats* stats, int fd) {
- char buffer[SOCKET_RECV_BUFFER_SIZE];
- ssize_t bytes_read;
-
- // Try reading into buffer
- do {
- bytes_read = recv(fd, buffer, sizeof(buffer), 0);
- } while (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
-
- // Error?
- if (bytes_read < 0) {
- ERROR(ctx, "Could not read from socket %d: %s\n", fd, strerror(errno));
- return -1;
- }
-
- DEBUG(ctx, "Read %zu bytes from socket %d\n", bytes_read, fd);
-
- // Update statistics
- stats->bytes_received += bytes_read;
- stats->total_bytes_received += bytes_read;
-
- return 0;
-}
#define FIREPERF_MAIN_H
#include "ctx.h"
-#include "stats.h"
int set_socket_buffer_sizes(struct fireperf_ctx* ctx, int fd);
-int handle_connection_send(struct fireperf_ctx* ctx,
- struct fireperf_stats* stats, int fd);
-int handle_connection_recv(struct fireperf_ctx* ctx,
- struct fireperf_stats* stats, int fd);
-
#endif /* FIREPERF_MAIN_H */
#include "ctx.h"
#include "main.h"
#include "logging.h"
+#include "random.h"
+#include "stats.h"
#include "worker.h"
+static const char ZERO[SOCKET_SEND_BUFFER_SIZE] = { 0 };
+
struct fireperf_worker {
pthread_t thread;
free(worker);
}
+static int fireperf_worker_send(struct fireperf_worker* worker, int fd) {
+ const char* buffer = ZERO;
+ ssize_t bytes_sent;
+
+ // Fetch some random data if requested
+ if (!worker->ctx->zero)
+ buffer = fireperf_random_pool_get_slice(worker->ctx, SOCKET_SEND_BUFFER_SIZE);
+
+ // Send the buffer
+ do {
+ bytes_sent = send(fd, buffer, SOCKET_SEND_BUFFER_SIZE, MSG_ZEROCOPY);
+ } while (bytes_sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+
+ // Update statistics
+ worker->stats.bytes_sent += bytes_sent;
+ worker->stats.total_bytes_sent += bytes_sent;
+
+ return 0;
+}
+
+static int fireperf_worker_recv(struct fireperf_worker* worker, int fd) {
+ char buffer[SOCKET_RECV_BUFFER_SIZE];
+ ssize_t bytes_read;
+
+ // Try reading into buffer
+ do {
+ bytes_read = recv(fd, buffer, sizeof(buffer), 0);
+ } while (bytes_read < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+
+ // Error?
+ if (bytes_read < 0) {
+ ERROR(worker->ctx, "Could not read from socket %d: %m\n", fd);
+ return -errno;
+ }
+
+ //DEBUG(ctx, "Read %zu bytes from socket %d\n", bytes_read, fd);
+
+ // Update statistics
+ worker->stats.bytes_received += bytes_read;
+ worker->stats.total_bytes_received += bytes_read;
+
+ return 0;
+}
+
/*
The main function of the worker.
*/
// Handle incoming data
if (events[i].events & EPOLLIN) {
- r = handle_connection_recv(worker->ctx, &worker->stats, fd);
+ r = fireperf_worker_recv(worker, fd);
if (r < 0)
goto ERROR;
}
// Handle outgoing data
if (events[i].events & EPOLLOUT) {
- r = handle_connection_send(worker->ctx, &worker->stats, fd);
+ r = fireperf_worker_send(worker, fd);
if (r < 0)
goto ERROR;
}