return 0;
}
+static int pakfire_job_send_log_line(struct pakfire_job* job,
+ int priority, const char* line, size_t length) {
+ struct json_object* message = NULL;
+ struct json_object* data = NULL;
+ int r;
+
+ // Create a new JSON object
+ data = json_object_new_object();
+ if (!data) {
+ ERROR(job->ctx, "Could not create a new JSON object: %m\n");
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Add the priority
+ r = pakfire_json_add_uint64(data, "priority", priority);
+ if (r)
+ goto ERROR;
+
+ // Add the line
+ r = pakfire_json_add_stringn(data, "line", line, length);
+ if (r)
+ goto ERROR;
+
+ // Create a new stats object
+ message = json_object_new_object();
+ if (!message) {
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Set type
+ r = pakfire_json_add_string(message, "type", "log");
+ if (r)
+ goto ERROR;
+
+ // Set data
+ r = json_object_object_add(message, "data", json_object_get(data));
+ if (r)
+ goto ERROR;
+
+ // Serialize to string
+ const char* m = json_object_to_json_string_length(message,
+ JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY, &length);
+
+ // Send the message
+ r = pakfire_xfer_send_message(job->control, m, length);
+ if (r) {
+ ERROR(job->ctx, "Could not send log message: %s\n", strerror(-r));
+ goto ERROR;
+ }
+
+ERROR:
+ if (message)
+ json_object_put(message);
+ if (data)
+ json_object_put(data);
+
+ return r;
+}
+
+static int pakfire_job_send(struct pakfire_xfer* xfer, void* data) {
+ struct pakfire_job* job = data;
+ char* line = NULL;
+ size_t length = 0;
+ int priority;
+ int r;
+
+ // Try to dequeue a line from the log buffer
+ r = pakfire_log_buffer_dequeue(job->log.buffer, &priority, &line, &length);
+ if (r < 0) {
+ ERROR(job->ctx, "Could not dequeue from the log buffer: %s\n", strerror(-r));
+ goto ERROR;
+ }
+
+ // We currently have no data and want to be called again later
+ if (!line)
+ return -EAGAIN;
+
+ // If we have received a line let's send it
+ r = pakfire_job_send_log_line(job, priority, line, length);
+ if (r < 0)
+ goto ERROR;
+
+ERROR:
+ if (line)
+ free(line);
+
+ return r;
+}
+
static int pakfire_job_closed(struct pakfire_xfer* xfer, int code, void* data) {
struct pakfire_job* job = data;
int r;
// Make this a WebSocket connection
r = pakfire_xfer_socket(xfer, pakfire_job_connected,
- pakfire_job_recv, NULL, pakfire_job_closed, job);
+ pakfire_job_recv, pakfire_job_send, pakfire_job_closed, job);
if (r)
goto ERROR;