]> git.ipfire.org Git - pakfire.git/commitdiff
httpclient: Implement basic WebSocket communication
authorMichael Tremer <michael.tremer@ipfire.org>
Tue, 13 Aug 2024 16:54:58 +0000 (16:54 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Tue, 13 Aug 2024 16:54:58 +0000 (16:54 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/libpakfire/buildservice.c
src/libpakfire/daemon.c
src/libpakfire/httpclient.c
src/libpakfire/include/pakfire/httpclient.h
src/libpakfire/include/pakfire/xfer.h
src/libpakfire/xfer.c

index 9d449a90f0e22b75f7a847fb17f769117dd84164..fe32f2ae62d0db42ca834d887536604b23742ef7 100644 (file)
@@ -1016,9 +1016,46 @@ ERROR:
        return r;
 }
 
+static int pakfire_buildservice_daemon_recv(struct pakfire_xfer* xfer,
+               const char* message, const size_t size, void* data) {
+       printf("%.*s\n", (int)size, message);
+
+       return 0;
+}
+
 /*
        Called before the daemon's
 */
 int pakfire_buildservice_daemon(struct pakfire_buildservice* service) {
-       return 0;
+       struct pakfire_xfer* xfer = NULL;
+       char url[PATH_MAX];
+       int r;
+
+       // Compose the URL
+       r = pakfire_string_set(url, "/api/v1/builders/control");
+       if (r)
+               goto ERROR;
+
+       // Create a new xfer
+       r = pakfire_buildservice_create_xfer(&xfer, service, url);
+       if (r)
+               goto ERROR;
+
+       // Enable authentication
+       r = pakfire_xfer_auth(xfer);
+       if (r)
+               goto ERROR;
+
+       // Make this a WebSocket connection
+       r = pakfire_xfer_socket(xfer, pakfire_buildservice_daemon_recv, NULL, NULL, service);
+       if (r)
+               goto ERROR;
+
+       // Enqueue the transfer
+       r = pakfire_httpclient_enqueue_xfer(service->httpclient, xfer);
+       if (r)
+               goto ERROR;
+
+ERROR:
+       return r;
 }
index 5b3c796842f92cef67e5a6d0ef9cead8d2a9f346..0b92b4b4086cfe4cc6c29b21477e6e701693d312 100644 (file)
@@ -172,5 +172,12 @@ ERROR:
 }
 
 int pakfire_daemon_main(struct pakfire_daemon* daemon) {
+       int r;
+
+       // Connect to the build service
+       r = pakfire_buildservice_daemon(daemon->service);
+       if (r)
+               return r;
+
        return pakfire_daemon_loop(daemon);
 }
index ebf7e974bcab6be3d23d964cdb5dc47d4e25437e..30fb64d2b1a8f027a5a3c20a331d214b1a46d037 100644 (file)
@@ -81,15 +81,6 @@ static int pakfire_httpclient_check(struct pakfire_httpclient* client) {
                                // Update reference to transfer
                                curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer);
 
-                               // Remove the handle
-                               curl_multi_remove_handle(client->curl, pakfire_xfer_handle(xfer));
-
-                               // Decrement the xfers counter
-                               client->total_xfers--;
-
-                               // Reduce the total download size
-                               client->total_downloadsize -= pakfire_xfer_get_size(xfer);
-
                                // Call the done callback
                                r = pakfire_xfer_done(xfer, msg->data.result);
                                switch (-r) {
@@ -480,6 +471,10 @@ struct pakfire_httpclient* pakfire_httpclient_unref(struct pakfire_httpclient* c
        return NULL;
 }
 
+sd_event* pakfire_httpclient_loop(struct pakfire_httpclient* client) {
+       return sd_event_ref(client->loop);
+}
+
 CURLSH* pakfire_httpclient_share(struct pakfire_httpclient* client) {
        return client->share;
 }
@@ -515,6 +510,27 @@ int pakfire_httpclient_enqueue_xfer(struct pakfire_httpclient* client,
        return 0;
 }
 
+int pakfire_httpclient_remove_xfer(struct pakfire_httpclient* client,
+               struct pakfire_xfer* xfer) {
+       int r;
+
+       // Decrement the xfers counter
+       client->total_xfers--;
+
+       // Reduce the total download size
+       client->total_downloadsize -= pakfire_xfer_get_size(xfer);
+
+       // Remove the handle
+       r = curl_multi_remove_handle(client->curl, pakfire_xfer_handle(xfer));
+       if (r) {
+               CTX_ERROR(client->ctx, "Could not remove the handle: %s\n", curl_multi_strerror(r));
+
+               return r;
+       }
+
+       return 0;
+}
+
 int pakfire_httpclient_run(struct pakfire_httpclient* client, const char* title) {
        int r = 0;
 
index 4c08a6c1dc2ca3409554c5b595cd087c68fee778..304cfe41b89da9ba7cfa910b8f03c44b449768c9 100644 (file)
@@ -38,6 +38,7 @@ int pakfire_httpclient_create(struct pakfire_httpclient** downloader,
 struct pakfire_httpclient* pakfire_httpclient_ref(struct pakfire_httpclient* downloader);
 struct pakfire_httpclient* pakfire_httpclient_unref(struct pakfire_httpclient* downloader);
 
+sd_event* pakfire_httpclient_loop(struct pakfire_httpclient* client);
 CURLSH* pakfire_httpclient_share(struct pakfire_httpclient* downloader);
 
 int pakfire_httpclient_create_xfer(struct pakfire_xfer** xfer,
@@ -45,6 +46,8 @@ int pakfire_httpclient_create_xfer(struct pakfire_xfer** xfer,
 
 int pakfire_httpclient_enqueue_xfer(
        struct pakfire_httpclient* downloader, struct pakfire_xfer* xfer);
+int pakfire_httpclient_remove_xfer(struct pakfire_httpclient* client,
+       struct pakfire_xfer* xfer);
 
 int pakfire_httpclient_run(struct pakfire_httpclient* downloader, const char* title);
 
index da32b2f65fdfffcc55cb980f8ef32293d6f6e075..7a7a1a1ccff0e04fd5da1cf1022f8f77dd9b063e 100644 (file)
@@ -127,5 +127,13 @@ pakfire_xfer_error_code_t pakfire_xfer_run(struct pakfire_xfer* xfer, int flags)
 pakfire_xfer_error_code_t pakfire_xfer_run_api_request(
        struct pakfire_xfer* xfer, struct json_object** response);
 
+// WebSocket
+typedef int (*pakfire_xfer_recv_callback)(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data);
+typedef int (*pakfire_xfer_send_callback)(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data);
+typedef int (*pakfire_xfer_close_callback)(struct pakfire_xfer* xfer, void* data);
+
+int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_recv_callback recv,
+       pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data);
+
 #endif /* PAKFIRE_PRIVATE */
 #endif /* PAKFIRE_XFER_H */
index 40ca7dac0a3e9e253e0a136c6b2b51ca920dc103..f11991764b8b12d5862d10f29437ae11359bbad0 100644 (file)
@@ -30,6 +30,8 @@
 #include <openssl/err.h>
 #include <openssl/evp.h>
 
+#include <systemd/sd-event.h>
+
 #include <pakfire/ctx.h>
 #include <pakfire/mirrorlist.h>
 #include <pakfire/path.h>
@@ -70,6 +72,7 @@ struct pakfire_xfer {
        enum {
                PAKFIRE_XFER_DOWNLOAD = 0,
                PAKFIRE_XFER_UPLOAD   = 1,
+               PAKFIRE_XFER_SOCKET   = 2,
        } direction;
 
        // Size
@@ -101,6 +104,23 @@ struct pakfire_xfer {
 
        // Space for cURL error message
        char error[CURL_ERROR_SIZE];
+
+       // Event Loop
+       sd_event_source* event;
+
+       // Callbacks
+       struct pakfire_xfer_callbacks {
+               pakfire_xfer_recv_callback recv;
+               pakfire_xfer_send_callback send;
+               pakfire_xfer_close_callback close;
+               void* data;
+       } callbacks;
+
+       // WebSocket Receive Buffer
+       struct pakfire_xfer_buffer {
+               char* data;
+               size_t size;
+       } buffer;
 };
 
 static void pakfire_xfer_free(struct pakfire_xfer* xfer) {
@@ -116,6 +136,10 @@ static void pakfire_xfer_free(struct pakfire_xfer* xfer) {
        if (xfer->evp)
                EVP_MD_CTX_free(xfer->evp);
 
+       // systemd
+       if (xfer->event)
+               sd_event_source_unref(xfer->event);
+
        // cURL stuff
        if (xfer->handle)
                curl_easy_cleanup(xfer->handle);
@@ -255,8 +279,8 @@ static int pakfire_xfer_setup(struct pakfire_xfer* xfer) {
        if (r)
                goto ERROR;
 
-       // Limit protocols to HTTPS, HTTP, FTP and FILE
-       curl_easy_setopt(xfer->handle, CURLOPT_PROTOCOLS_STR, "HTTPS,HTTP,FTP,FILE");
+       // Limit protocols to HTTPS, HTTP, FTP, FILE and WebSocket over TLS
+       curl_easy_setopt(xfer->handle, CURLOPT_PROTOCOLS_STR, "HTTPS,HTTP,FTP,FILE,WSS");
 
        // Raise any HTTP errors
        r = curl_easy_setopt(xfer->handle, CURLOPT_FAILONERROR, 1L);
@@ -389,7 +413,6 @@ int pakfire_xfer_set_method(struct pakfire_xfer* xfer,
        return curl_easy_setopt(xfer->handle, CURLOPT_CUSTOMREQUEST, m);
 }
 
-
 const char* pakfire_xfer_get_title(struct pakfire_xfer* xfer) {
        char title[PATH_MAX];
        int r;
@@ -738,6 +761,143 @@ static const char* curl_http_version(long v) {
        return "unknown";
 }
 
+static int pakfire_xfer_allocate(struct pakfire_xfer* xfer, size_t size) {
+
+       // Otherwise, we resize the buffer
+       xfer->buffer.data = realloc(xfer->buffer.data, size);
+       if (!xfer->buffer.data) {
+               CTX_ERROR(xfer->ctx, "Could not allocate memory: %m\n");
+
+               return -errno;
+       }
+
+       // Update the size
+       xfer->buffer.size = size;
+
+       return 0;
+}
+
+static int pakfire_xfer_socket_send(struct pakfire_xfer* xfer) {
+       return -1; // TODO
+}
+
+static int pakfire_xfer_socket_recv(struct pakfire_xfer* xfer) {
+       const struct curl_ws_frame* meta = NULL;
+       char* buffer[4096];
+       int r;
+
+       size_t bytes_received = 0;
+
+       // Read as many bytes as possible
+       r = curl_ws_recv(xfer->handle, buffer, sizeof(buffer), &bytes_received, &meta);
+       if (r) {
+               CTX_ERROR(xfer->ctx, "Could not read from WebSocket: %s\n", curl_easy_strerror(r));
+
+               return r;
+       }
+
+       CTX_DEBUG(xfer->ctx, "Read %zu byte(s) from WebSocket\n", bytes_received);
+
+       // If we have not received anything, we will wait for being called again
+       if (!bytes_received)
+               return 0;
+
+       // Allocate some buffer space
+       r = pakfire_xfer_allocate(xfer, meta->offset + bytes_received);
+       if (r)
+               return r;
+
+       // Copy the received message
+       memcpy(xfer->buffer.data + meta->offset, buffer, bytes_received);
+
+       // Call again if this was not the entire message
+       if (meta->flags & CURLWS_CONT)
+               return pakfire_xfer_socket_recv(xfer);
+
+       CTX_DEBUG(xfer->ctx, "We have received a message of %zu byte(s)\n", xfer->buffer.size);
+
+       if (meta->flags & CURLWS_TEXT) {
+               CTX_DEBUG(xfer->ctx, "The message is a text\n");
+       } else if (meta->flags & CURLWS_BINARY) {
+               CTX_DEBUG(xfer->ctx, "The message is binary\n");
+       }
+
+       // Once we have received the entire message we call the callback
+       if (xfer->callbacks.recv) {
+               r = xfer->callbacks.recv(xfer, xfer->buffer.data, xfer->buffer.size, xfer->callbacks.data);
+               if (r)
+                       return r;
+       }
+
+       return 0;
+}
+
+static int __pakfire_xfer_socket(sd_event_source* s, int fd, uint32_t events, void* data) {
+       struct pakfire_xfer* xfer = data;
+       int r;
+
+       // Is there any data to read?
+       if (events & EPOLLIN) {
+               r = pakfire_xfer_socket_recv(xfer);
+               if (r)
+                       return r;
+       }
+
+       // Is there any data to send?
+       if (events & EPOLLOUT) {
+               r = pakfire_xfer_socket_send(xfer);
+               if (r)
+                       return r;
+       }
+
+       return 0;
+}
+
+/*
+       This is being called once a WebSocket connection has connected and is ready to
+       send or receive data.
+*/
+static pakfire_xfer_error_code_t pakfire_xfer_done_socket(struct pakfire_xfer* xfer, int code) {
+       curl_socket_t socket = -1;
+       int events = 0;
+       int r;
+
+       // Fetch the event loop
+       sd_event* loop = pakfire_httpclient_loop(xfer->client);
+
+       // Fetch the socket
+       r = curl_easy_getinfo(xfer->handle, CURLINFO_ACTIVESOCKET, &socket);
+       if (r) {
+               CTX_ERROR(xfer->ctx, "Could not fetch the socket: %s\n", curl_easy_strerror(r));
+               goto ERROR;
+       }
+
+       CTX_DEBUG(xfer->ctx, "Connection is using socket %d\n", socket);
+
+       // Check what callbacks we have
+       if (xfer->callbacks.recv)
+               events |= EPOLLIN;
+
+       if (xfer->callbacks.send)
+               events |= EPOLLOUT;
+
+       // Register a callback with the event loop
+       r = sd_event_add_io(loop, &xfer->event, socket, events, __pakfire_xfer_socket, xfer);
+       if (r < 0) {
+               CTX_ERROR(xfer->ctx, "Could not register socket %d: %s\n", socket, strerror(-r));
+
+               goto ERROR;
+       }
+
+       CTX_DEBUG(xfer->ctx, "WebSocket registered with event loop\n");
+
+ERROR:
+       if (loop)
+               sd_event_unref(loop);
+
+       return r;
+}
+
 static int pakfire_xfer_save(struct pakfire_xfer* xfer) {
        int r;
 
@@ -1019,6 +1179,22 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code)
                }
        }
 
+       // Handle WebSockets separately
+       switch (xfer->direction) {
+               case PAKFIRE_XFER_SOCKET:
+                       return pakfire_xfer_done_socket(xfer, code);
+
+               default:
+                       break;
+       }
+
+       // Remove the handle
+       if (xfer->client) {
+               r = pakfire_httpclient_remove_xfer(xfer->client, xfer);
+               if (r)
+                       return r;
+       }
+
        // Handle any errors
        if (code) {
                r = pakfire_xfer_fail(xfer);
@@ -1053,6 +1229,9 @@ static int pakfire_xfer_update(void* data,
                        // Update the xferred counter
                        xfer->xferred = ulnow;
                        break;
+
+               case PAKFIRE_XFER_SOCKET:
+                       break;
        }
 
        // Do nothing if no progress indicator has been set up
@@ -1160,6 +1339,15 @@ static int pakfire_xfer_prepare_url(struct pakfire_xfer* xfer) {
                goto ERROR;
        }
 
+       // Replace the schema if we are using SOCKET
+       if (xfer->direction == PAKFIRE_XFER_SOCKET) {
+               r = curl_url_set(xfer->fullurl, CURLUPART_SCHEME, "wss", 0);
+               if (r) {
+                       CTX_ERROR(xfer->ctx, "Could not change to WebSocket: %s\n", curl_url_strerror(r));
+                       goto ERROR;
+               }
+       }
+
        // Set the URL
        r = curl_easy_setopt(xfer->handle, CURLOPT_CURLU, xfer->fullurl);
        if (r) {
@@ -1203,6 +1391,16 @@ int pakfire_xfer_prepare(struct pakfire_xfer* xfer, struct pakfire_progress* pro
                        // Upload files chunked
                        xfer->headers = curl_slist_append(xfer->headers, "Transfer-Encoding: chunked");
                        break;
+
+               case PAKFIRE_XFER_SOCKET:
+                       // Ask cURL to connect and let us handle the rest
+                       r = curl_easy_setopt(xfer->handle, CURLOPT_CONNECT_ONLY, 2L);
+                       if (r) {
+                               CTX_ERROR(xfer->ctx, "Could not enable CONNECT_ONLY\n");
+                               return r;
+                       }
+
+                       break;
        }
 
        // Compose the URL
@@ -1278,6 +1476,19 @@ int pakfire_xfer_prepare(struct pakfire_xfer* xfer, struct pakfire_progress* pro
        return 0;
 }
 
+int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_recv_callback recv,
+               pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data) {
+       xfer->direction = PAKFIRE_XFER_SOCKET;
+
+       // Store the callbacks
+       xfer->callbacks.recv  = recv;
+       xfer->callbacks.send  = send;
+       xfer->callbacks.close = close;
+       xfer->callbacks.data  = data;
+
+       return 0;
+}
+
 pakfire_xfer_error_code_t pakfire_xfer_run(struct pakfire_xfer* xfer, int flags) {
        int r;