]> git.ipfire.org Git - thirdparty/strongswan.git/commitdiff
lookip: use stream service with async I/O dispatching
authorMartin Willi <martin@revosec.ch>
Mon, 1 Jul 2013 10:47:45 +0000 (12:47 +0200)
committerMartin Willi <martin@revosec.ch>
Wed, 17 Jul 2013 14:55:50 +0000 (16:55 +0200)
Now uses SOCK_STREAM, as SOCK_SEQPACKET is not available over TCP. To have
network transparency, the message now uses network byte order.

src/libcharon/plugins/lookip/lookip.c
src/libcharon/plugins/lookip/lookip_listener.c
src/libcharon/plugins/lookip/lookip_listener.h
src/libcharon/plugins/lookip/lookip_msg.h
src/libcharon/plugins/lookip/lookip_socket.c

index 9887a3a9206afbde3da507c9375f57919e112606..d473c7022e17eedb8e01a80d9fa34dca20d3b612 100644 (file)
 #include <unistd.h>
 #include <stddef.h>
 #include <stdio.h>
+#include <stdlib.h>
 #include <errno.h>
 #include <getopt.h>
+#include <arpa/inet.h>
 
 /**
  * Connect to the daemon, return FD
  */
 static int make_connection()
 {
-       struct sockaddr_un addr;
-       int fd;
+       union {
+               struct sockaddr_un un;
+               struct sockaddr_in in;
+               struct sockaddr sa;
+       } addr;
+       int fd, len;
 
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, LOOKIP_SOCKET);
+       if (getenv("TCP_PORT"))
+       {
+               addr.in.sin_family = AF_INET;
+               addr.in.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+               addr.in.sin_port = htons(atoi(getenv("TCP_PORT")));
+               len = sizeof(addr.in);
+       }
+       else
+       {
+               addr.un.sun_family = AF_UNIX;
+               strcpy(addr.un.sun_path, LOOKIP_SOCKET);
 
-       fd = socket(AF_UNIX, SOCK_SEQPACKET, 0);
+               len = offsetof(struct sockaddr_un, sun_path) + strlen(addr.un.sun_path);
+       }
+       fd = socket(addr.sa.sa_family, SOCK_STREAM, 0);
        if (fd < 0)
        {
                fprintf(stderr, "opening socket failed: %s\n", strerror(errno));
                return -1;
        }
-       if (connect(fd, (struct sockaddr *)&addr,
-                       offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path)) < 0)
+       if (connect(fd, &addr.sa, len) < 0)
        {
-               fprintf(stderr, "connecting to %s failed: %s\n",
-                               LOOKIP_SOCKET, strerror(errno));
+               fprintf(stderr, "connecting failed: %s\n", strerror(errno));
                close(fd);
                return -1;
        }
        return fd;
 }
 
+static int read_all(int fd, void *buf, size_t len, int flags)
+{
+       ssize_t ret, done = 0;
+
+       while (done < len)
+       {
+               ret = recv(fd, buf, len - done, flags);
+               if (ret == -1 && errno == EINTR)
+               {       /* interrupted, try again */
+                       continue;
+               }
+               if (ret == 0)
+               {
+                       return 0;
+               }
+               if (ret < 0)
+               {
+                       return -1;
+               }
+               done += ret;
+               buf += ret;
+       }
+       return len;
+}
+
+static int write_all(int fd, void *buf, size_t len)
+{
+       ssize_t ret, done = 0;
+
+       while (done < len)
+       {
+               ret = write(fd, buf, len - done);
+               if (ret == -1 && errno == EINTR)
+               {       /* interrupted, try again */
+                       continue;
+               }
+               if (ret < 0)
+               {
+                       return -1;
+               }
+               done += ret;
+               buf += ret;
+       }
+       return len;
+}
+
 /**
  * Send a request message
  */
 static int send_request(int fd, int type, char *vip)
 {
        lookip_request_t req = {
-               .type = type,
+               .type = htonl(type),
        };
 
        if (vip)
        {
                snprintf(req.vip, sizeof(req.vip), "%s", vip);
        }
-       if (send(fd, &req, sizeof(req), 0) != sizeof(req))
+       if (write_all(fd, &req, sizeof(req)) != sizeof(req))
        {
                fprintf(stderr, "writing to socket failed: %s\n", strerror(errno));
                return 2;
@@ -83,7 +144,7 @@ static int receive(int fd, int block, int loop)
 
        do
        {
-               res = recv(fd, &resp, sizeof(resp), block ? 0 : MSG_DONTWAIT);
+               res = read_all(fd, &resp, sizeof(resp), block ? 0 : MSG_DONTWAIT);
                if (res == 0)
                {       /* closed by server */
                        return 0;
@@ -97,7 +158,7 @@ static int receive(int fd, int block, int loop)
                        fprintf(stderr, "reading from socket failed: %s\n", strerror(errno));
                        return 1;
                }
-               switch (resp.type)
+               switch (ntohl(resp.type))
                {
                        case LOOKIP_ENTRY:
                                label = "lookup:";
@@ -120,7 +181,7 @@ static int receive(int fd, int block, int loop)
                resp.id[sizeof(resp.id) - 1] = '\0';
                resp.name[sizeof(resp.name) - 1] = '\0';
 
-               snprintf(name, sizeof(name), "%s[%u]", resp.name, resp.unique_id);
+               snprintf(name, sizeof(name), "%s[%u]", resp.name, ntohl(resp.unique_id));
                printf("%-12s %16s %16s %20s %s\n",
                           label, resp.vip, resp.ip, name, resp.id);
        }
index caf336a2e3eb51572cd3105376757f9a18efc9ee..d5eab1f6c71f89efdc74cbb3e9d6d24150af35e4 100644 (file)
@@ -290,6 +290,26 @@ METHOD(lookip_listener_t, add_listener, void,
        this->lock->unlock(this->lock);
 }
 
+METHOD(lookip_listener_t, remove_listener, void,
+       private_lookip_listener_t *this, void *user)
+{
+       listener_entry_t *listener;
+       enumerator_t *enumerator;
+
+       this->lock->write_lock(this->lock);
+       enumerator = this->listeners->create_enumerator(this->listeners);
+       while (enumerator->enumerate(enumerator, &listener))
+       {
+               if (listener->user == user)
+               {
+                       this->listeners->remove_at(this->listeners, enumerator);
+                       free(listener);
+               }
+       }
+       enumerator->destroy(enumerator);
+       this->lock->unlock(this->lock);
+}
+
 METHOD(lookip_listener_t, destroy, void,
        private_lookip_listener_t *this)
 {
@@ -315,6 +335,7 @@ lookip_listener_t *lookip_listener_create()
                        },
                        .lookup = _lookup,
                        .add_listener = _add_listener,
+                       .remove_listener = _remove_listener,
                        .destroy = _destroy,
                },
                .lock = rwlock_create(RWLOCK_TYPE_DEFAULT),
index 56f74ed480f0301c449d8de4fe9d1936264f962f..f6612b3241695eef973852b0858f293fd4cb75b5 100644 (file)
@@ -74,6 +74,13 @@ struct lookip_listener_t {
        void (*add_listener)(lookip_listener_t *this,
                                                 lookip_callback_t cb, void *user);
 
+       /**
+        * Unregister a listener by the user data.
+        *
+        * @param user          user data, as passed during add_listener()
+        */
+       void (*remove_listener)(lookip_listener_t *this, void *user);
+
        /**
         * Destroy a lookip_listener_t.
         */
index d5789c29fad88d9f2ab795bdb4eda549ec4af4ce..28c02d0de71f009c680900a6a0469ae0b8f57f0f 100644 (file)
@@ -69,7 +69,7 @@ struct lookip_request_t {
        int type;
        /** null terminated string representation of virtual IP */
        char vip[40];
-};
+} __attribute__((packed));
 
 /**
  * Response message sent to client.
@@ -91,6 +91,6 @@ struct lookip_response_t {
        char name[40];
        /** unique connection id */
        unsigned int unique_id;
-};
+} __attribute__((packed));
 
 #endif /** LOOKIP_MSG_H_ @}*/
index b1a46f46ac9110352c08c982db80de08e48bbc96..d25573bf4984964ca87bdfb765a3c038ff832b31 100644 (file)
@@ -48,17 +48,12 @@ struct private_lookip_socket_t {
        lookip_listener_t *listener;
 
        /**
-        * lookip unix socket file descriptor
+        * stream service accepting connections
         */
-       int socket;
+       stream_service_t *service;
 
        /**
-        * List of registered listeners, as entry_t
-        */
-       linked_list_t *registered;
-
-       /**
-        * List of connected clients, as uintptr_t FD
+        * List of connected clients, as entry_t
         */
        linked_list_t *connected;
 
@@ -69,88 +64,80 @@ struct private_lookip_socket_t {
 };
 
 /**
- * Open lookip unix socket
- */
-static bool open_socket(private_lookip_socket_t *this)
-{
-       struct sockaddr_un addr;
-       mode_t old;
-
-       addr.sun_family = AF_UNIX;
-       strcpy(addr.sun_path, LOOKIP_SOCKET);
-
-       this->socket = socket(AF_UNIX, SOCK_SEQPACKET, 0);
-       if (this->socket == -1)
-       {
-               DBG1(DBG_CFG, "creating lookip socket failed");
-               return FALSE;
-       }
-       unlink(addr.sun_path);
-       old = umask(~(S_IRWXU | S_IRWXG));
-       if (bind(this->socket, (struct sockaddr*)&addr, sizeof(addr)) < 0)
-       {
-               DBG1(DBG_CFG, "binding lookip socket failed: %s", strerror(errno));
-               close(this->socket);
-               return FALSE;
-       }
-       umask(old);
-       if (chown(addr.sun_path, lib->caps->get_uid(lib->caps),
-                         lib->caps->get_gid(lib->caps)) != 0)
-       {
-               DBG1(DBG_CFG, "changing lookip socket permissions failed: %s",
-                        strerror(errno));
-       }
-       if (listen(this->socket, 10) < 0)
-       {
-               DBG1(DBG_CFG, "listening on lookip socket failed: %s", strerror(errno));
-               close(this->socket);
-               unlink(addr.sun_path);
-               return FALSE;
-       }
-       return TRUE;
-}
-
-/**
- * Listener callback entry
+ * List entry for a connected stream
  */
 typedef struct {
-       /* FD to write to */
-       int fd;
-       /* message type to send */
-       int type;
-       /* back pointer to socket, only for subscriptions */
+       /* stream to write to */
+       stream_t *stream;
+       /* registered for up events? */
+       bool up;
+       /* registered for down events? */
+       bool down;
+       /** backref to this for unregistration */
        private_lookip_socket_t *this;
 } entry_t;
 
 /**
- * Destroy entry
+ * Clean up a connection entry
  */
-static void entry_destroy(entry_t *this)
+static void entry_destroy(entry_t *entry)
 {
-       close(this->fd);
-       free(this);
+       entry->stream->destroy(entry->stream);
+       free(entry);
+}
+
+/**
+ * Disconnect a stream, remove connection entry
+ */
+static void disconnect(private_lookip_socket_t *this, stream_t *stream)
+{
+       enumerator_t *enumerator;
+       entry_t *entry;
+
+       this->mutex->lock(this->mutex);
+       enumerator = this->connected->create_enumerator(this->connected);
+       while (enumerator->enumerate(enumerator, &entry))
+       {
+               if (entry->stream == stream)
+               {
+                       this->connected->remove_at(this->connected, enumerator);
+                       if (entry->up || entry->down)
+                       {
+                               this->listener->remove_listener(this->listener, entry);
+                       }
+                       entry_destroy(entry);
+                       break;
+               }
+       }
+       enumerator->destroy(enumerator);
+       this->mutex->unlock(this->mutex);
 }
 
 /**
- * Callback function for listener
+ * Callback function for listener up/down events
  */
-static bool listener_cb(entry_t *entry, bool up, host_t *vip,
-                                               host_t *other, identification_t *id,
-                                               char *name, u_int unique_id)
+static bool event_cb(entry_t *entry, bool up, host_t *vip, host_t *other,
+                                        identification_t *id, char *name, u_int unique_id)
 {
        lookip_response_t resp = {
-               .type = entry->type,
-               .unique_id = unique_id,
+               .unique_id = htonl(unique_id),
        };
 
-       /* filter events */
-       if (up && entry->type == LOOKIP_NOTIFY_DOWN)
+       if (up)
        {
-               return TRUE;
+               if (!entry->up)
+               {
+                       return TRUE;
+               }
+               resp.type = htonl(LOOKIP_NOTIFY_UP);
        }
-       if (!up && entry->type == LOOKIP_NOTIFY_UP)
+       else
        {
-               return TRUE;
+               if (!entry->down)
+               {
+                       return TRUE;
+               }
+               resp.type = htonl(LOOKIP_NOTIFY_DOWN);
        }
 
        snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
@@ -158,37 +145,66 @@ static bool listener_cb(entry_t *entry, bool up, host_t *vip,
        snprintf(resp.id, sizeof(resp.id), "%Y", id);
        snprintf(resp.name, sizeof(resp.name), "%s", name);
 
-       switch (send(entry->fd, &resp, sizeof(resp), 0))
+       if (entry->stream->write_all(entry->stream, &resp, sizeof(resp)))
        {
-               case sizeof(resp):
-                       return TRUE;
-               case 0:
+               return TRUE;
+       }
+       switch (errno)
+       {
+               case ECONNRESET:
+               case EPIPE:
                        /* client disconnected, adios */
                        break;
                default:
-                       DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
+                       DBG1(DBG_CFG, "sending lookip event failed: %s", strerror(errno));
                        break;
        }
-       if (entry->this)
-       {       /* unregister listener */
-               entry->this->mutex->lock(entry->this->mutex);
-               entry->this->registered->remove(entry->this->registered, entry, NULL);
-               entry->this->mutex->unlock(entry->this->mutex);
+       /* don't unregister, as we return FALSE */
+       entry->up = entry->down = FALSE;
+       disconnect(entry->this, entry->stream);
+       return FALSE;
+}
+
+/**
+ * Callback function for queries
+ */
+static bool query_cb(stream_t *stream, bool up, host_t *vip, host_t *other,
+                                        identification_t *id, char *name, u_int unique_id)
+{
+       lookip_response_t resp = {
+               .type = htonl(LOOKIP_ENTRY),
+               .unique_id = htonl(unique_id),
+       };
 
-               entry_destroy(entry);
+       snprintf(resp.vip, sizeof(resp.vip), "%H", vip);
+       snprintf(resp.ip, sizeof(resp.ip), "%H", other);
+       snprintf(resp.id, sizeof(resp.id), "%Y", id);
+       snprintf(resp.name, sizeof(resp.name), "%s", name);
+
+       if (stream->write_all(stream, &resp, sizeof(resp)))
+       {
+               return TRUE;
+       }
+       switch (errno)
+       {
+               case ECONNRESET:
+               case EPIPE:
+                       /* client disconnected, adios */
+                       break;
+               default:
+                       DBG1(DBG_CFG, "sending lookip response failed: %s", strerror(errno));
+                       break;
        }
        return FALSE;
 }
 
 /**
- * Perform a entry lookup
+ * Perform a lookup
  */
-static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
+static void query(private_lookip_socket_t *this, stream_t *stream,
+                                 lookip_request_t *req)
 {
-       entry_t entry = {
-               .fd = fd,
-               .type = LOOKIP_ENTRY,
-       };
+
        host_t *vip = NULL;
        int matches = 0;
 
@@ -199,17 +215,17 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
                if (vip)
                {
                        matches = this->listener->lookup(this->listener, vip,
-                                                                                        (void*)listener_cb, &entry);
+                                                                                        (void*)query_cb, stream);
                        vip->destroy(vip);
                }
                if (matches == 0)
                {
                        lookip_response_t resp = {
-                               .type = LOOKIP_NOT_FOUND,
+                               .type = htonl(LOOKIP_NOT_FOUND),
                        };
 
                        snprintf(resp.vip, sizeof(resp.vip), "%s", req->vip);
-                       if (send(fd, &resp, sizeof(resp), 0) < 0)
+                       if (!stream->write_all(stream, &resp, sizeof(resp)))
                        {
                                DBG1(DBG_CFG, "sending lookip not-found failed: %s",
                                         strerror(errno));
@@ -219,214 +235,143 @@ static void query(private_lookip_socket_t *this, int fd, lookip_request_t *req)
        else
        {       /* dump */
                this->listener->lookup(this->listener, NULL,
-                                                          (void*)listener_cb, &entry);
+                                                          (void*)query_cb, stream);
        }
 }
 
 /**
  * Subscribe to virtual IP events
  */
-static void subscribe(private_lookip_socket_t *this, int fd, int type)
-{
-       entry_t *entry;
-
-       INIT(entry,
-               .fd = fd,
-               .type = type,
-               .this = this,
-       );
-
-       this->mutex->lock(this->mutex);
-       this->registered->insert_last(this->registered, entry);
-       this->mutex->unlock(this->mutex);
-
-       this->listener->add_listener(this->listener, (void*)listener_cb, entry);
-}
-
-/**
- * Check if a client is subscribed for notifications
- */
-static bool subscribed(private_lookip_socket_t *this, int fd)
+static void subscribe(private_lookip_socket_t *this, stream_t *stream, bool up)
 {
        enumerator_t *enumerator;
-       bool subscribed = FALSE;
        entry_t *entry;
 
        this->mutex->lock(this->mutex);
-       enumerator = this->registered->create_enumerator(this->registered);
+       enumerator = this->connected->create_enumerator(this->connected);
        while (enumerator->enumerate(enumerator, &entry))
        {
-               if (entry->fd == fd)
+               if (entry->stream == stream)
                {
-                       subscribed = TRUE;
-                       break;
+                       if (!entry->up && !entry->down)
+                       {       /* newly registered */
+                               this->listener->add_listener(this->listener,
+                                                                                        (void*)event_cb, entry);
+                       }
+                       if (up)
+                       {
+                               entry->up = TRUE;
+                       }
+                       else
+                       {
+                               entry->down = TRUE;
+                       }
                }
        }
        enumerator->destroy(enumerator);
        this->mutex->unlock(this->mutex);
-
-       return subscribed;
 }
 
 /**
- * Create a fd_set from all bound sockets
- */
-static int build_fds(private_lookip_socket_t *this, fd_set *fds)
-{
-       enumerator_t *enumerator;
-       uintptr_t fd;
-       int maxfd;
-
-       FD_ZERO(fds);
-       FD_SET(this->socket, fds);
-       maxfd = this->socket;
-
-       this->mutex->lock(this->mutex);
-       enumerator = this->connected->create_enumerator(this->connected);
-       while (enumerator->enumerate(enumerator, &fd))
-       {
-               FD_SET(fd, fds);
-               maxfd = max(maxfd, fd);
-       }
-       enumerator->destroy(enumerator);
-       this->mutex->unlock(this->mutex);
-
-       return maxfd + 1;
-}
-
-/**
- * Find the socket select()ed
+ * Check if a client is subscribed for notifications
  */
-static int scan_fds(private_lookip_socket_t *this, fd_set *fds)
+static bool subscribed(private_lookip_socket_t *this, stream_t *stream)
 {
        enumerator_t *enumerator;
-       uintptr_t fd;
-       int selected = -1;
+       bool subscribed = FALSE;
+       entry_t *entry;
 
        this->mutex->lock(this->mutex);
        enumerator = this->connected->create_enumerator(this->connected);
-       while (enumerator->enumerate(enumerator, &fd))
+       while (enumerator->enumerate(enumerator, &entry))
        {
-               if (FD_ISSET(fd, fds))
+               if (entry->stream == stream)
                {
-                       selected = fd;
+                       subscribed = entry->up || entry->down;
                        break;
                }
        }
        enumerator->destroy(enumerator);
        this->mutex->unlock(this->mutex);
 
-       return selected;
+       return subscribed;
 }
 
 /**
- * Dispatch from a socket, return TRUE to end communication
+ * Dispatch from a socket, on-read callback
  */
-static bool dispatch(private_lookip_socket_t *this, int fd)
+static bool on_read(private_lookip_socket_t *this, stream_t *stream)
 {
        lookip_request_t req;
-       int len;
 
-       len = recv(fd, &req, sizeof(req), 0);
-       if (len != sizeof(req))
+       if (stream->read_all(stream, &req, sizeof(req)))
        {
-               if (len != 0)
+               switch (ntohl(req.type))
+               {
+                       case LOOKIP_LOOKUP:
+                               query(this, stream, &req);
+                               return TRUE;
+                       case LOOKIP_DUMP:
+                               query(this, stream, NULL);
+                               return TRUE;
+                       case LOOKIP_REGISTER_UP:
+                               subscribe(this, stream, TRUE);
+                               return TRUE;
+                       case LOOKIP_REGISTER_DOWN:
+                               subscribe(this, stream, FALSE);
+                               return TRUE;
+                       case LOOKIP_END:
+                               break;
+                       default:
+                               DBG1(DBG_CFG, "received unknown lookip command");
+                               break;
+               }
+       }
+       else
+       {
+               if (errno != ECONNRESET)
                {
                        DBG1(DBG_CFG, "receiving lookip request failed: %s",
                                 strerror(errno));
                }
-               return TRUE;
+               disconnect(this, stream);
+               return FALSE;
        }
-       switch (req.type)
+       if (subscribed(this, stream))
        {
-               case LOOKIP_LOOKUP:
-                       query(this, fd, &req);
-                       return FALSE;
-               case LOOKIP_DUMP:
-                       query(this, fd, NULL);
-                       return FALSE;
-               case LOOKIP_REGISTER_UP:
-                       subscribe(this, fd, LOOKIP_NOTIFY_UP);
-                       return FALSE;
-               case LOOKIP_REGISTER_DOWN:
-                       subscribe(this, fd, LOOKIP_NOTIFY_DOWN);
-                       return FALSE;
-               case LOOKIP_END:
-                       return TRUE;
-               default:
-                       DBG1(DBG_CFG, "received unknown lookip command");
-                       return TRUE;
+               return TRUE;
        }
+       disconnect(this, stream);
+       return FALSE;
 }
 
 /**
  * Accept client connections, dispatch
  */
-static job_requeue_t receive(private_lookip_socket_t *this)
+static bool on_accept(private_lookip_socket_t *this, stream_t *stream)
 {
-       struct sockaddr_un addr;
-       int fd, maxfd, len;
-       bool oldstate;
-       fd_set fds;
+       entry_t *entry;
 
-       while (TRUE)
-       {
-               maxfd = build_fds(this, &fds);
-               oldstate = thread_cancelability(TRUE);
-               if (select(maxfd, &fds, NULL, NULL, NULL) <= 0)
-               {
-                       thread_cancelability(oldstate);
-                       DBG1(DBG_CFG, "selecting lookip sockets failed: %s",
-                                strerror(errno));
-                       break;
-               }
-               thread_cancelability(oldstate);
+       INIT(entry,
+               .stream = stream,
+               .this = this,
+       );
 
-               if (FD_ISSET(this->socket, &fds))
-               {       /* new connection, accept() */
-                       len = sizeof(addr);
-                       fd = accept(this->socket, (struct sockaddr*)&addr, &len);
-                       if (fd != -1)
-                       {
-                               this->mutex->lock(this->mutex);
-                               this->connected->insert_last(this->connected,
-                                                                                        (void*)(uintptr_t)fd);
-                               this->mutex->unlock(this->mutex);
-                       }
-                       else
-                       {
-                               DBG1(DBG_CFG, "accepting lookip connection failed: %s",
-                                        strerror(errno));
-                       }
-                       continue;
-               }
+       this->mutex->lock(this->mutex);
+       this->connected->insert_last(this->connected, entry);
+       this->mutex->unlock(this->mutex);
 
-               fd = scan_fds(this, &fds);
-               if (fd == -1)
-               {
-                       continue;
-               }
-               if (dispatch(this, fd))
-               {
-                       this->mutex->lock(this->mutex);
-                       this->connected->remove(this->connected, (void*)(uintptr_t)fd, NULL);
-                       this->mutex->unlock(this->mutex);
-                       if (!subscribed(this, fd))
-                       {
-                               close(fd);
-                       }
-               }
-       }
-       return JOB_REQUEUE_FAIR;
+       stream->on_read(stream, (void*)on_read, this);
+
+       return TRUE;
 }
 
 METHOD(lookip_socket_t, destroy, void,
        private_lookip_socket_t *this)
 {
-       this->registered->destroy_function(this->registered, (void*)entry_destroy);
-       this->connected->destroy(this->connected);
+       DESTROY_IF(this->service);
+       this->connected->destroy_function(this->connected, (void*)entry_destroy);
        this->mutex->destroy(this->mutex);
-       close(this->socket);
        free(this);
 }
 
@@ -436,26 +381,30 @@ METHOD(lookip_socket_t, destroy, void,
 lookip_socket_t *lookip_socket_create(lookip_listener_t *listener)
 {
        private_lookip_socket_t *this;
+       char *uri;
 
        INIT(this,
                .public = {
                        .destroy = _destroy,
                },
                .listener = listener,
-               .registered = linked_list_create(),
                .connected = linked_list_create(),
                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
        );
 
-       if (!open_socket(this))
+       uri = lib->settings->get_str(lib->settings,
+                               "%s.plugins.lookip.socket", "unix://" LOOKIP_SOCKET,
+                               charon->name);
+       this->service = lib->streams->create_service(lib->streams, uri, 10);
+       if (!this->service)
        {
-               free(this);
+               DBG1(DBG_CFG, "creating lookip socket failed");
+               destroy(this);
                return NULL;
        }
 
-       lib->processor->queue_job(lib->processor,
-               (job_t*)callback_job_create_with_prio((callback_job_cb_t)receive, this,
-                               NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
+       this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
+                                                        this, JOB_PRIO_CRITICAL, 1);
 
        return &this->public;
 }