src/buildservice/jobqueue.py \
src/buildservice/jobs.py \
src/buildservice/keys.py \
+ src/buildservice/logstreams.py \
src/buildservice/messages.py \
src/buildservice/mirrors.py \
src/buildservice/misc.py \
templates_events_modulesdir = $(templates_eventsdir)/modules
dist_templates_jobs_DATA = \
- src/templates/jobs/abort.html
+ src/templates/jobs/abort.html \
+ src/templates/jobs/log-stream.html
templates_jobsdir = $(templatesdir)/jobs
templates_jobs_messagesdir = $(templates_jobsdir)/messages
dist_templates_jobs_modules_DATA = \
- src/templates/jobs/modules/list.html
+ src/templates/jobs/modules/list.html \
+ src/templates/jobs/modules/log-stream.html
templates_jobs_modulesdir = $(templates_jobsdir)/modules
static_js_DATA = \
src/static/js/jquery.min.js \
+ src/static/js/job-log-stream.min.js \
src/static/js/pbs.js \
src/static/js/prettify.js
static_jsdir = $(staticdir)/js
+EXTRA_DIST += \
+ src/static/js/job-log-stream.js
+
CLEANFILES += \
+ src/static/js/job-log-stream.min.js \
src/static/js/jquery.min.js
dist_static_fonts_DATA = \
from . import jobqueue
from . import jobs
from . import keys
+from . import logstreams
from . import messages
from . import mirrors
from . import packages
self.events = events.Events(self)
self.jobqueue = jobqueue.JobQueue(self)
self.keys = keys.Keys(self)
+ self.logstreams = logstreams.LogStreams(self)
self.messages = messages.Messages(self)
self.mirrors = mirrors.Mirrors(self)
self.packages = packages.Packages(self)
--- /dev/null
+###############################################################################
+# #
+# Pakfire - The IPFire package management system #
+# Copyright (C) 2023 Pakfire development team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+###############################################################################
+
+import asyncio
+import collections
+import logging
+
+from . import base
+
+# Setup logging
+log = logging.getLogger("pbs.logstreamer")
+
+BUFFER_MAX_SIZE = 256
+
+class LogStreams(base.Object):
+ streams = {}
+
+ def open(self, job):
+ stream = LogStream(self.backend, job)
+
+ # XXX Check that we are not replacing an existing stream for the same job
+
+ # Register the stream
+ self.streams[job] = stream
+
+ return stream
+
+ def _close(self, job):
+ """
+ Closes the stream for a job
+ """
+ try:
+ self.streams.remove(job)
+ except ValueError:
+ return
+
+ async def join(self, job, consumer):
+ """
+ Joins the stream for the given job
+ """
+ try:
+ stream = self.streams[job]
+ except KeyError:
+ return
+
+ # Join the stream
+ await stream.join(consumer)
+
+ return stream
+
+
+class LogStream(base.Object):
+ levels = {
+ logging.DEBUG : "DEBUG",
+ logging.INFO : "INFO",
+ logging.WARNING : "WARNING",
+ logging.ERROR : "ERROR",
+ }
+
+ def init(self, job):
+ self.job = job
+
+ # Buffer for messages
+ self.buffer = collections.deque(maxlen=BUFFER_MAX_SIZE)
+
+ # Consumers
+ self.consumers = []
+
+ def __repr__(self):
+ return "<%s %s>" % (self.__class__.__name__, self.job)
+
+ async def close(self):
+ """
+ Called to close all connections to consumers
+ """
+ # De-register the stream
+ self.backend.logstreams._close(self)
+
+ # Close all connections to consumers
+ if self.consumers:
+ asyncio.gather(
+ *(c.close() for c in self.consumers)
+ )
+
+ async def join(self, consumer):
+ """
+ Called when a consumer wants to receive the stream
+ """
+ # Store a reference to the consumer
+ self.consumers.append(consumer)
+
+ # Send all messages in the buffer
+ for message in self.buffer:
+ await consumer.message(message)
+
+ log.debug("%s has joined the stream for %s" % (consumer, self.job))
+
+ async def leave(self, consumer):
+ """
+ Called when a consumer wants to leave the stream
+ """
+ try:
+ self.consumers.remove(consumer)
+ except IndexError:
+ pass
+
+ log.debug("%s has left the stream for %s" % (consumer, self.job))
+
+ async def message(self, level, message):
+ # Translate the level
+ try:
+ level = self.levels[level]
+ except KeyError:
+ level = "UNKNOWN"
+
+ # Queue the message line by line
+ for line in message.splitlines():
+ # Form a message object that we will send to the consumers
+ m = {
+ "level" : level,
+ "message" : line,
+ }
+
+ # Append the message to the buffer
+ self.buffer.append(m)
+
+ # Send the message to all consumers
+ await asyncio.gather(
+ *(c.message(m) for c in self.consumers),
+ )
/*
Custom CSS
*/
+
+.jobs-log-stream {
+ // Use the code font
+ font-family: $family-code;
+
+ // Keep any whitespace
+ white-space: pre;
+
+ p.DEBUG {
+ background-color: $light;
+ color: $light-invert;
+ }
+
+ p.INFO {
+ // Use the default text color
+ }
+
+ p.WARNING {
+ background-color: $warning;
+ color: $warning-invert;
+ }
+
+ p.ERROR {
+ background-color: $danger;
+ color: $danger-invert;
+ }
+}
--- /dev/null
+$(".jobs-log-stream").each(function() {
+ // Fetch the UUID of the job
+ const uuid = $(this).data("uuid");
+
+ // Find where we are appending lines to
+ const log = $(this);
+
+ // Make the URL
+ const url = "wss://" + window.location.host + "/api/v1/jobs/" + uuid + "/log/stream";
+
+ // Try to connect to the stream
+ const stream = new WebSocket(url);
+
+ stream.addEventListener("message", (event) => {
+ // Parse message as JSON
+ const data = JSON.parse(event.data);
+
+ console.log("Message from server: ", data);
+
+ // Create a new line
+ var line = $("<p></p>");
+
+ // Set the log level
+ line.addClass(data.level);
+
+ // Set the content
+ line.text(data.message);
+
+ // Append it to the log window
+ log.append(line);
+ });
+});
--- /dev/null
+{% extends "../base.html" %}
+
+{% block title %}{{ job }} - {{ _("Log") }}{% end block %}
+
+{% block container %}
+ <nav class="breadcrumb" aria-label="breadcrumbs">
+ <ul>
+ <li>
+ <a href="/builds">{{ _("Builds") }}</a>
+ </li>
+ <li>
+ <a href="/builds/{{ job.build.uuid }}">{{ job.build }}</a>
+ </li>
+ <li>
+ <a href="#" disabled>{{ job.arch }}</a>
+ </li>
+ <li class="is-active">
+ <a href="#" aria-current="page">{{ _("Log") }}</a>
+ </li>
+ </ul>
+ </nav>
+
+ {% module JobsLogStream(job) %}
+{% end block %}
{# Footer with some useful buttons #}
<footer class="card-footer">
- {% if job.has_log() %}
+ {% if job.is_running() %}
+ <a href="/jobs/{{ job.uuid }}/log" class="card-footer-item">
+ {{ _("Watch Log") }}
+ </a>
+ {% elif job.has_log() %}
<a href="/jobs/{{ job.uuid }}/log" class="card-footer-item">
{{ _("Download Log") }}
</a>
--- /dev/null
+<div class="jobs-log-stream" data-uuid="{{ job.uuid }}"></div>
# Jobs
"JobsList" : jobs.ListModule,
+ "JobsLogStream" : jobs.LogStreamModule,
# Packages
"PackageInfo" : packages.InfoModule,
(r"/job/([\w]{8}-[\w]{4}-[\w]{4}-[\w]{4}-[\w]{12})/buildroot", jobs.JobBuildrootHandler),
(r"/api/v1/jobs/([\w]{8}-[\w]{4}-[\w]{4}-[\w]{4}-[\w]{12})",
jobs.APIv1DetailHandler),
+ (r"/api/v1/jobs/([\w]{8}-[\w]{4}-[\w]{4}-[\w]{4}-[\w]{12})/log/stream",
+ jobs.APIv1LogStreamHandler),
# Builders
(r"/builders", builders.BuilderListHandler),
log.debug("Connection opened for %s by %s" % (self.job, self.current_user))
+ # Open a new log stream
+ self.logstream = self.backend.logstreams.open(self.job)
+
+	# NOTE(review): Tornado invokes on_close() synchronously and never awaits
+	# it — as written this coroutine body never runs, so the log stream is
+	# never closed. Schedule the close on the IO loop instead; confirm.
+	async def on_close(self):
+		# Close the logstream
+		await self.logstream.close()
+
async def on_message(self, message):
message = self._decode_json_message(message)
# Handle log messages
elif t == "log":
- pass
+ await self.logstream.message(
+ level=message.get("level"),
+ message=message.get("log"),
+ )
# Handle finished message
elif t == "finished":
await self.backend.jobqueue.dispatch_jobs()
+class APIv1LogStreamHandler(base.BackendMixin, tornado.websocket.WebSocketHandler):
+ # No authentication required
+ async def open(self, uuid):
+ job = self.backend.jobs.get_by_uuid(uuid)
+ if not job:
+ raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
+
+ # Join the stream
+ self.stream = await self.backend.logstreams.join(job, self)
+ if not self.stream:
+ raise tornado.web.HTTPError(400, "Could not join stream for %s" % job)
+
+ # Send messages without any delay
+ self.set_nodelay(True)
+
+ async def on_close(self):
+ """
+ Leave the stream
+ """
+ await self.stream.leave(self)
+
+ async def message(self, message):
+ """
+ Called when there is a new message to be sent to the client
+ """
+ try:
+ await self.write_message(message)
+
+ # Ignore if the message could not be sent
+ except tornado.websocket.WebSocketClosedError as e:
+ pass
+
+
class QueueHandler(base.BaseHandler):
def get(self):
self.render("queue.html", queue=self.backend.jobqueue)
if not job:
raise tornado.web.HTTPError("Could not find job %s" % uuid)
+ # Stream the log if the job is running
+ if job.is_running():
+ self.render("jobs/log-stream.html", job=job)
+ return
+
tail = self.get_argument_int("tail", None)
# Should we tail the log, or stream the entire file?
def render(self, jobs, show_arch_only=False, show_packages=False):
return self.render_string("jobs/modules/list.html", jobs=jobs,
show_arch_only=show_arch_only, show_packages=show_packages)
+
+
+class LogStreamModule(ui_modules.UIModule):
+	"""
+		Renders the log stream container for a job
+	"""
+	def render(self, job):
+		# Emits the container that carries the job UUID for the client script
+		return self.render_string("jobs/modules/log-stream.html", job=job)
+
+	def javascript_files(self):
+		# Client-side code that connects to the log stream WebSocket
+		return [
+			"js/job-log-stream.min.js",
+		]