#include <openssl/err.h>
#include <openssl/evp.h>
+#include <systemd/sd-event.h>
+
#include <pakfire/ctx.h>
#include <pakfire/mirrorlist.h>
#include <pakfire/path.h>
enum {
PAKFIRE_XFER_DOWNLOAD = 0,
PAKFIRE_XFER_UPLOAD = 1,
+ PAKFIRE_XFER_SOCKET = 2,
} direction;
// Size
// Space for cURL error message
char error[CURL_ERROR_SIZE];
+
+ // Event Loop
+ sd_event_source* event;
+
+ // Callbacks
+ struct pakfire_xfer_callbacks {
+ pakfire_xfer_recv_callback recv;
+ pakfire_xfer_send_callback send;
+ pakfire_xfer_close_callback close;
+ void* data;
+ } callbacks;
+
+ // WebSocket Receive Buffer
+ struct pakfire_xfer_buffer {
+ char* data;
+ size_t size;
+ } buffer;
};
static void pakfire_xfer_free(struct pakfire_xfer* xfer) {
if (xfer->evp)
EVP_MD_CTX_free(xfer->evp);
+ // systemd
+ if (xfer->event)
+ sd_event_source_unref(xfer->event);
+
// cURL stuff
if (xfer->handle)
curl_easy_cleanup(xfer->handle);
if (r)
goto ERROR;
- // Limit protocols to HTTPS, HTTP, FTP and FILE
- curl_easy_setopt(xfer->handle, CURLOPT_PROTOCOLS_STR, "HTTPS,HTTP,FTP,FILE");
+ // Limit protocols to HTTPS, HTTP, FTP, FILE and WebSocket over TLS
+ curl_easy_setopt(xfer->handle, CURLOPT_PROTOCOLS_STR, "HTTPS,HTTP,FTP,FILE,WSS");
// Raise any HTTP errors
r = curl_easy_setopt(xfer->handle, CURLOPT_FAILONERROR, 1L);
return curl_easy_setopt(xfer->handle, CURLOPT_CUSTOMREQUEST, m);
}
-
const char* pakfire_xfer_get_title(struct pakfire_xfer* xfer) {
char title[PATH_MAX];
int r;
return "unknown";
}
+static int pakfire_xfer_allocate(struct pakfire_xfer* xfer, size_t size) {
+
+ // Otherwise, we resize the buffer
+ xfer->buffer.data = realloc(xfer->buffer.data, size);
+ if (!xfer->buffer.data) {
+ CTX_ERROR(xfer->ctx, "Could not allocate memory: %m\n");
+
+ return -errno;
+ }
+
+ // Update the size
+ xfer->buffer.size = size;
+
+ return 0;
+}
+
+static int pakfire_xfer_socket_send(struct pakfire_xfer* xfer) {
+ return -1; // TODO
+}
+
+static int pakfire_xfer_socket_recv(struct pakfire_xfer* xfer) {
+ const struct curl_ws_frame* meta = NULL;
+ char* buffer[4096];
+ int r;
+
+ size_t bytes_received = 0;
+
+ // Read as many bytes as possible
+ r = curl_ws_recv(xfer->handle, buffer, sizeof(buffer), &bytes_received, &meta);
+ if (r) {
+ CTX_ERROR(xfer->ctx, "Could not read from WebSocket: %s\n", curl_easy_strerror(r));
+
+ return r;
+ }
+
+ CTX_DEBUG(xfer->ctx, "Read %zu byte(s) from WebSocket\n", bytes_received);
+
+ // If we have not received anything, we will wait for being called again
+ if (!bytes_received)
+ return 0;
+
+ // Allocate some buffer space
+ r = pakfire_xfer_allocate(xfer, meta->offset + bytes_received);
+ if (r)
+ return r;
+
+ // Copy the received message
+ memcpy(xfer->buffer.data + meta->offset, buffer, bytes_received);
+
+ // Call again if this was not the entire message
+ if (meta->flags & CURLWS_CONT)
+ return pakfire_xfer_socket_recv(xfer);
+
+ CTX_DEBUG(xfer->ctx, "We have received a message of %zu byte(s)\n", xfer->buffer.size);
+
+ if (meta->flags & CURLWS_TEXT) {
+ CTX_DEBUG(xfer->ctx, "The message is a text\n");
+ } else if (meta->flags & CURLWS_BINARY) {
+ CTX_DEBUG(xfer->ctx, "The message is binary\n");
+ }
+
+ // Once we have received the entire message we call the callback
+ if (xfer->callbacks.recv) {
+ r = xfer->callbacks.recv(xfer, xfer->buffer.data, xfer->buffer.size, xfer->callbacks.data);
+ if (r)
+ return r;
+ }
+
+ return 0;
+}
+
+static int __pakfire_xfer_socket(sd_event_source* s, int fd, uint32_t events, void* data) {
+ struct pakfire_xfer* xfer = data;
+ int r;
+
+ // Is there any data to read?
+ if (events & EPOLLIN) {
+ r = pakfire_xfer_socket_recv(xfer);
+ if (r)
+ return r;
+ }
+
+ // Is there any data to send?
+ if (events & EPOLLOUT) {
+ r = pakfire_xfer_socket_send(xfer);
+ if (r)
+ return r;
+ }
+
+ return 0;
+}
+
+/*
+ This is being called once a WebSocket connection has connected and is ready to
+ send or receive data.
+*/
+static pakfire_xfer_error_code_t pakfire_xfer_done_socket(struct pakfire_xfer* xfer, int code) {
+ curl_socket_t socket = -1;
+ int events = 0;
+ int r;
+
+ // Fetch the event loop
+ sd_event* loop = pakfire_httpclient_loop(xfer->client);
+
+ // Fetch the socket
+ r = curl_easy_getinfo(xfer->handle, CURLINFO_ACTIVESOCKET, &socket);
+ if (r) {
+ CTX_ERROR(xfer->ctx, "Could not fetch the socket: %s\n", curl_easy_strerror(r));
+ goto ERROR;
+ }
+
+ CTX_DEBUG(xfer->ctx, "Connection is using socket %d\n", socket);
+
+ // Check what callbacks we have
+ if (xfer->callbacks.recv)
+ events |= EPOLLIN;
+
+ if (xfer->callbacks.send)
+ events |= EPOLLOUT;
+
+ // Register a callback with the event loop
+ r = sd_event_add_io(loop, &xfer->event, socket, events, __pakfire_xfer_socket, xfer);
+ if (r < 0) {
+ CTX_ERROR(xfer->ctx, "Could not register socket %d: %s\n", socket, strerror(-r));
+
+ goto ERROR;
+ }
+
+ CTX_DEBUG(xfer->ctx, "WebSocket registered with event loop\n");
+
+ERROR:
+ if (loop)
+ sd_event_unref(loop);
+
+ return r;
+}
+
static int pakfire_xfer_save(struct pakfire_xfer* xfer) {
int r;
}
}
+ // Handle WebSockets separately
+ switch (xfer->direction) {
+ case PAKFIRE_XFER_SOCKET:
+ return pakfire_xfer_done_socket(xfer, code);
+
+ default:
+ break;
+ }
+
+ // Remove the handle
+ if (xfer->client) {
+ r = pakfire_httpclient_remove_xfer(xfer->client, xfer);
+ if (r)
+ return r;
+ }
+
// Handle any errors
if (code) {
r = pakfire_xfer_fail(xfer);
// Update the xferred counter
xfer->xferred = ulnow;
break;
+
+ case PAKFIRE_XFER_SOCKET:
+ break;
}
// Do nothing if no progress indicator has been set up
goto ERROR;
}
+ // Replace the schema if we are using SOCKET
+ if (xfer->direction == PAKFIRE_XFER_SOCKET) {
+ r = curl_url_set(xfer->fullurl, CURLUPART_SCHEME, "wss", 0);
+ if (r) {
+ CTX_ERROR(xfer->ctx, "Could not change to WebSocket: %s\n", curl_url_strerror(r));
+ goto ERROR;
+ }
+ }
+
// Set the URL
r = curl_easy_setopt(xfer->handle, CURLOPT_CURLU, xfer->fullurl);
if (r) {
// Upload files chunked
xfer->headers = curl_slist_append(xfer->headers, "Transfer-Encoding: chunked");
break;
+
+ case PAKFIRE_XFER_SOCKET:
+ // Ask cURL to connect and let us handle the rest
+ r = curl_easy_setopt(xfer->handle, CURLOPT_CONNECT_ONLY, 2L);
+ if (r) {
+ CTX_ERROR(xfer->ctx, "Could not enable CONNECT_ONLY\n");
+ return r;
+ }
+
+ break;
}
// Compose the URL
return 0;
}
+int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_recv_callback recv,
+ pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data) {
+ xfer->direction = PAKFIRE_XFER_SOCKET;
+
+ // Store the callbacks
+ xfer->callbacks.recv = recv;
+ xfer->callbacks.send = send;
+ xfer->callbacks.close = close;
+ xfer->callbacks.data = data;
+
+ return 0;
+}
+
pakfire_xfer_error_code_t pakfire_xfer_run(struct pakfire_xfer* xfer, int flags) {
int r;