]> git.ipfire.org Git - pakfire.git/commitdiff
job: Properly implement the log stream now
authorMichael Tremer <michael.tremer@ipfire.org>
Wed, 29 Jan 2025 15:02:14 +0000 (15:02 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Wed, 29 Jan 2025 15:02:14 +0000 (15:02 +0000)
This is now a post event which will always be called when there have
been any other events processed. We will therefore try to send as many
log lines as possible before we terminate.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/job.c

index 70f41f643c65b965f738630b0fd3928b701961b8..07fad1e9d41c25c5fa0517e1f161d4e8176ec5ee 100644 (file)
@@ -89,6 +89,9 @@ struct pakfire_job {
                // Streams
                struct pakfire_log_stream* stdout;
                struct pakfire_log_stream* stderr;
+
+               // Stream Event
+               sd_event_source* stream;
        } log;
 
        // Connection Timer and Holdoff Time
@@ -242,6 +245,8 @@ static void pakfire_job_free(struct pakfire_job* job) {
                pakfire_log_stream_unref(job->log.stdout);
        if (job->log.stderr)
                pakfire_log_stream_unref(job->log.stderr);
+       if (job->log.stream)
+               sd_event_source_unref(job->log.stream);
 
        if (job->service)
                pakfire_buildservice_unref(job->service);
@@ -785,34 +790,64 @@ ERROR:
        return r;
 }
 
-static int pakfire_job_send(struct pakfire_xfer* xfer, void* data) {
+static int pakfire_job_stream(sd_event_source* s, void* data) {
        struct pakfire_job* job = data;
        char* line = NULL;
        size_t length = 0;
        int priority;
        int r;
 
-       // Try to dequeue a line from the log buffer
-       r = pakfire_log_buffer_dequeue(job->log.buffer, &priority, &line, &length);
-       if (r < 0) {
-               ERROR(job->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r));
-               goto ERROR;
+       // Do nothing if we are not connected
+       if (!job->control)
+               return 0;
+
+       // Send as many log messages as possible
+       for (;;) {
+               // Try to dequeue a line from the log buffer
+               r = pakfire_log_buffer_dequeue(job->log.buffer, &priority, &line, &length);
+               if (r < 0) {
+                       ERROR(job->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r));
+                       return r;
+               }
+
+               // We currently have no data and want to be called again later
+               if (!line)
+                       break;
+
+               // If we have received a line let's send it
+               r = pakfire_job_send_log_line(job, priority, line, length);
+               free(line);
+               if (r < 0)
+                       return r;
        }
 
-       // We currently have no data and want to be called again later
-       if (!line)
-               return -EAGAIN;
+       return 0;
+}
 
-       // If we have received a line let's send it
-       r = pakfire_job_send_log_line(job, priority, line, length);
-       if (r < 0)
-               goto ERROR;
+static int pakfire_job_terminate_log_stream(struct pakfire_job* job) {
+       // Disable log streaming
+       if (job->log.stream) {
+               sd_event_source_unref(job->log.stream);
+               job->log.stream = NULL;
+       }
 
-ERROR:
-       if (line)
-               free(line);
+       return 0;
+}
 
-       return r;
+static int pakfire_job_launch_log_stream(struct pakfire_job* job) {
+       int r;
+
+       // Make sure the previous event is actually gone
+       pakfire_job_terminate_log_stream(job);
+
+       // Create the log stream event
+       r = sd_event_add_post(job->loop, &job->log.stream, pakfire_job_stream, job);
+       if (r < 0) {
+               ERROR(job->ctx, "Could not register the log stream: %s\n", strerror(-r));
+               return r;
+       }
+
+       return 0;
 }
 
 static int pakfire_job_closed(struct pakfire_xfer* xfer, int code, void* data) {
@@ -825,6 +860,9 @@ static int pakfire_job_closed(struct pakfire_xfer* xfer, int code, void* data) {
                job->control = NULL;
        }
 
+       // Turn off the log stream
+       pakfire_job_terminate_log_stream(job);
+
        INFO(job->ctx, "Will attempt to reconnect in %u second(s)\n",
                job->reconnect_holdoff / 1000000);
 
@@ -854,12 +892,18 @@ static int pakfire_job_closed(struct pakfire_xfer* xfer, int code, void* data) {
 
 static int pakfire_job_connected(struct pakfire_xfer* xfer, void* data) {
        struct pakfire_job* job = data;
+       int r;
 
        DEBUG(job->ctx, "Connected!\n");
 
        // Store a reference to the control connection
        job->control = pakfire_xfer_ref(xfer);
 
+       // Launch the log stream
+       r = pakfire_job_launch_log_stream(job);
+       if (r < 0)
+               return r;
+
        switch (job->state) {
                // If the job has connected for the first time, we launch it
                case PAKFIRE_JOB_STATE_INIT:
@@ -904,7 +948,7 @@ static int pakfire_job_connect(sd_event_source* s, uint64_t usec, void* data) {
 
        // Make this a WebSocket connection
        r = pakfire_xfer_socket(xfer, pakfire_job_connected,
-               pakfire_job_recv, pakfire_job_send, pakfire_job_closed, job);
+               pakfire_job_recv, NULL, pakfire_job_closed, job);
        if (r)
                goto ERROR;