From: Michael Tremer Date: Fri, 7 Feb 2025 17:56:43 +0000 (+0000) Subject: jobs: Go back to the event-driven model to send logs X-Git-Tag: 0.9.30~92 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=f6ae13633e6e3dac7f9a9c1df6e0a40d76814f31;p=pakfire.git jobs: Go back to the event-driven model to send logs When the builder is not located in the same data center it can happen that the send buffer fills up which results in a broken TCP connection if we continue to send any data. We will now first of all buffer any messages that have only been partially sent and listen for when the socket is ready to receive the rest. If so, we will flush our buffer. Afterwards we will call the send callback which will go and look for any more data to send. Signed-off-by: Michael Tremer --- diff --git a/src/pakfire/daemon.c b/src/pakfire/daemon.c index 0ed933f3..3733e2f7 100644 --- a/src/pakfire/daemon.c +++ b/src/pakfire/daemon.c @@ -597,6 +597,66 @@ ERROR: return r; } +/* + Called when we are ready to stream the log. + + This function calls each running job which will only send one line + at a time to ensure that one job will not take the entire bandwidth. +*/ +int pakfire_daemon_stream_logs(struct pakfire_daemon* self) { + unsigned int lines; + int r; + + // Bail if we don't have a control connection + if (!self->control) + return 0; + + // Bail if the connection isn't ready to send + else if (!pakfire_xfer_is_ready_to_send(self->control)) + return 0; + + do { + // Reset lines + lines = 0; + + // Have every job send one line + for (unsigned int i = 0; i < MAX_JOBS; i++) { + // Skip any empty slots + if (!self->jobs[i]) + continue; + + // Stream logs + r = pakfire_job_stream_logs(self->jobs[i]); + if (r < 0) { + switch (-r) { + case EAGAIN: + return 0; + + default: + return r; + } + } + + // Add up lines + lines += r; + } + } while (lines); + + return 0; +} + +static int pakfire_daemon_send(struct pakfire_xfer* xfer, void* data) { + struct pakfire_daemon* self = data; + int r; + + // Stream logs + r = pakfire_daemon_stream_logs(self); + if (r < 0) + return r; + + return 0; +} + /* This function is called whenever the connection to the build service could not be established or was interrupted. It will try to reconnect. @@ -648,12 +708,6 @@ static int pakfire_daemon_close(struct pakfire_xfer* xfer, int code, void* data) daemon->stats_timer = NULL; } - // Turn off log streaming for all jobs - for (int i = 0; i < MAX_JOBS; i++) { - if (daemon->jobs[i]) - pakfire_job_terminate_log_stream(daemon->jobs[i]); - } - return 0; } @@ -817,7 +871,7 @@ static int pakfire_daemon_connect(sd_event_source* s, uint64_t usec, void* data) // Make this a WebSocket connection r = pakfire_xfer_socket(xfer, pakfire_daemon_connected, - pakfire_daemon_recv, NULL, pakfire_daemon_close, daemon); + pakfire_daemon_recv, pakfire_daemon_send, pakfire_daemon_close, daemon); if (r) goto ERROR; @@ -1391,8 +1445,14 @@ int pakfire_daemon_send_message(struct pakfire_daemon* self, struct json_object* // Send the message r = pakfire_xfer_send_message(self->control, m, length); if (r < 0) { - ERROR(self->ctx, "Failed to send message: %s\n", strerror(-r)); - return r; + switch (-r) { + case EAGAIN: + break; + + default: + ERROR(self->ctx, "Failed to send message: %s\n", strerror(-r)); + return r; + } } return 0; diff --git a/src/pakfire/daemon.h b/src/pakfire/daemon.h index a1caaad8..03ac723c 100644 --- a/src/pakfire/daemon.h +++ b/src/pakfire/daemon.h @@ -44,6 +44,9 @@ int pakfire_daemon_main(struct pakfire_daemon* daemon); int pakfire_daemon_job_finished(struct pakfire_daemon* daemon, struct pakfire_job* job); +// Stream Logs +int pakfire_daemon_stream_logs(struct pakfire_daemon* self); + // Send message int pakfire_daemon_send_message(struct pakfire_daemon* self, struct json_object* message); diff --git a/src/pakfire/job.c b/src/pakfire/job.c index 404f2415..8468c55d 100644 --- a/src/pakfire/job.c +++ b/src/pakfire/job.c @@ -67,8 +67,9 @@ struct pakfire_job { // Flags enum { - PAKFIRE_JOB_TEST = (1 << 0), - PAKFIRE_JOB_CCACHE = (1 << 1), + PAKFIRE_JOB_TEST = (1 << 0), + PAKFIRE_JOB_CCACHE = (1 << 1), + PAKFIRE_JOB_STREAM_LOG = (1 << 2), } flags; // Package URL @@ -111,6 +112,10 @@ struct pakfire_job { char** uploads; }; +static int pakfire_job_has_flag(struct pakfire_job* self, int flag) { + return (self->flags & flag); +} + static int pakfire_parse_job(struct pakfire_job* job, json_object* data) { json_object* ccache = NULL; json_object* o = NULL; @@ -509,7 +514,15 @@ static int pakfire_job_exited(sd_event_source* s, const siginfo_t* si, void* dat } static int pakfire_job_send_log(struct pakfire_job* job, int priority, const char* line, size_t length) { - return pakfire_log_buffer_enqueue(job->log.buffer, priority, line, length); + int r; + + // Enqueue the line into the buffer + r = pakfire_log_buffer_enqueue(job->log.buffer, priority, line, length); + if (r < 0) + return r; + + // Ask the daemon to send it + return pakfire_daemon_stream_logs(job->daemon); } static int pakfire_job_stdout(struct pakfire_log_stream* stream, @@ -825,61 +838,40 @@ ERROR: return r; } -static int pakfire_job_stream(sd_event_source* s, void* data) { - struct pakfire_job* job = data; +int pakfire_job_stream_logs(struct pakfire_job* self) { struct timeval timestamp = {}; char* line = NULL; size_t length = 0; int priority; int r; - // Send as many log messages as possible - for (;;) { - // Try to dequeue a line from the log buffer - r = pakfire_log_buffer_dequeue(job->log.buffer, ×tamp, &priority, &line, &length); - if (r < 0) { - ERROR(job->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r)); - return r; - } - - // We currently have no data and want to be called again later - if (!line) - break; + // Don't do this if not enabled + if (!pakfire_job_has_flag(self, PAKFIRE_JOB_STREAM_LOG)) + return 0; - // If we have received a line let's send it - r = pakfire_job_send_log_line(job, ×tamp, priority, line, length); - free(line); - if (r < 0) - return r; + // Try to dequeue a line from the log buffer + r = pakfire_log_buffer_dequeue(self->log.buffer, ×tamp, &priority, &line, &length); + if (r < 0) { + ERROR(self->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r)); + return r; } - return 0; -} + // Done if there is no data + if (!line) + return 0; -int pakfire_job_launch_log_stream(struct pakfire_job* job) { - int r; + // If we have received a line let's send it + r = pakfire_job_send_log_line(self, ×tamp, priority, line, length); - // Make sure the previous event is actually gone - pakfire_job_terminate_log_stream(job); + // Cleanup + free(line); - // Create the log stream event - r = sd_event_add_post(job->loop, &job->log.stream, pakfire_job_stream, job); - if (r < 0) { - ERROR(job->ctx, "Could not register the log stream: %s\n", strerror(-r)); + // Raise any errors + if (r < 0) return r; - } - return 0; -} - -int pakfire_job_terminate_log_stream(struct pakfire_job* job) { - // Disable log streaming - if (job->log.stream) { - sd_event_source_unref(job->log.stream); - job->log.stream = NULL; - } - - return 0; + // Otherwise return the number of lines sent + return 1; } int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx, @@ -994,11 +986,11 @@ int pakfire_job_handle_message(struct pakfire_job* self, struct json_object* mes // Launch log stream if (pakfire_string_equals(command, "launch-log-stream")) - return pakfire_job_launch_log_stream(self); + self->flags |= PAKFIRE_JOB_STREAM_LOG; // Terminate log stream else if (pakfire_string_equals(command, "terminate-log-stream")) - return pakfire_job_terminate_log_stream(self); + self->flags &= ~PAKFIRE_JOB_STREAM_LOG; // Abort job if (pakfire_string_equals(command, "abort")) diff --git a/src/pakfire/job.h b/src/pakfire/job.h index db6ce25a..6b99824a 100644 --- a/src/pakfire/job.h +++ b/src/pakfire/job.h @@ -42,8 +42,7 @@ int pakfire_job_launch(struct pakfire_job* job); int pakfire_job_terminate(struct pakfire_job* worker, int signal); // Log Stream -int pakfire_job_launch_log_stream(struct pakfire_job* job); -int pakfire_job_terminate_log_stream(struct pakfire_job* job); +int pakfire_job_stream_logs(struct pakfire_job* self); // Message Received int pakfire_job_handle_message(struct pakfire_job* self, struct json_object* message); diff --git a/src/pakfire/xfer.c b/src/pakfire/xfer.c index ec18dac0..a4b86c42 100644 --- a/src/pakfire/xfer.c +++ b/src/pakfire/xfer.c @@ -120,6 +120,17 @@ struct pakfire_xfer { char* data; size_t length; } recv_buffer; + + // WebSocket Send Buffer + struct pakfire_send_buffer { + char* data; + size_t length; + } send_buffer; + + // State + enum pakfire_xfer_state { + PAKFIRE_XFER_READY_TO_SEND = (1 << 0), + } state; }; static void pakfire_xfer_free(struct pakfire_xfer* xfer) { @@ -138,6 +149,8 @@ static void pakfire_xfer_free(struct pakfire_xfer* xfer) { // Receive Buffer if (xfer->recv_buffer.data) free(xfer->recv_buffer.data); + if (xfer->send_buffer.data) + free(xfer->send_buffer.data); // Query Arguments if (xfer->queries) @@ -979,12 +992,22 @@ static int pakfire_xfer_allocate(struct pakfire_xfer* xfer, size_t length) { return 0; } -static int pakfire_xfer_socket_send(struct pakfire_xfer* xfer) { +static int pakfire_xfer_socket_send(struct pakfire_xfer* self) { int r; + // The socket is now ready to send + self->state |= PAKFIRE_XFER_READY_TO_SEND; + + // Finish sending any partially sent messages + if (self->send_buffer.data) { + r = pakfire_xfer_send_message(self, self->send_buffer.data, self->send_buffer.length); + if (r < 0) + return r; + } + // Just call the callback (if there is one) - if (xfer->callbacks.send) { - r = xfer->callbacks.send(xfer, xfer->callbacks.data); + if (self->callbacks.send) { + r = self->callbacks.send(self, self->callbacks.data); if (r) return r; } @@ -1096,10 +1119,10 @@ static pakfire_xfer_error_code_t pakfire_xfer_done_socket( // Check what callbacks we have if (xfer->callbacks.recv) - events |= EPOLLIN; + events |= EPOLLET|EPOLLIN; if (xfer->callbacks.send) - events |= EPOLLOUT; + events |= EPOLLET|EPOLLOUT; // Register a callback with the event loop r = sd_event_add_io(loop, &xfer->event, socket, events, __pakfire_xfer_socket, xfer); @@ -1633,6 +1656,22 @@ int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_open_callback op return 0; } +static int pakfire_xfer_store_message(struct pakfire_xfer* self, + const char* message, const size_t length) { + // Resize the buffer + self->send_buffer.data = pakfire_realloc(self->send_buffer.data, length); + if (!self->send_buffer.data) { + ERROR(self->ctx, "Failed to allocate memory: %m\n"); + return -errno; + } + + // Store the message + memcpy(self->send_buffer.data, message, length); + self->send_buffer.length = length; + + return 0; +} + /* This function sends a WebSocket message */ @@ -1642,6 +1681,10 @@ int pakfire_xfer_send_message(struct pakfire_xfer* xfer, size_t offset = 0; int r; + // XXX We need to protect against sending a new message + // when there is still something in the send buffer + // Should we just send the previous message before we send this one? + // Send the message while (offset < length) { r = curl_ws_send(xfer->handle, message + offset, length - offset, @@ -1655,9 +1698,18 @@ int pakfire_xfer_send_message(struct pakfire_xfer* xfer, case CURLE_OK: return 0; - // We could not send all data, try again + // We could not send all data, store the message + // and wait until the socket is ready case CURLE_AGAIN: - continue; + // We cannot send no more + xfer->state &= ~PAKFIRE_XFER_READY_TO_SEND; + + // Store the message + r = pakfire_xfer_store_message(xfer, message + offset, length - offset); + if (r < 0) + return r; + + return -EAGAIN; default: ERROR(xfer->ctx, "Could not send message: %s\n", curl_easy_strerror(r)); @@ -1668,6 +1720,19 @@ int pakfire_xfer_send_message(struct pakfire_xfer* xfer, return 0; } +int pakfire_xfer_is_ready_to_send(struct pakfire_xfer* self) { + // This function is only supported for sockets + switch (self->direction) { + case PAKFIRE_XFER_SOCKET: + break; + + default: + return -ENOTSUP; + } + + return (self->state & PAKFIRE_XFER_READY_TO_SEND); +} + pakfire_xfer_error_code_t pakfire_xfer_run(struct pakfire_xfer* xfer, int flags) { int r; diff --git a/src/pakfire/xfer.h b/src/pakfire/xfer.h index 8207a78d..9f67e9a3 100644 --- a/src/pakfire/xfer.h +++ b/src/pakfire/xfer.h @@ -144,5 +144,6 @@ int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_open_callback op pakfire_xfer_recv_callback recv, pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data); int pakfire_xfer_send_message(struct pakfire_xfer* xfer, const char* message, const size_t length); +int pakfire_xfer_is_ready_to_send(struct pakfire_xfer* self); #endif /* PAKFIRE_XFER_H */