]> git.ipfire.org Git - pakfire.git/commitdiff
jobs: Go back to the event-driven model to send logs
authorMichael Tremer <michael.tremer@ipfire.org>
Fri, 7 Feb 2025 17:56:43 +0000 (17:56 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Fri, 7 Feb 2025 18:01:20 +0000 (18:01 +0000)
When the builder is not located in the same data center it can happen
that the send buffer fills up which results in a broken TCP connection
if we continue to send any data.

We will now first of all buffer any messages that have only been
partially sent and listen for when the socket is ready to receive the
rest. If so, we will flush our buffer.

Afterwards we will call the send callback which will go and look for any
more data to send.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/daemon.c
src/pakfire/daemon.h
src/pakfire/job.c
src/pakfire/job.h
src/pakfire/xfer.c
src/pakfire/xfer.h

index 0ed933f30408d786458253bf15f7c33010768c6d..3733e2f7af56fa6ff0c0b5e6cb1f01ffb31a6898 100644 (file)
@@ -597,6 +597,66 @@ ERROR:
        return r;
 }
 
+/*
+       Called when we are ready to stream the log.
+
+       This function calls each running job which will only send one line
+       at a time to ensure that one job will not take the entire bandwidth.
+*/
+int pakfire_daemon_stream_logs(struct pakfire_daemon* self) {
+       unsigned int lines;
+       int r;
+
+       // Bail if we don't have a control connection
+       if (!self->control)
+               return 0;
+
+       // Bail if the connection isn't ready to send
+       else if (!pakfire_xfer_is_ready_to_send(self->control))
+               return 0;
+
+       do {
+               // Reset lines
+               lines = 0;
+
+               // Have every job send one line
+               for (unsigned int i = 0; i < MAX_JOBS; i++) {
+                       // Skip any empty slots
+                       if (!self->jobs[i])
+                               continue;
+
+                       // Stream logs
+                       r = pakfire_job_stream_logs(self->jobs[i]);
+                       if (r < 0) {
+                               switch (-r) {
+                                       case EAGAIN:
+                                               return 0;
+
+                                       default:
+                                               return r;
+                               }
+                       }
+
+                       // Add up lines
+                       lines += r;
+               }
+       } while (lines);
+
+       return 0;
+}
+
+static int pakfire_daemon_send(struct pakfire_xfer* xfer, void* data) {
+       struct pakfire_daemon* self = data;
+       int r;
+
+       // Stream logs
+       r = pakfire_daemon_stream_logs(self);
+       if (r < 0)
+               return r;
+
+       return 0;
+}
+
 /*
        This function is called whenever the connection to the build service could not
        be established or was interrupted. It will try to reconnect.
@@ -648,12 +708,6 @@ static int pakfire_daemon_close(struct pakfire_xfer* xfer, int code, void* data)
                daemon->stats_timer = NULL;
        }
 
-       // Turn off log streaming for all jobs
-       for (int i = 0; i < MAX_JOBS; i++) {
-               if (daemon->jobs[i])
-                       pakfire_job_terminate_log_stream(daemon->jobs[i]);
-       }
-
        return 0;
 }
 
@@ -817,7 +871,7 @@ static int pakfire_daemon_connect(sd_event_source* s, uint64_t usec, void* data)
 
        // Make this a WebSocket connection
        r = pakfire_xfer_socket(xfer, pakfire_daemon_connected,
-               pakfire_daemon_recv, NULL, pakfire_daemon_close, daemon);
+               pakfire_daemon_recv, pakfire_daemon_send, pakfire_daemon_close, daemon);
        if (r)
                goto ERROR;
 
@@ -1391,8 +1445,14 @@ int pakfire_daemon_send_message(struct pakfire_daemon* self, struct json_object*
        // Send the message
        r = pakfire_xfer_send_message(self->control, m, length);
        if (r < 0) {
-               ERROR(self->ctx, "Failed to send message: %s\n", strerror(-r));
-               return r;
+               switch (-r) {
+                       case EAGAIN:
+                               break;
+
+                       default:
+                               ERROR(self->ctx, "Failed to send message: %s\n", strerror(-r));
+                               return r;
+               }
        }
 
        return 0;
index a1caaad83897a799cd4e94dea9886d1a10c009a7..03ac723cfba4bbfa9eab80eb2157df064b1f03b5 100644 (file)
@@ -44,6 +44,9 @@ int pakfire_daemon_main(struct pakfire_daemon* daemon);
 
 int pakfire_daemon_job_finished(struct pakfire_daemon* daemon, struct pakfire_job* job);
 
+// Stream Logs
+int pakfire_daemon_stream_logs(struct pakfire_daemon* self);
+
 // Send message
 int pakfire_daemon_send_message(struct pakfire_daemon* self, struct json_object* message);
 
index 404f2415955aaf0e3e353d20db4c83874fd1a473..8468c55db7e15fcdb1fa908c1f58df8906e40717 100644 (file)
@@ -67,8 +67,9 @@ struct pakfire_job {
 
        // Flags
        enum {
-               PAKFIRE_JOB_TEST   = (1 << 0),
-               PAKFIRE_JOB_CCACHE = (1 << 1),
+               PAKFIRE_JOB_TEST       = (1 << 0),
+               PAKFIRE_JOB_CCACHE     = (1 << 1),
+               PAKFIRE_JOB_STREAM_LOG = (1 << 2),
        } flags;
 
        // Package URL
@@ -111,6 +112,10 @@ struct pakfire_job {
        char** uploads;
 };
 
+static int pakfire_job_has_flag(struct pakfire_job* self, int flag) {
+       return (self->flags & flag);
+}
+
 static int pakfire_parse_job(struct pakfire_job* job, json_object* data) {
        json_object* ccache = NULL;
        json_object* o = NULL;
@@ -509,7 +514,15 @@ static int pakfire_job_exited(sd_event_source* s, const siginfo_t* si, void* dat
 }
 
 static int pakfire_job_send_log(struct pakfire_job* job, int priority, const char* line, size_t length) {
-       return pakfire_log_buffer_enqueue(job->log.buffer, priority, line, length);
+       int r;
+
+       // Enqueue the line into the buffer
+       r = pakfire_log_buffer_enqueue(job->log.buffer, priority, line, length);
+       if (r < 0)
+               return r;
+
+       // Ask the daemon to send it
+       return pakfire_daemon_stream_logs(job->daemon);
 }
 
 static int pakfire_job_stdout(struct pakfire_log_stream* stream,
@@ -825,61 +838,40 @@ ERROR:
        return r;
 }
 
-static int pakfire_job_stream(sd_event_source* s, void* data) {
-       struct pakfire_job* job = data;
+int pakfire_job_stream_logs(struct pakfire_job* self) {
        struct timeval timestamp = {};
        char* line = NULL;
        size_t length = 0;
        int priority;
        int r;
 
-       // Send as many log messages as possible
-       for (;;) {
-               // Try to dequeue a line from the log buffer
-               r = pakfire_log_buffer_dequeue(job->log.buffer, &timestamp, &priority, &line, &length);
-               if (r < 0) {
-                       ERROR(job->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r));
-                       return r;
-               }
-
-               // We currently have no data and want to be called again later
-               if (!line)
-                       break;
+       // Don't do this if not enabled
+       if (!pakfire_job_has_flag(self, PAKFIRE_JOB_STREAM_LOG))
+               return 0;
 
-               // If we have received a line let's send it
-               r = pakfire_job_send_log_line(job, &timestamp, priority, line, length);
-               free(line);
-               if (r < 0)
-                       return r;
+       // Try to dequeue a line from the log buffer
+       r = pakfire_log_buffer_dequeue(self->log.buffer, &timestamp, &priority, &line, &length);
+       if (r < 0) {
+               ERROR(self->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r));
+               return r;
        }
 
-       return 0;
-}
+       // Done if there is no data
+       if (!line)
+               return 0;
 
-int pakfire_job_launch_log_stream(struct pakfire_job* job) {
-       int r;
+       // If we have received a line let's send it
+       r = pakfire_job_send_log_line(self, &timestamp, priority, line, length);
 
-       // Make sure the previous event is actually gone
-       pakfire_job_terminate_log_stream(job);
+       // Cleanup
+       free(line);
 
-       // Create the log stream event
-       r = sd_event_add_post(job->loop, &job->log.stream, pakfire_job_stream, job);
-       if (r < 0) {
-               ERROR(job->ctx, "Could not register the log stream: %s\n", strerror(-r));
+       // Raise any errors
+       if (r < 0)
                return r;
-       }
 
-       return 0;
-}
-
-int pakfire_job_terminate_log_stream(struct pakfire_job* job) {
-       // Disable log streaming
-       if (job->log.stream) {
-               sd_event_source_unref(job->log.stream);
-               job->log.stream = NULL;
-       }
-
-       return 0;
+       // Otherwise return the number of lines sent
+       return 1;
 }
 
 int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx,
@@ -994,11 +986,11 @@ int pakfire_job_handle_message(struct pakfire_job* self, struct json_object* mes
 
        // Launch log stream
        if (pakfire_string_equals(command, "launch-log-stream"))
-               return pakfire_job_launch_log_stream(self);
+               self->flags |= PAKFIRE_JOB_STREAM_LOG;
 
        // Terminate log stream
        else if (pakfire_string_equals(command, "terminate-log-stream"))
-               return pakfire_job_terminate_log_stream(self);
+               self->flags &= ~PAKFIRE_JOB_STREAM_LOG;
 
        // Abort job
        if (pakfire_string_equals(command, "abort"))
index db6ce25ae855e7024ebde7ae47dfefddb6660193..6b99824afd47bdd33915bd4feef8d0f81c83794b 100644 (file)
@@ -42,8 +42,7 @@ int pakfire_job_launch(struct pakfire_job* job);
 int pakfire_job_terminate(struct pakfire_job* worker, int signal);
 
 // Log Stream
-int pakfire_job_launch_log_stream(struct pakfire_job* job);
-int pakfire_job_terminate_log_stream(struct pakfire_job* job);
+int pakfire_job_stream_logs(struct pakfire_job* self);
 
 // Message Received
 int pakfire_job_handle_message(struct pakfire_job* self, struct json_object* message);
index ec18dac0d3c206b49def0afdea3695f1bf00c78f..a4b86c421cb0f354ceff1a6d72bc83c88f43d53f 100644 (file)
@@ -120,6 +120,17 @@ struct pakfire_xfer {
                char* data;
                size_t length;
        } recv_buffer;
+
+       // WebSocket Send Buffer
+       struct pakfire_send_buffer {
+               char* data;
+               size_t length;
+       } send_buffer;
+
+       // State
+       enum pakfire_xfer_state {
+               PAKFIRE_XFER_READY_TO_SEND = (1 << 0),
+       } state;
 };
 
 static void pakfire_xfer_free(struct pakfire_xfer* xfer) {
@@ -138,6 +149,8 @@ static void pakfire_xfer_free(struct pakfire_xfer* xfer) {
        // Receive Buffer
        if (xfer->recv_buffer.data)
                free(xfer->recv_buffer.data);
+       if (xfer->send_buffer.data)
+               free(xfer->send_buffer.data);
 
        // Query Arguments
        if (xfer->queries)
@@ -979,12 +992,22 @@ static int pakfire_xfer_allocate(struct pakfire_xfer* xfer, size_t length) {
        return 0;
 }
 
-static int pakfire_xfer_socket_send(struct pakfire_xfer* xfer) {
+static int pakfire_xfer_socket_send(struct pakfire_xfer* self) {
        int r;
 
+       // The socket is now ready to send
+       self->state |= PAKFIRE_XFER_READY_TO_SEND;
+
+       // Finish sending any partially sent messages
+       if (self->send_buffer.data) {
+               r = pakfire_xfer_send_message(self, self->send_buffer.data, self->send_buffer.length);
+               if (r < 0)
+                       return r;
+       }
+
        // Just call the callback (if there is one)
-       if (xfer->callbacks.send) {
-               r = xfer->callbacks.send(xfer, xfer->callbacks.data);
+       if (self->callbacks.send) {
+               r = self->callbacks.send(self, self->callbacks.data);
                if (r)
                        return r;
        }
@@ -1096,10 +1119,10 @@ static pakfire_xfer_error_code_t pakfire_xfer_done_socket(
 
        // Check what callbacks we have
        if (xfer->callbacks.recv)
-               events |= EPOLLIN;
+               events |= EPOLLET|EPOLLIN;
 
        if (xfer->callbacks.send)
-               events |= EPOLLOUT;
+               events |= EPOLLET|EPOLLOUT;
 
        // Register a callback with the event loop
        r = sd_event_add_io(loop, &xfer->event, socket, events, __pakfire_xfer_socket, xfer);
@@ -1633,6 +1656,22 @@ int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_open_callback op
        return 0;
 }
 
+static int pakfire_xfer_store_message(struct pakfire_xfer* self,
+               const char* message, const size_t length) {
+       // Resize the buffer
+       self->send_buffer.data = pakfire_realloc(self->send_buffer.data, length);
+       if (!self->send_buffer.data) {
+               ERROR(self->ctx, "Failed to allocate memory: %m\n");
+               return -errno;
+       }
+
+       // Store the message
+       memcpy(self->send_buffer.data, message, length);
+       self->send_buffer.length = length;
+
+       return 0;
+}
+
 /*
        This function sends a WebSocket message
 */
@@ -1642,6 +1681,10 @@ int pakfire_xfer_send_message(struct pakfire_xfer* xfer,
        size_t offset = 0;
        int r;
 
+       // XXX We need to protect against sending a new message
+       // when there is still something in the send buffer
+       // Should we just send the previous message before we send this one?
+
        // Send the message
        while (offset < length) {
                r = curl_ws_send(xfer->handle, message + offset, length - offset,
@@ -1655,9 +1698,18 @@ int pakfire_xfer_send_message(struct pakfire_xfer* xfer,
                        case CURLE_OK:
                                return 0;
 
-                       // We could not send all data, try again
+                       // We could not send all data, store the message
+                       // and wait until the socket is ready
                        case CURLE_AGAIN:
-                               continue;
+                               // We cannot send no more
+                               xfer->state &= ~PAKFIRE_XFER_READY_TO_SEND;
+
+                               // Store the message
+                               r = pakfire_xfer_store_message(xfer, message + offset, length - offset);
+                               if (r < 0)
+                                       return r;
+
+                               return -EAGAIN;
 
                        default:
                                ERROR(xfer->ctx, "Could not send message: %s\n", curl_easy_strerror(r));
@@ -1668,6 +1720,19 @@ int pakfire_xfer_send_message(struct pakfire_xfer* xfer,
        return 0;
 }
 
+int pakfire_xfer_is_ready_to_send(struct pakfire_xfer* self) {
+       // This function is only supported for sockets
+       switch (self->direction) {
+               case PAKFIRE_XFER_SOCKET:
+                       break;
+
+               default:
+                       return -ENOTSUP;
+       }
+
+       return (self->state & PAKFIRE_XFER_READY_TO_SEND);
+}
+
 pakfire_xfer_error_code_t pakfire_xfer_run(struct pakfire_xfer* xfer, int flags) {
        int r;
 
index 8207a78da98af77a271b42121970f0c646257263..9f67e9a3d774ee204e079b2b77cb93ece370226d 100644 (file)
@@ -144,5 +144,6 @@ int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_open_callback op
        pakfire_xfer_recv_callback recv, pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data);
 
 int pakfire_xfer_send_message(struct pakfire_xfer* xfer, const char* message, const size_t length);
+int pakfire_xfer_is_ready_to_send(struct pakfire_xfer* self);
 
 #endif /* PAKFIRE_XFER_H */