]> git.ipfire.org Git - fireperf.git/commitdiff
client: Add a basic implementation
authorMichael Tremer <michael.tremer@ipfire.org>
Mon, 25 Jan 2021 18:06:12 +0000 (18:06 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Mon, 25 Jan 2021 18:06:12 +0000 (18:06 +0000)
This will make the client connect to the server with one or more
connections and it will send data.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/client.c
src/main.c
src/main.h
src/server.c

index 39819600c829f926e05088c27fa4eca71107cced..78509b0eb5d6d85afba327e83ce1afc823dd6d27 100644 (file)
 #                                                                             #
 #############################################################################*/
 
+#include <errno.h>
+#include <sys/epoll.h>
+#include <string.h>
+#include <unistd.h>
+
 #include "client.h"
 #include "logging.h"
 #include "main.h"
 
+static int connect_socket(struct fireperf_config* conf, int fd) {
+       DEBUG(conf, "(Re-)connecting socket %d...\n", fd);
+
+       // Define the peer
+       struct sockaddr_in6 peer = {
+               .sin6_family = AF_INET6,
+               .sin6_addr = conf->address,
+               .sin6_port = htons(conf->port),
+       };
+
+       // Connect to the server
+       int r = connect(fd, &peer, sizeof(peer));
+       if (r && (errno != EINPROGRESS)) {
+               ERROR(conf, "Could not connect to server: %s\n", strerror(errno));
+               return 1;
+       }
+
+       return 0;
+}
+
+static int send_data_to_server(struct fireperf_config* conf, int fd) {
+       char buffer[BUFFER_SIZE];
+       ssize_t bytes_sent;
+
+       DEBUG(conf, "Sending %zu bytes of data to server\n", sizeof(buffer));
+
+       do {
+               bytes_sent = send(fd, buffer, sizeof(buffer), 0);
+       } while (bytes_sent < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
+
+       DEBUG(conf, "bytes_sent = %zu\n", bytes_sent);
+
+       return 0;
+}
+
 int fireperf_client(struct fireperf_config* conf) {
        DEBUG(conf, "Launching " PACKAGE_NAME " in client mode\n");
 
-       return 0;
+       int r = 1;
+
+       int epollfd = -1;
+       struct epoll_event ev;
+       struct epoll_event events[EPOLL_MAX_EVENTS];
+
+       // Initialize epoll()
+       epollfd = epoll_create1(0);
+       if (epollfd < 0) {
+               ERROR(conf, "Could not initialize epoll(): %s\n", strerror(errno));
+               return 1;
+       }
+
+       ev.events = EPOLLIN|EPOLLOUT;
+
+       DEBUG(conf, "Opening %lu connections...\n", conf->parallel);
+
+       // Open connections
+       for (unsigned int i = 0; i < conf->parallel; i++) {
+               // Open a new socket
+               int fd = socket(AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0);
+               if (fd < 0) {
+                       ERROR(conf, "Could not open socket: %s\n", strerror(errno));
+                       goto ERROR;
+               }
+
+               ev.data.fd = fd;
+
+               if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev)) {
+                       ERROR(conf, "Could not add socket file descriptor to epoll(): %s\n",
+                               strerror(errno));
+                       goto ERROR;
+               }
+       }
+
+       DEBUG(conf, "Entering main loop...\n");
+
+       while (!conf->terminated) {
+               int fds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, -1);
+               if (fds < 1) {
+                       ERROR(conf, "epoll_wait() failed: %s\n", strerror(errno));
+                       goto ERROR;
+               }
+
+               DEBUG(conf, "%d event(s) are ready\n", fds);
+
+               for (int i = 0; i < fds; i++) {
+                       int fd = events[i].data.fd;
+
+                       // What type of event are we handling?
+
+                       // Has the socket been disconnected?
+                       if (events[i].events & EPOLLHUP) {
+                               r = connect_socket(conf, fd);
+                               if (r)
+                                       goto ERROR;
+
+                       } else if (events[i].events & EPOLLOUT) {
+                               r = send_data_to_server(conf, fd);
+                               if (r)
+                                       goto ERROR;
+                       }
+               }
+       }
+
+       // All okay
+       r = 0;
+
+ERROR:
+       if (epollfd > 0)
+               close(epollfd);
+
+       return r;
 }
index 308a242a7180ea80a401c50c48473bfc544e21ca..cc0db573b94387bac7c07e47b1ee762ceba9c270 100644 (file)
@@ -144,6 +144,7 @@ int main(int argc, char* argv[]) {
                .loglevel = DEFAULT_LOG_LEVEL,
                .mode = FIREPERF_MODE_NONE,
                .port = DEFAULT_PORT,
+               .parallel = DEFAULT_PARALLEL,
        };
        int r;
 
index fb67aeb2a44d07aea6a7902ae43ea8581e071a19..bcf70df08a4a403f666953d96c9c602b0f199de1 100644 (file)
 
 #define MAX_PARALLEL (1 << 20)
 
+// Set the size of the read/write buffer to 1 MiB
+#define BUFFER_SIZE      1048576
+
+#define EPOLL_MAX_EVENTS 1024
+
 struct fireperf_config {
        int terminated;
        int loglevel;
index a936a058b1bf2368d92f8e55bba5974331003752..1d62a6c0d6f3847edb828e39ca353a809b821db6 100644 (file)
 #include "main.h"
 #include "server.h"
 
-// Set the size of the read buffer to 1 MiB
-#define BUFFER_SIZE      1048576
-
 #define SOCKET_BACKLOG   1024
-#define EPOLL_MAX_EVENTS 1024
 
 static int create_socket(struct fireperf_config* conf) {
        // Open a new socket