From: Michael Tremer Date: Fri, 4 Oct 2024 13:47:40 +0000 (+0000) Subject: worker: Establish a control connection X-Git-Tag: 0.9.30~1178 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=57ff381a1330bfc9ca6cea7b81ccdf1469d761ba;p=pakfire.git worker: Establish a control connection Signed-off-by: Michael Tremer --- diff --git a/src/libpakfire/worker.c b/src/libpakfire/worker.c index fd9bad9cb..6100056ba 100644 --- a/src/libpakfire/worker.c +++ b/src/libpakfire/worker.c @@ -70,6 +70,13 @@ struct pakfire_worker { // HTTP Client struct pakfire_httpclient* client; + + // Connection Timer and Holdoff Time + sd_event_source* connect_timer; + unsigned int reconnect_holdoff; + + // Control Connection + struct pakfire_xfer* control; }; static int pakfire_parse_job(struct pakfire_worker* worker, json_object* data) { @@ -197,8 +204,12 @@ static int pakfire_parse_job(struct pakfire_worker* worker, json_object* data) { } static void pakfire_worker_free(struct pakfire_worker* worker) { + if (worker->control) + pakfire_xfer_unref(worker->control); if (worker->client) pakfire_httpclient_unref(worker->client); + if (worker->connect_timer) + sd_event_source_unref(worker->connect_timer); if (worker->loop) sd_event_unref(worker->loop); if (worker->daemon) @@ -208,6 +219,102 @@ static void pakfire_worker_free(struct pakfire_worker* worker) { free(worker); } +static int pakfire_worker_recv(struct pakfire_xfer* xfer, + const char* message, const size_t size, void* data) { + // XXX TODO + return 0; +} + +static int pakfire_worker_closed(struct pakfire_xfer* xfer, int code, void* data) { + struct pakfire_worker* worker = data; + int r; + + // Drop the control connection + if (worker->control) { + pakfire_xfer_unref(worker->control); + worker->control = NULL; + } + + CTX_INFO(worker->ctx, "Will attempt to reconnect in %u second(s)\n", + worker->reconnect_holdoff / 1000000); + + // Set the reconnection timer + r = sd_event_source_set_time_relative(worker->connect_timer, worker->reconnect_holdoff); + if (r < 0) { + CTX_ERROR(worker->ctx, "Could not set the reconnection timer: %s\n", strerror(-r)); + return r; + } + + // Activate the timer + r = sd_event_source_set_enabled(worker->connect_timer, SD_EVENT_ONESHOT); + if (r < 0) { + CTX_ERROR(worker->ctx, "Could not activate the connect timer: %s\n", strerror(-r)); + return r; + } + + // Double the holdoff time + worker->reconnect_holdoff *= 2; + + // Cap reconnection attempts to every minute + if (worker->reconnect_holdoff > 60000000) + worker->reconnect_holdoff = 60000000; + + return 0; +} + +static int pakfire_worker_connected(struct pakfire_xfer* xfer, void* data) { + struct pakfire_worker* worker = data; + int r; + + CTX_DEBUG(worker->ctx, "Connected!\n"); + + // Store a reference to the control connection + worker->control = pakfire_xfer_ref(xfer); + + return 0; +} + +static int pakfire_worker_connect(sd_event_source* s, uint64_t usec, void* data) { + struct pakfire_worker* worker = data; + struct pakfire_xfer* xfer = NULL; + char job_id[UUID_STR_LEN]; + int r; + + CTX_INFO(worker->ctx, "Connecting...\n"); + + // Format the job ID as string + uuid_unparse(worker->job_id, job_id); + + // Create a new xfer + r = pakfire_httpclient_create_xfer(&xfer, worker->client, "/api/v1/jobs/%s", job_id); + if (r) + goto ERROR; + + // Enable authentication + r = pakfire_xfer_auth(xfer); + if (r) + goto ERROR; + + // Make this a WebSocket connection + r = pakfire_xfer_socket(xfer, pakfire_worker_connected, + pakfire_worker_recv, NULL, pakfire_worker_closed, daemon); + if (r) + goto ERROR; + + // Enqueue the transfer + r = pakfire_httpclient_enqueue_xfer(worker->client, xfer); + if (r) + goto ERROR; + + return 0; + +ERROR: + if (xfer) + pakfire_xfer_unref(xfer); + + return r; +} + static int pakfire_worker_setup_loop(struct pakfire_worker* worker) { int r; @@ -218,6 +325,14 @@ static int pakfire_worker_setup_loop(struct pakfire_worker* worker) { return r; } + // Setup the reconnection timer + r = sd_event_add_time_relative(worker->loop, &worker->connect_timer, + CLOCK_MONOTONIC, 0, 0, pakfire_worker_connect, worker); + if (r < 0) { + CTX_ERROR(worker->ctx, "Could not register the connection timer: %s\n", strerror(-r)); + return r; + } + return 0; }