// Streams
struct pakfire_log_stream* stdout;
struct pakfire_log_stream* stderr;
+
+ // Stream Event
+ sd_event_source* stream;
} log;
// Connection Timer and Holdoff Time
pakfire_log_stream_unref(job->log.stdout);
if (job->log.stderr)
pakfire_log_stream_unref(job->log.stderr);
+ if (job->log.stream)
+ sd_event_source_unref(job->log.stream);
if (job->service)
pakfire_buildservice_unref(job->service);
return r;
}
-static int pakfire_job_send(struct pakfire_xfer* xfer, void* data) {
+static int pakfire_job_stream(sd_event_source* s, void* data) {
struct pakfire_job* job = data;
char* line = NULL;
size_t length = 0;
int priority;
int r;
- // Try to dequeue a line from the log buffer
- r = pakfire_log_buffer_dequeue(job->log.buffer, &priority, &line, &length);
- if (r < 0) {
- ERROR(job->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r));
- goto ERROR;
+ // Do nothing if we are not connected
+ if (!job->control)
+ return 0;
+
+ // Send as many log messages as possible
+ for (;;) {
+ // Try to dequeue a line from the log buffer
+ r = pakfire_log_buffer_dequeue(job->log.buffer, &priority, &line, &length);
+ if (r < 0) {
+ ERROR(job->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r));
+ return r;
+ }
+
+ // We currently have no data and want to be called again later
+ if (!line)
+ break;
+
+ // If we have received a line let's send it
+ r = pakfire_job_send_log_line(job, priority, line, length);
+ free(line);
+ if (r < 0)
+ return r;
}
- // We currently have no data and want to be called again later
- if (!line)
- return -EAGAIN;
+ return 0;
+}
- // If we have received a line let's send it
- r = pakfire_job_send_log_line(job, priority, line, length);
- if (r < 0)
- goto ERROR;
+static int pakfire_job_terminate_log_stream(struct pakfire_job* job) {
+ // Disable log streaming
+ if (job->log.stream) {
+ sd_event_source_unref(job->log.stream);
+ job->log.stream = NULL;
+ }
-ERROR:
- if (line)
- free(line);
+ return 0;
+}
- return r;
+static int pakfire_job_launch_log_stream(struct pakfire_job* job) {
+ int r;
+
+ // Make sure the previous event is actually gone
+ pakfire_job_terminate_log_stream(job);
+
+ // Create the log stream event
+ r = sd_event_add_post(job->loop, &job->log.stream, pakfire_job_stream, job);
+ if (r < 0) {
+ ERROR(job->ctx, "Could not register the log stream: %s\n", strerror(-r));
+ return r;
+ }
+
+ return 0;
}
static int pakfire_job_closed(struct pakfire_xfer* xfer, int code, void* data) {
job->control = NULL;
}
+ // Turn off the log stream
+ pakfire_job_terminate_log_stream(job);
+
INFO(job->ctx, "Will attempt to reconnect in %u second(s)\n",
job->reconnect_holdoff / 1000000);
static int pakfire_job_connected(struct pakfire_xfer* xfer, void* data) {
struct pakfire_job* job = data;
+ int r;
DEBUG(job->ctx, "Connected!\n");
// Store a reference to the control connection
job->control = pakfire_xfer_ref(xfer);
+ // Launch the log stream
+ r = pakfire_job_launch_log_stream(job);
+ if (r < 0)
+ return r;
+
switch (job->state) {
// If the job has connected for the first time, we launch it
case PAKFIRE_JOB_STATE_INIT:
// Make this a WebSocket connection
r = pakfire_xfer_socket(xfer, pakfire_job_connected,
- pakfire_job_recv, pakfire_job_send, pakfire_job_closed, job);
+ pakfire_job_recv, NULL, pakfire_job_closed, job);
if (r)
goto ERROR;