// HTTP Client
struct pakfire_httpclient* client;
+
+ // Connection Timer and Holdoff Time
+ sd_event_source* connect_timer;
+ unsigned int reconnect_holdoff;
+
+ // Control Connection
+ struct pakfire_xfer* control;
};
static int pakfire_parse_job(struct pakfire_worker* worker, json_object* data) {
}
static void pakfire_worker_free(struct pakfire_worker* worker) {
+ if (worker->control)
+ pakfire_xfer_unref(worker->control);
if (worker->client)
pakfire_httpclient_unref(worker->client);
+ if (worker->connect_timer)
+ sd_event_source_unref(worker->connect_timer);
if (worker->loop)
sd_event_unref(worker->loop);
if (worker->daemon)
free(worker);
}
+static int pakfire_worker_recv(struct pakfire_xfer* xfer,
+ const char* message, const size_t size, void* data) {
+ // XXX TODO
+ return 0;
+}
+
+static int pakfire_worker_closed(struct pakfire_xfer* xfer, int code, void* data) {
+ struct pakfire_worker* worker = data;
+ int r;
+
+ // Drop the control connection
+ if (worker->control) {
+ pakfire_xfer_unref(worker->control);
+ worker->control = NULL;
+ }
+
+ CTX_INFO(worker->ctx, "Will attempt to reconnect in %u second(s)\n",
+ worker->reconnect_holdoff / 1000000);
+
+ // Set the reconnection timer
+ r = sd_event_source_set_time_relative(worker->connect_timer, worker->reconnect_holdoff);
+ if (r < 0) {
+ CTX_ERROR(worker->ctx, "Could not set the reconnection timer: %s\n", strerror(-r));
+ return r;
+ }
+
+ // Activate the timer
+ r = sd_event_source_set_enabled(worker->connect_timer, SD_EVENT_ONESHOT);
+ if (r < 0) {
+ CTX_ERROR(worker->ctx, "Could not activate the connect timer: %s\n", strerror(-r));
+ return r;
+ }
+
+ // Double the holdoff time
+ worker->reconnect_holdoff *= 2;
+
+ // Cap reconnection attempts to every minute
+ if (worker->reconnect_holdoff > 60000000)
+ worker->reconnect_holdoff = 60000000;
+
+ return 0;
+}
+
+static int pakfire_worker_connected(struct pakfire_xfer* xfer, void* data) {
+ struct pakfire_worker* worker = data;
+ int r;
+
+ CTX_DEBUG(worker->ctx, "Connected!\n");
+
+ // Store a reference to the control connection
+ worker->control = pakfire_xfer_ref(xfer);
+
+ return 0;
+}
+
+static int pakfire_worker_connect(sd_event_source* s, uint64_t usec, void* data) {
+ struct pakfire_worker* worker = data;
+ struct pakfire_xfer* xfer = NULL;
+ char job_id[UUID_STR_LEN];
+ int r;
+
+ CTX_INFO(worker->ctx, "Connecting...\n");
+
+ // Format the job ID as string
+ uuid_unparse(worker->job_id, job_id);
+
+ // Create a new xfer
+ r = pakfire_httpclient_create_xfer(&xfer, worker->client, "/api/v1/jobs/%s", job_id);
+ if (r)
+ goto ERROR;
+
+ // Enable authentication
+ r = pakfire_xfer_auth(xfer);
+ if (r)
+ goto ERROR;
+
+ // Make this a WebSocket connection
+ r = pakfire_xfer_socket(xfer, pakfire_worker_connected,
+ pakfire_worker_recv, NULL, pakfire_worker_closed, daemon);
+ if (r)
+ goto ERROR;
+
+ // Enqueue the transfer
+ r = pakfire_httpclient_enqueue_xfer(worker->client, xfer);
+ if (r)
+ goto ERROR;
+
+ return 0;
+
+ERROR:
+ if (xfer)
+ pakfire_xfer_unref(xfer);
+
+ return r;
+}
+
static int pakfire_worker_setup_loop(struct pakfire_worker* worker) {
int r;
return r;
}
+ // Setup the reconnection timer
+ r = sd_event_add_time_relative(worker->loop, &worker->connect_timer,
+ CLOCK_MONOTONIC, 0, 0, pakfire_worker_connect, worker);
+ if (r < 0) {
+ CTX_ERROR(worker->ctx, "Could not register the connection timer: %s\n", strerror(-r));
+ return r;
+ }
+
return 0;
}