]> git.ipfire.org Git - pakfire.git/commitdiff
worker: Establish a control connection
authorMichael Tremer <michael.tremer@ipfire.org>
Fri, 4 Oct 2024 13:47:40 +0000 (13:47 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Fri, 4 Oct 2024 13:47:40 +0000 (13:47 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/libpakfire/worker.c

index fd9bad9cba3b3138370cce3d6421e71f1387bd37..6100056baa583bbad7dad1076561797f0cac9d99 100644 (file)
@@ -70,6 +70,13 @@ struct pakfire_worker {
 
        // HTTP Client
        struct pakfire_httpclient* client;
+
+       // Connection Timer and Holdoff Time
+       sd_event_source* connect_timer;
+       unsigned int reconnect_holdoff;
+
+       // Control Connection
+       struct pakfire_xfer* control;
 };
 
 static int pakfire_parse_job(struct pakfire_worker* worker, json_object* data) {
@@ -197,8 +204,12 @@ static int pakfire_parse_job(struct pakfire_worker* worker, json_object* data) {
 }
 
 static void pakfire_worker_free(struct pakfire_worker* worker) {
+       if (worker->control)
+               pakfire_xfer_unref(worker->control);
        if (worker->client)
                pakfire_httpclient_unref(worker->client);
+       if (worker->connect_timer)
+               sd_event_source_unref(worker->connect_timer);
        if (worker->loop)
                sd_event_unref(worker->loop);
        if (worker->daemon)
@@ -208,6 +219,102 @@ static void pakfire_worker_free(struct pakfire_worker* worker) {
        free(worker);
 }
 
+static int pakfire_worker_recv(struct pakfire_xfer* xfer,
+               const char* message, const size_t size, void* data) {
+       // XXX TODO
+       return 0;
+}
+
+static int pakfire_worker_closed(struct pakfire_xfer* xfer, int code, void* data) {
+       struct pakfire_worker* worker = data;
+       int r;
+
+       // Drop the control connection
+       if (worker->control) {
+               pakfire_xfer_unref(worker->control);
+               worker->control = NULL;
+       }
+
+       CTX_INFO(worker->ctx, "Will attempt to reconnect in %u second(s)\n",
+               worker->reconnect_holdoff / 1000000);
+
+       // Set the reconnection timer
+       r = sd_event_source_set_time_relative(worker->connect_timer, worker->reconnect_holdoff);
+       if (r < 0) {
+               CTX_ERROR(worker->ctx, "Could not set the reconnection timer: %s\n", strerror(-r));
+               return r;
+       }
+
+       // Activate the timer
+       r = sd_event_source_set_enabled(worker->connect_timer, SD_EVENT_ONESHOT);
+       if (r < 0) {
+               CTX_ERROR(worker->ctx, "Could not activate the connect timer: %s\n", strerror(-r));
+               return r;
+       }
+
+       // Double the holdoff time
+       worker->reconnect_holdoff *= 2;
+
+       // Cap reconnection attempts to every minute
+       if (worker->reconnect_holdoff > 60000000)
+               worker->reconnect_holdoff = 60000000;
+
+       return 0;
+}
+
+static int pakfire_worker_connected(struct pakfire_xfer* xfer, void* data) {
+       struct pakfire_worker* worker = data;
+       int r;
+
+       CTX_DEBUG(worker->ctx, "Connected!\n");
+
+       // Store a reference to the control connection
+       worker->control = pakfire_xfer_ref(xfer);
+
+       return 0;
+}
+
+static int pakfire_worker_connect(sd_event_source* s, uint64_t usec, void* data) {
+       struct pakfire_worker* worker = data;
+       struct pakfire_xfer* xfer = NULL;
+       char job_id[UUID_STR_LEN];
+       int r;
+
+       CTX_INFO(worker->ctx, "Connecting...\n");
+
+       // Format the job ID as string
+       uuid_unparse(worker->job_id, job_id);
+
+       // Create a new xfer
+       r = pakfire_httpclient_create_xfer(&xfer, worker->client, "/api/v1/jobs/%s", job_id);
+       if (r)
+               goto ERROR;
+
+       // Enable authentication
+       r = pakfire_xfer_auth(xfer);
+       if (r)
+               goto ERROR;
+
+       // Make this a WebSocket connection
+       r = pakfire_xfer_socket(xfer, pakfire_worker_connected,
+               pakfire_worker_recv, NULL, pakfire_worker_closed, daemon);
+       if (r)
+               goto ERROR;
+
+       // Enqueue the transfer
+       r = pakfire_httpclient_enqueue_xfer(worker->client, xfer);
+       if (r)
+               goto ERROR;
+
+       return 0;
+
+ERROR:
+       if (xfer)
+               pakfire_xfer_unref(xfer);
+
+       return r;
+}
+
 static int pakfire_worker_setup_loop(struct pakfire_worker* worker) {
        int r;
 
@@ -218,6 +325,14 @@ static int pakfire_worker_setup_loop(struct pakfire_worker* worker) {
                return r;
        }
 
+       // Setup the reconnection timer
+       r = sd_event_add_time_relative(worker->loop, &worker->connect_timer,
+               CLOCK_MONOTONIC, 0, 0, pakfire_worker_connect, worker);
+       if (r < 0) {
+               CTX_ERROR(worker->ctx, "Could not register the connection timer: %s\n", strerror(-r));
+               return r;
+       }
+
        return 0;
 }