From: Michael Tremer Date: Tue, 13 Aug 2024 16:54:58 +0000 (+0000) Subject: httpclient: Implement basic WebSocket communication X-Git-Tag: 0.9.30~1212 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8ddda29b865686ab9219cdb1490267d1c7ffafca;p=pakfire.git httpclient: Implement basic WebSocket communication Signed-off-by: Michael Tremer --- diff --git a/src/libpakfire/buildservice.c b/src/libpakfire/buildservice.c index 9d449a90f..fe32f2ae6 100644 --- a/src/libpakfire/buildservice.c +++ b/src/libpakfire/buildservice.c @@ -1016,9 +1016,46 @@ ERROR: return r; } +static int pakfire_buildservice_daemon_recv(struct pakfire_xfer* xfer, + const char* message, const size_t size, void* data) { + printf("%.*s\n", (int)size, message); + + return 0; +} + /* Called before the daemon's */ int pakfire_buildservice_daemon(struct pakfire_buildservice* service) { - return 0; + struct pakfire_xfer* xfer = NULL; + char url[PATH_MAX]; + int r; + + // Compose the URL + r = pakfire_string_set(url, "/api/v1/builders/control"); + if (r) + goto ERROR; + + // Create a new xfer + r = pakfire_buildservice_create_xfer(&xfer, service, url); + if (r) + goto ERROR; + + // Enable authentication + r = pakfire_xfer_auth(xfer); + if (r) + goto ERROR; + + // Make this a WebSocket connection + r = pakfire_xfer_socket(xfer, pakfire_buildservice_daemon_recv, NULL, NULL, service); + if (r) + goto ERROR; + + // Enqueue the transfer + r = pakfire_httpclient_enqueue_xfer(service->httpclient, xfer); + if (r) + goto ERROR; + +ERROR: + return r; } diff --git a/src/libpakfire/daemon.c b/src/libpakfire/daemon.c index 5b3c79684..0b92b4b40 100644 --- a/src/libpakfire/daemon.c +++ b/src/libpakfire/daemon.c @@ -172,5 +172,12 @@ ERROR: } int pakfire_daemon_main(struct pakfire_daemon* daemon) { + int r; + + // Connect to the build service + r = pakfire_buildservice_daemon(daemon->service); + if (r) + return r; + return pakfire_daemon_loop(daemon); } diff --git a/src/libpakfire/httpclient.c b/src/libpakfire/httpclient.c index ebf7e974b..30fb64d2b 100644 --- a/src/libpakfire/httpclient.c +++ b/src/libpakfire/httpclient.c @@ -81,15 +81,6 @@ static int pakfire_httpclient_check(struct pakfire_httpclient* client) { // Update reference to transfer curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer); - // Remove the handle - curl_multi_remove_handle(client->curl, pakfire_xfer_handle(xfer)); - - // Decrement the xfers counter - client->total_xfers--; - - // Reduce the total download size - client->total_downloadsize -= pakfire_xfer_get_size(xfer); - // Call the done callback r = pakfire_xfer_done(xfer, msg->data.result); switch (-r) { @@ -480,6 +471,10 @@ struct pakfire_httpclient* pakfire_httpclient_unref(struct pakfire_httpclient* c return NULL; } +sd_event* pakfire_httpclient_loop(struct pakfire_httpclient* client) { + return sd_event_ref(client->loop); +} + CURLSH* pakfire_httpclient_share(struct pakfire_httpclient* client) { return client->share; } @@ -515,6 +510,27 @@ int pakfire_httpclient_enqueue_xfer(struct pakfire_httpclient* client, return 0; } +int pakfire_httpclient_remove_xfer(struct pakfire_httpclient* client, + struct pakfire_xfer* xfer) { + int r; + + // Decrement the xfers counter + client->total_xfers--; + + // Reduce the total download size + client->total_downloadsize -= pakfire_xfer_get_size(xfer); + + // Remove the handle + r = curl_multi_remove_handle(client->curl, pakfire_xfer_handle(xfer)); + if (r) { + CTX_ERROR(client->ctx, "Could not remove the handle: %s\n", curl_multi_strerror(r)); + + return r; + } + + return 0; +} + int pakfire_httpclient_run(struct pakfire_httpclient* client, const char* title) { int r = 0; diff --git a/src/libpakfire/include/pakfire/httpclient.h b/src/libpakfire/include/pakfire/httpclient.h index 4c08a6c1d..304cfe41b 100644 --- a/src/libpakfire/include/pakfire/httpclient.h +++ b/src/libpakfire/include/pakfire/httpclient.h @@ -38,6 +38,7 @@ int pakfire_httpclient_create(struct pakfire_httpclient** downloader, struct pakfire_httpclient* pakfire_httpclient_ref(struct pakfire_httpclient* downloader); struct pakfire_httpclient* pakfire_httpclient_unref(struct pakfire_httpclient* downloader); +sd_event* pakfire_httpclient_loop(struct pakfire_httpclient* client); CURLSH* pakfire_httpclient_share(struct pakfire_httpclient* downloader); int pakfire_httpclient_create_xfer(struct pakfire_xfer** xfer, @@ -45,6 +46,8 @@ int pakfire_httpclient_create_xfer(struct pakfire_xfer** xfer, int pakfire_httpclient_enqueue_xfer( struct pakfire_httpclient* downloader, struct pakfire_xfer* xfer); +int pakfire_httpclient_remove_xfer(struct pakfire_httpclient* client, + struct pakfire_xfer* xfer); int pakfire_httpclient_run(struct pakfire_httpclient* downloader, const char* title); diff --git a/src/libpakfire/include/pakfire/xfer.h b/src/libpakfire/include/pakfire/xfer.h index da32b2f65..7a7a1a1cc 100644 --- a/src/libpakfire/include/pakfire/xfer.h +++ b/src/libpakfire/include/pakfire/xfer.h @@ -127,5 +127,13 @@ pakfire_xfer_error_code_t pakfire_xfer_run(struct pakfire_xfer* xfer, int flags) pakfire_xfer_error_code_t pakfire_xfer_run_api_request( struct pakfire_xfer* xfer, struct json_object** response); +// WebSocket +typedef int (*pakfire_xfer_recv_callback)(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data); +typedef int (*pakfire_xfer_send_callback)(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data); +typedef int (*pakfire_xfer_close_callback)(struct pakfire_xfer* xfer, void* data); + +int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_recv_callback recv, + pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data); + #endif /* PAKFIRE_PRIVATE */ #endif /* PAKFIRE_XFER_H */ diff --git a/src/libpakfire/xfer.c b/src/libpakfire/xfer.c index 40ca7dac0..f11991764 100644 --- a/src/libpakfire/xfer.c +++ b/src/libpakfire/xfer.c @@ -30,6 +30,8 @@ #include #include +#include + #include #include #include @@ -70,6 +72,7 @@ struct pakfire_xfer { enum { PAKFIRE_XFER_DOWNLOAD = 0, PAKFIRE_XFER_UPLOAD = 1, + PAKFIRE_XFER_SOCKET = 2, } direction; // Size @@ -101,6 +104,23 @@ struct pakfire_xfer { // Space for cURL error message char error[CURL_ERROR_SIZE]; + + // Event Loop + sd_event_source* event; + + // Callbacks + struct pakfire_xfer_callbacks { + pakfire_xfer_recv_callback recv; + pakfire_xfer_send_callback send; + pakfire_xfer_close_callback close; + void* data; + } callbacks; + + // WebSocket Receive Buffer + struct pakfire_xfer_buffer { + char* data; + size_t size; + } buffer; }; static void pakfire_xfer_free(struct pakfire_xfer* xfer) { @@ -116,6 +136,10 @@ static void pakfire_xfer_free(struct pakfire_xfer* xfer) { if (xfer->evp) EVP_MD_CTX_free(xfer->evp); + // systemd + if (xfer->event) + sd_event_source_unref(xfer->event); + // cURL stuff if (xfer->handle) curl_easy_cleanup(xfer->handle); @@ -255,8 +279,8 @@ static int pakfire_xfer_setup(struct pakfire_xfer* xfer) { if (r) goto ERROR; - // Limit protocols to HTTPS, HTTP, FTP and FILE - curl_easy_setopt(xfer->handle, CURLOPT_PROTOCOLS_STR, "HTTPS,HTTP,FTP,FILE"); + // Limit protocols to HTTPS, HTTP, FTP, FILE and WebSocket over TLS + curl_easy_setopt(xfer->handle, CURLOPT_PROTOCOLS_STR, "HTTPS,HTTP,FTP,FILE,WSS"); // Raise any HTTP errors r = curl_easy_setopt(xfer->handle, CURLOPT_FAILONERROR, 1L); @@ -389,7 +413,6 @@ int pakfire_xfer_set_method(struct pakfire_xfer* xfer, return curl_easy_setopt(xfer->handle, CURLOPT_CUSTOMREQUEST, m); } - const char* pakfire_xfer_get_title(struct pakfire_xfer* xfer) { char title[PATH_MAX]; int r; @@ -738,6 +761,143 @@ static const char* curl_http_version(long v) { return "unknown"; } +static int pakfire_xfer_allocate(struct pakfire_xfer* xfer, size_t size) { + + // Otherwise, we resize the buffer + xfer->buffer.data = realloc(xfer->buffer.data, size); + if (!xfer->buffer.data) { + CTX_ERROR(xfer->ctx, "Could not allocate memory: %m\n"); + + return -errno; + } + + // Update the size + xfer->buffer.size = size; + + return 0; +} + +static int pakfire_xfer_socket_send(struct pakfire_xfer* xfer) { + return -1; // TODO +} + +static int pakfire_xfer_socket_recv(struct pakfire_xfer* xfer) { + const struct curl_ws_frame* meta = NULL; + char* buffer[4096]; + int r; + + size_t bytes_received = 0; + + // Read as many bytes as possible + r = curl_ws_recv(xfer->handle, buffer, sizeof(buffer), &bytes_received, &meta); + if (r) { + CTX_ERROR(xfer->ctx, "Could not read from WebSocket: %s\n", curl_easy_strerror(r)); + + return r; + } + + CTX_DEBUG(xfer->ctx, "Read %zu byte(s) from WebSocket\n", bytes_received); + + // If we have not received anything, we will wait for being called again + if (!bytes_received) + return 0; + + // Allocate some buffer space + r = pakfire_xfer_allocate(xfer, meta->offset + bytes_received); + if (r) + return r; + + // Copy the received message + memcpy(xfer->buffer.data + meta->offset, buffer, bytes_received); + + // Call again if this was not the entire message + if (meta->flags & CURLWS_CONT) + return pakfire_xfer_socket_recv(xfer); + + CTX_DEBUG(xfer->ctx, "We have received a message of %zu byte(s)\n", xfer->buffer.size); + + if (meta->flags & CURLWS_TEXT) { + CTX_DEBUG(xfer->ctx, "The message is a text\n"); + } else if (meta->flags & CURLWS_BINARY) { + CTX_DEBUG(xfer->ctx, "The message is binary\n"); + } + + // Once we have received the entire message we call the callback + if (xfer->callbacks.recv) { + r = xfer->callbacks.recv(xfer, xfer->buffer.data, xfer->buffer.size, xfer->callbacks.data); + if (r) + return r; + } + + return 0; +} + +static int __pakfire_xfer_socket(sd_event_source* s, int fd, uint32_t events, void* data) { + struct pakfire_xfer* xfer = data; + int r; + + // Is there any data to read? + if (events & EPOLLIN) { + r = pakfire_xfer_socket_recv(xfer); + if (r) + return r; + } + + // Is there any data to send? + if (events & EPOLLOUT) { + r = pakfire_xfer_socket_send(xfer); + if (r) + return r; + } + + return 0; +} + +/* + This is being called once a WebSocket connection has connected and is ready to + send or receive data. +*/ +static pakfire_xfer_error_code_t pakfire_xfer_done_socket(struct pakfire_xfer* xfer, int code) { + curl_socket_t socket = -1; + int events = 0; + int r; + + // Fetch the event loop + sd_event* loop = pakfire_httpclient_loop(xfer->client); + + // Fetch the socket + r = curl_easy_getinfo(xfer->handle, CURLINFO_ACTIVESOCKET, &socket); + if (r) { + CTX_ERROR(xfer->ctx, "Could not fetch the socket: %s\n", curl_easy_strerror(r)); + goto ERROR; + } + + CTX_DEBUG(xfer->ctx, "Connection is using socket %d\n", socket); + + // Check what callbacks we have + if (xfer->callbacks.recv) + events |= EPOLLIN; + + if (xfer->callbacks.send) + events |= EPOLLOUT; + + // Register a callback with the event loop + r = sd_event_add_io(loop, &xfer->event, socket, events, __pakfire_xfer_socket, xfer); + if (r < 0) { + CTX_ERROR(xfer->ctx, "Could not register socket %d: %s\n", socket, strerror(-r)); + + goto ERROR; + } + + CTX_DEBUG(xfer->ctx, "WebSocket registered with event loop\n"); + +ERROR: + if (loop) + sd_event_unref(loop); + + return r; +} + static int pakfire_xfer_save(struct pakfire_xfer* xfer) { int r; @@ -1019,6 +1179,22 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code) } } + // Handle WebSockets separately + switch (xfer->direction) { + case PAKFIRE_XFER_SOCKET: + return pakfire_xfer_done_socket(xfer, code); + + default: + break; + } + + // Remove the handle + if (xfer->client) { + r = pakfire_httpclient_remove_xfer(xfer->client, xfer); + if (r) + return r; + } + // Handle any errors if (code) { r = pakfire_xfer_fail(xfer); @@ -1053,6 +1229,9 @@ static int pakfire_xfer_update(void* data, // Update the xferred counter xfer->xferred = ulnow; break; + + case PAKFIRE_XFER_SOCKET: + break; } // Do nothing if no progress indicator has been set up @@ -1160,6 +1339,15 @@ static int pakfire_xfer_prepare_url(struct pakfire_xfer* xfer) { goto ERROR; } + // Replace the schema if we are using SOCKET + if (xfer->direction == PAKFIRE_XFER_SOCKET) { + r = curl_url_set(xfer->fullurl, CURLUPART_SCHEME, "wss", 0); + if (r) { + CTX_ERROR(xfer->ctx, "Could not change to WebSocket: %s\n", curl_url_strerror(r)); + goto ERROR; + } + } + // Set the URL r = curl_easy_setopt(xfer->handle, CURLOPT_CURLU, xfer->fullurl); if (r) { @@ -1203,6 +1391,16 @@ int pakfire_xfer_prepare(struct pakfire_xfer* xfer, struct pakfire_progress* pro // Upload files chunked xfer->headers = curl_slist_append(xfer->headers, "Transfer-Encoding: chunked"); break; + + case PAKFIRE_XFER_SOCKET: + // Ask cURL to connect and let us handle the rest + r = curl_easy_setopt(xfer->handle, CURLOPT_CONNECT_ONLY, 2L); + if (r) { + CTX_ERROR(xfer->ctx, "Could not enable CONNECT_ONLY\n"); + return r; + } + + break; } // Compose the URL @@ -1278,6 +1476,19 @@ int pakfire_xfer_prepare(struct pakfire_xfer* xfer, struct pakfire_progress* pro return 0; } +int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_recv_callback recv, + pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data) { + xfer->direction = PAKFIRE_XFER_SOCKET; + + // Store the callbacks + xfer->callbacks.recv = recv; + xfer->callbacks.send = send; + xfer->callbacks.close = close; + xfer->callbacks.data = data; + + return 0; +} + pakfire_xfer_error_code_t pakfire_xfer_run(struct pakfire_xfer* xfer, int flags) { int r;