From: Michael Tremer Date: Sun, 26 Jan 2025 16:58:03 +0000 (+0000) Subject: job: Build some experimental function to stream the log to the build service X-Git-Tag: 0.9.30~364 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6ae40893135da1b331f5fe4f474bfdfbc2592215;p=pakfire.git job: Build some experimental function to stream the log to the build service This does not seem to be called often enough. If, however, we don't send -EAGAIN we will be called as fast as possible starving the loop. Signed-off-by: Michael Tremer --- diff --git a/src/pakfire/job.c b/src/pakfire/job.c index a787804b..0b98f274 100644 --- a/src/pakfire/job.c +++ b/src/pakfire/job.c @@ -593,6 +593,97 @@ static int pakfire_job_recv(struct pakfire_xfer* xfer, return 0; } +static int pakfire_job_send_log_line(struct pakfire_job* job, + int priority, const char* line, size_t length) { + struct json_object* message = NULL; + struct json_object* data = NULL; + int r; + + // Create a new JSON object + data = json_object_new_object(); + if (!data) { + ERROR(job->ctx, "Could not create a new JSON object: %m\n"); + r = -errno; + goto ERROR; + } + + // Add the priority + r = pakfire_json_add_uint64(data, "priority", priority); + if (r) + goto ERROR; + + // Add the line + r = pakfire_json_add_stringn(data, "line", line, length); + if (r) + goto ERROR; + + // Create a new stats object + message = json_object_new_object(); + if (!message) { + r = -errno; + goto ERROR; + } + + // Set type + r = pakfire_json_add_string(message, "type", "log"); + if (r) + goto ERROR; + + // Set data + r = json_object_object_add(message, "data", json_object_get(data)); + if (r) + goto ERROR; + + // Serialize to string + const char* m = json_object_to_json_string_length(message, + JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY, &length); + + // Send the message + r = pakfire_xfer_send_message(job->control, m, length); + if (r) { + ERROR(job->ctx, "Could not send log message: %s\n", strerror(-r)); + goto ERROR; + } + +ERROR: + if (message) + json_object_put(message); + if (data) + json_object_put(data); + + return r; +} + +static int pakfire_job_send(struct pakfire_xfer* xfer, void* data) { + struct pakfire_job* job = data; + char* line = NULL; + size_t length = 0; + int priority; + int r; + + // Try to dequeue a line from the log buffer + r = pakfire_log_buffer_dequeue(job->log.buffer, &priority, &line, &length); + if (r < 0) { + ERROR(job->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r)); + goto ERROR; + } + + // We currently have no data and want to be called again later + if (!line) + return -EAGAIN; + + // If we have received a line let's send it + r = pakfire_job_send_log_line(job, priority, line, length); + if (r < 0) + goto ERROR; + +ERROR: + if (line) + free(line); + + return r; +} + static int pakfire_job_closed(struct pakfire_xfer* xfer, int code, void* data) { struct pakfire_job* job = data; int r; @@ -682,7 +773,7 @@ static int pakfire_job_connect(sd_event_source* s, uint64_t usec, void* data) { // Make this a WebSocket connection r = pakfire_xfer_socket(xfer, pakfire_job_connected, - pakfire_job_recv, NULL, pakfire_job_closed, job); + pakfire_job_recv, pakfire_job_send, pakfire_job_closed, job); if (r) goto ERROR;