return r;
}
+/*
+ Called when we are ready to stream the log.
+
+ This function calls each running job which will only send one line
+ at a time to ensure that one job will not take the entire bandwidth.
+*/
+int pakfire_daemon_stream_logs(struct pakfire_daemon* self) {
+ unsigned int lines;
+ int r;
+
+ // Bail if we don't have a control connection
+ if (!self->control)
+ return 0;
+
+ // Bail if the connection isn't ready to send
+ else if (!pakfire_xfer_is_ready_to_send(self->control))
+ return 0;
+
+ do {
+ // Reset lines
+ lines = 0;
+
+ // Have every job send one line
+ for (unsigned int i = 0; i < MAX_JOBS; i++) {
+ // Skip any empty slots
+ if (!self->jobs[i])
+ continue;
+
+ // Stream logs
+ r = pakfire_job_stream_logs(self->jobs[i]);
+ if (r < 0) {
+ switch (-r) {
+ case EAGAIN:
+ return 0;
+
+ default:
+ return r;
+ }
+ }
+
+ // Add up lines
+ lines += r;
+ }
+ } while (lines);
+
+ return 0;
+}
+
+static int pakfire_daemon_send(struct pakfire_xfer* xfer, void* data) {
+ struct pakfire_daemon* self = data;
+ int r;
+
+ // Stream logs
+ r = pakfire_daemon_stream_logs(self);
+ if (r < 0)
+ return r;
+
+ return 0;
+}
+
/*
This function is called whenever the connection to the build service could not
be established or was interrupted. It will try to reconnect.
daemon->stats_timer = NULL;
}
- // Turn off log streaming for all jobs
- for (int i = 0; i < MAX_JOBS; i++) {
- if (daemon->jobs[i])
- pakfire_job_terminate_log_stream(daemon->jobs[i]);
- }
-
return 0;
}
// Make this a WebSocket connection
r = pakfire_xfer_socket(xfer, pakfire_daemon_connected,
- pakfire_daemon_recv, NULL, pakfire_daemon_close, daemon);
+ pakfire_daemon_recv, pakfire_daemon_send, pakfire_daemon_close, daemon);
if (r)
goto ERROR;
// Send the message
r = pakfire_xfer_send_message(self->control, m, length);
if (r < 0) {
- ERROR(self->ctx, "Failed to send message: %s\n", strerror(-r));
- return r;
+ switch (-r) {
+ case EAGAIN:
+ break;
+
+ default:
+ ERROR(self->ctx, "Failed to send message: %s\n", strerror(-r));
+ return r;
+ }
}
return 0;
int pakfire_daemon_job_finished(struct pakfire_daemon* daemon, struct pakfire_job* job);
+// Stream Logs
+int pakfire_daemon_stream_logs(struct pakfire_daemon* self);
+
// Send message
int pakfire_daemon_send_message(struct pakfire_daemon* self, struct json_object* message);
// Flags
enum {
- PAKFIRE_JOB_TEST = (1 << 0),
- PAKFIRE_JOB_CCACHE = (1 << 1),
+ PAKFIRE_JOB_TEST = (1 << 0),
+ PAKFIRE_JOB_CCACHE = (1 << 1),
+ PAKFIRE_JOB_STREAM_LOG = (1 << 2),
} flags;
// Package URL
char** uploads;
};
+static int pakfire_job_has_flag(struct pakfire_job* self, int flag) {
+ return (self->flags & flag);
+}
+
static int pakfire_parse_job(struct pakfire_job* job, json_object* data) {
json_object* ccache = NULL;
json_object* o = NULL;
}
static int pakfire_job_send_log(struct pakfire_job* job, int priority, const char* line, size_t length) {
- return pakfire_log_buffer_enqueue(job->log.buffer, priority, line, length);
+ int r;
+
+ // Enqueue the line into the buffer
+ r = pakfire_log_buffer_enqueue(job->log.buffer, priority, line, length);
+ if (r < 0)
+ return r;
+
+ // Ask the daemon to send it
+ return pakfire_daemon_stream_logs(job->daemon);
}
static int pakfire_job_stdout(struct pakfire_log_stream* stream,
return r;
}
-static int pakfire_job_stream(sd_event_source* s, void* data) {
- struct pakfire_job* job = data;
+int pakfire_job_stream_logs(struct pakfire_job* self) {
struct timeval timestamp = {};
char* line = NULL;
size_t length = 0;
int priority;
int r;
- // Send as many log messages as possible
- for (;;) {
- // Try to dequeue a line from the log buffer
- r = pakfire_log_buffer_dequeue(job->log.buffer, ×tamp, &priority, &line, &length);
- if (r < 0) {
- ERROR(job->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r));
- return r;
- }
-
- // We currently have no data and want to be called again later
- if (!line)
- break;
+ // Don't do this if not enabled
+ if (!pakfire_job_has_flag(self, PAKFIRE_JOB_STREAM_LOG))
+ return 0;
- // If we have received a line let's send it
- r = pakfire_job_send_log_line(job, ×tamp, priority, line, length);
- free(line);
- if (r < 0)
- return r;
+ // Try to dequeue a line from the log buffer
+ r = pakfire_log_buffer_dequeue(self->log.buffer, ×tamp, &priority, &line, &length);
+ if (r < 0) {
+ ERROR(self->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r));
+ return r;
}
- return 0;
-}
+ // Done if there is no data
+ if (!line)
+ return 0;
-int pakfire_job_launch_log_stream(struct pakfire_job* job) {
- int r;
+ // If we have received a line let's send it
+ r = pakfire_job_send_log_line(self, ×tamp, priority, line, length);
- // Make sure the previous event is actually gone
- pakfire_job_terminate_log_stream(job);
+ // Cleanup
+ free(line);
- // Create the log stream event
- r = sd_event_add_post(job->loop, &job->log.stream, pakfire_job_stream, job);
- if (r < 0) {
- ERROR(job->ctx, "Could not register the log stream: %s\n", strerror(-r));
+ // Raise any errors
+ if (r < 0)
return r;
- }
- return 0;
-}
-
-int pakfire_job_terminate_log_stream(struct pakfire_job* job) {
- // Disable log streaming
- if (job->log.stream) {
- sd_event_source_unref(job->log.stream);
- job->log.stream = NULL;
- }
-
- return 0;
+ // Otherwise return the number of lines sent
+ return 1;
}
int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx,
// Launch log stream
if (pakfire_string_equals(command, "launch-log-stream"))
- return pakfire_job_launch_log_stream(self);
+ self->flags |= PAKFIRE_JOB_STREAM_LOG;
// Terminate log stream
else if (pakfire_string_equals(command, "terminate-log-stream"))
- return pakfire_job_terminate_log_stream(self);
+ self->flags &= ~PAKFIRE_JOB_STREAM_LOG;
// Abort job
if (pakfire_string_equals(command, "abort"))
int pakfire_job_terminate(struct pakfire_job* worker, int signal);
// Log Stream
-int pakfire_job_launch_log_stream(struct pakfire_job* job);
-int pakfire_job_terminate_log_stream(struct pakfire_job* job);
+int pakfire_job_stream_logs(struct pakfire_job* self);
// Message Received
int pakfire_job_handle_message(struct pakfire_job* self, struct json_object* message);
char* data;
size_t length;
} recv_buffer;
+
+ // WebSocket Send Buffer
+ struct pakfire_send_buffer {
+ char* data;
+ size_t length;
+ } send_buffer;
+
+ // State
+ enum pakfire_xfer_state {
+ PAKFIRE_XFER_READY_TO_SEND = (1 << 0),
+ } state;
};
static void pakfire_xfer_free(struct pakfire_xfer* xfer) {
// Receive Buffer
if (xfer->recv_buffer.data)
free(xfer->recv_buffer.data);
+ if (xfer->send_buffer.data)
+ free(xfer->send_buffer.data);
// Query Arguments
if (xfer->queries)
return 0;
}
-static int pakfire_xfer_socket_send(struct pakfire_xfer* xfer) {
+static int pakfire_xfer_socket_send(struct pakfire_xfer* self) {
int r;
+ // The socket is now ready to send
+ self->state |= PAKFIRE_XFER_READY_TO_SEND;
+
+ // Finish sending any partially sent messages
+ if (self->send_buffer.data) {
+ r = pakfire_xfer_send_message(self, self->send_buffer.data, self->send_buffer.length);
+ if (r < 0)
+ return r;
+ }
+
// Just call the callback (if there is one)
- if (xfer->callbacks.send) {
- r = xfer->callbacks.send(xfer, xfer->callbacks.data);
+ if (self->callbacks.send) {
+ r = self->callbacks.send(self, self->callbacks.data);
if (r)
return r;
}
// Check what callbacks we have
if (xfer->callbacks.recv)
- events |= EPOLLIN;
+ events |= EPOLLET|EPOLLIN;
if (xfer->callbacks.send)
- events |= EPOLLOUT;
+ events |= EPOLLET|EPOLLOUT;
// Register a callback with the event loop
r = sd_event_add_io(loop, &xfer->event, socket, events, __pakfire_xfer_socket, xfer);
return 0;
}
+static int pakfire_xfer_store_message(struct pakfire_xfer* self,
+ const char* message, const size_t length) {
+ // Resize the buffer
+ self->send_buffer.data = pakfire_realloc(self->send_buffer.data, length);
+ if (!self->send_buffer.data) {
+ ERROR(self->ctx, "Failed to allocate memory: %m\n");
+ return -errno;
+ }
+
+ // Store the message
+ memcpy(self->send_buffer.data, message, length);
+ self->send_buffer.length = length;
+
+ return 0;
+}
+
/*
This function sends a WebSocket message
*/
size_t offset = 0;
int r;
+ // XXX We need to protect against sending a new message
+ // when there is still something in the send buffer
+ // Should we just send the previous message before we send this one?
+
// Send the message
while (offset < length) {
r = curl_ws_send(xfer->handle, message + offset, length - offset,
case CURLE_OK:
return 0;
- // We could not send all data, try again
+ // We could not send all data, store the message
+ // and wait until the socket is ready
case CURLE_AGAIN:
- continue;
+ // We cannot send no more
+ xfer->state &= ~PAKFIRE_XFER_READY_TO_SEND;
+
+ // Store the message
+ r = pakfire_xfer_store_message(xfer, message + offset, length - offset);
+ if (r < 0)
+ return r;
+
+ return -EAGAIN;
default:
ERROR(xfer->ctx, "Could not send message: %s\n", curl_easy_strerror(r));
return 0;
}
+int pakfire_xfer_is_ready_to_send(struct pakfire_xfer* self) {
+ // This function is only supported for sockets
+ switch (self->direction) {
+ case PAKFIRE_XFER_SOCKET:
+ break;
+
+ default:
+ return -ENOTSUP;
+ }
+
+ return (self->state & PAKFIRE_XFER_READY_TO_SEND);
+}
+
pakfire_xfer_error_code_t pakfire_xfer_run(struct pakfire_xfer* xfer, int flags) {
int r;
pakfire_xfer_recv_callback recv, pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data);
int pakfire_xfer_send_message(struct pakfire_xfer* xfer, const char* message, const size_t length);
+int pakfire_xfer_is_ready_to_send(struct pakfire_xfer* self);
#endif /* PAKFIRE_XFER_H */