]> git.ipfire.org Git - thirdparty/strongswan.git/commitdiff
load-tester: use a stream service to dispatch control connections
authorMartin Willi <martin@revosec.ch>
Mon, 1 Jul 2013 10:18:15 +0000 (12:18 +0200)
committerMartin Willi <martin@revosec.ch>
Thu, 18 Jul 2013 14:00:29 +0000 (16:00 +0200)
src/libcharon/plugins/load_tester/load_tester.c
src/libcharon/plugins/load_tester/load_tester_control.c

index f7361e6068f8079d11242889763e9742f6f7c378..b7b971ee8445b2b8f8a580ba942dac1b7bc9fd53 100644 (file)
@@ -35,7 +35,7 @@ static FILE* make_connection()
        addr.sun_family = AF_UNIX;
        strcpy(addr.sun_path, LOAD_TESTER_SOCKET);
 
-       fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+       fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (fd < 0)
        {
                fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
index 3c82b5c301f48daa276bf74a32624f73141a0f33..f9ec9142f1bd1816ecdccc8968b35c53a6a0f368 100644 (file)
@@ -43,9 +43,9 @@ struct private_load_tester_control_t {
        load_tester_control_t public;
 
        /**
-        * Load tester unix socket file descriptor
+        * Load tester control stream service
         */
-       int socket;
+       stream_service_t *service;
 };
 
 /**
@@ -84,48 +84,6 @@ struct init_listener_t {
        condvar_t *condvar;
 };
 
-/**
- * Open load-tester listening socket
- */
-static bool open_socket(private_load_tester_control_t *this)
-{
-       struct sockaddr_un addr;
-       mode_t old;
-
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, LOAD_TESTER_SOCKET);
-
-       this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
-       if (this->socket == -1)
-       {
-               DBG1(DBG_CFG, "creating load-tester socket failed");
-               return FALSE;
-       }
-       unlink(addr.sun_path);
-       old = umask(~(S_IRWXU | S_IRWXG));
-       if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
-       {
-               DBG1(DBG_CFG, "binding load-tester socket failed: %s", strerror(errno));
-               close(this->socket);
-               return FALSE;
-       }
-       umask(old);
-       if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
-                         lib->caps->get_gid(lib->caps)) != 0)
-       {
-               DBG1(DBG_CFG, "changing load-tester socket permissions failed: %s",
-                        strerror(errno));
-       }
-       if (listen(this->socket, 10) < 0)
-       {
-               DBG1(DBG_CFG, "listening on load-tester socket failed: %s", strerror(errno));
-               close(this->socket);
-               unlink(addr.sun_path);
-               return FALSE;
-       }
-       return TRUE;
-}
-
 /**
  * Hashtable hash function
  */
@@ -215,9 +173,9 @@ static bool initiate_cb(init_listener_t *this, debug_t group, level_t level,
 }
 
 /**
- * Initiate load-test, write progress to stream
+ * Accept connections, initiate load-test, write progress to stream
  */
-static job_requeue_t initiate(FILE *stream)
+static bool on_accept(private_load_tester_control_t *this, stream_t *io)
 {
        init_listener_t *listener;
        enumerator_t *enumerator;
@@ -225,15 +183,23 @@ static job_requeue_t initiate(FILE *stream)
        child_cfg_t *child_cfg;
        u_int i, count, failed = 0, delay = 0;
        char buf[16] = "";
+       FILE *stream;
 
+       stream = io->get_file(io);
+       if (!stream)
+       {
+               return FALSE;
+       }
        fflush(stream);
        if (fgets(buf, sizeof(buf), stream) == NULL)
        {
-               return JOB_REQUEUE_NONE;
+               fclose(stream);
+               return FALSE;
        }
        if (sscanf(buf, "%u %u", &count, &delay) < 1)
        {
-               return JOB_REQUEUE_NONE;
+               fclose(stream);
+               return FALSE;
        }
 
        INIT(listener,
@@ -308,50 +274,15 @@ static job_requeue_t initiate(FILE *stream)
        free(listener);
 
        fprintf(stream, "\n");
+       fclose(stream);
 
-       return JOB_REQUEUE_NONE;
-}
-
-/**
- * Accept load-tester control connections, dispatch
- */
-static job_requeue_t receive(private_load_tester_control_t *this)
-{
-       struct sockaddr_un addr;
-       int fd, len = sizeof(addr);
-       bool oldstate;
-       FILE *stream;
-
-       oldstate = thread_cancelability(TRUE);
-       fd = accept(this->socket, (struct sockaddr*)&addr, &len);
-       thread_cancelability(oldstate);
-
-       if (fd != -1)
-       {
-               stream = fdopen(fd, "r+");
-               if (stream)
-               {
-                       DBG1(DBG_CFG, "client connected");
-                       lib->processor->queue_job(lib->processor,
-                               (job_t*)callback_job_create_with_prio(
-                                       (callback_job_cb_t)initiate, stream, (void*)fclose,
-                                       (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
-               }
-               else
-               {
-                       close(fd);
-               }
-       }
-       return JOB_REQUEUE_FAIR;
+       return FALSE;
 }
 
 METHOD(load_tester_control_t, destroy, void,
        private_load_tester_control_t *this)
 {
-       if (this->socket != -1)
-       {
-               close(this->socket);
-       }
+       DESTROY_IF(this->service);
        free(this);
 }
 
@@ -361,6 +292,7 @@ METHOD(load_tester_control_t, destroy, void,
 load_tester_control_t *load_tester_control_create()
 {
        private_load_tester_control_t *this;
+       char *uri;
 
        INIT(this,
                .public = {
@@ -368,16 +300,18 @@ load_tester_control_t *load_tester_control_create()
                },
        );
 
-       if (open_socket(this))
+       uri = lib->settings->get_str(lib->settings,
+                               "%s.plugins.load-tester.socket", "unix://" LOAD_TESTER_SOCKET,
+                               charon->name);
+       this->service = lib->streams->create_service(lib->streams, uri, 10);
+       if (this->service)
        {
-               lib->processor->queue_job(lib->processor, (job_t*)
-                       callback_job_create_with_prio((callback_job_cb_t)receive, this, NULL,
-                                               (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+               this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+                                                                this, JOB_PRIO_CRITICAL, 0);
        }
        else
        {
-               this->socket = -1;
+               DBG1(DBG_CFG, "creating load-tester control socket failed");
        }
-
        return &this->public;
 }