git.ipfire.org Git - pbs.git/commitdiff
jobs: Build a prototype to stream logs
author Michael Tremer <michael.tremer@ipfire.org>
Sat, 22 Apr 2023 14:43:03 +0000 (14:43 +0000)
committer Michael Tremer <michael.tremer@ipfire.org>
Sat, 22 Apr 2023 14:43:03 +0000 (14:43 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
Makefile.am
src/buildservice/__init__.py
src/buildservice/logstreams.py [new file with mode: 0644]
src/static/css/site.scss
src/static/js/job-log-stream.js [new file with mode: 0644]
src/templates/jobs/log-stream.html [new file with mode: 0644]
src/templates/jobs/modules/list.html
src/templates/jobs/modules/log-stream.html [new file with mode: 0644]
src/web/__init__.py
src/web/jobs.py

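How the pieces below fit together: the builder keeps sending its log output over the existing control WebSocket (APIv1DetailHandler), the backend buffers the most recent lines per job in a new LogStream object, and browsers subscribe through the new /api/v1/jobs/<uuid>/log/stream endpoint, which relays one JSON object per log line. Purely illustrative (the field names come from LogStream.message() below, the values are made up), each frame a subscribed client receives looks like this:

    frame = {
        "level"   : "INFO",        # DEBUG / INFO / WARNING / ERROR, or UNKNOWN
        "message" : "Installing build dependencies...",
    }
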
diff --git a/Makefile.am b/Makefile.am
index 8ff17559e43b4e587a593aaca6eae90d7a3913ab..6fb713069c1a5441e2a6ed0ec41e71b1ee4997fa 100644 (file)
@@ -98,6 +98,7 @@ buildservice_PYTHON = \
        src/buildservice/jobqueue.py \
        src/buildservice/jobs.py \
        src/buildservice/keys.py \
+       src/buildservice/logstreams.py \
        src/buildservice/messages.py \
        src/buildservice/mirrors.py \
        src/buildservice/misc.py \
@@ -219,7 +220,8 @@ dist_templates_events_modules_DATA = \
 templates_events_modulesdir = $(templates_eventsdir)/modules
 
 dist_templates_jobs_DATA = \
-       src/templates/jobs/abort.html
+       src/templates/jobs/abort.html \
+       src/templates/jobs/log-stream.html
 
 templates_jobsdir = $(templatesdir)/jobs
 
@@ -229,7 +231,8 @@ dist_templates_jobs_messages_DATA = \
 templates_jobs_messagesdir = $(templates_jobsdir)/messages
 
 dist_templates_jobs_modules_DATA = \
-       src/templates/jobs/modules/list.html
+       src/templates/jobs/modules/list.html \
+       src/templates/jobs/modules/log-stream.html
 
 templates_jobs_modulesdir = $(templates_jobsdir)/modules
 
@@ -323,12 +326,17 @@ CLEANFILES += \
 
 static_js_DATA = \
        src/static/js/jquery.min.js \
+       src/static/js/job-log-stream.min.js \
        src/static/js/pbs.js \
        src/static/js/prettify.js
 
 static_jsdir = $(staticdir)/js
 
+EXTRA_DIST += \
+       src/static/js/job-log-stream.js
+
 CLEANFILES += \
+       src/static/js/job-log-stream.min.js \
        src/static/js/jquery.min.js
 
 dist_static_fonts_DATA = \
diff --git a/src/buildservice/__init__.py b/src/buildservice/__init__.py
index 6a2e8326afe1390469c5e123f61b8e00f783003a..044c35441f057217e5c116d337e9d22c2495d469 100644 (file)
@@ -24,6 +24,7 @@ from . import events
 from . import jobqueue
 from . import jobs
 from . import keys
+from . import logstreams
 from . import messages
 from . import mirrors
 from . import packages
@@ -70,6 +71,7 @@ class Backend(object):
                self.events      = events.Events(self)
                self.jobqueue    = jobqueue.JobQueue(self)
                self.keys        = keys.Keys(self)
+               self.logstreams  = logstreams.LogStreams(self)
                self.messages    = messages.Messages(self)
                self.mirrors     = mirrors.Mirrors(self)
                self.packages    = packages.Packages(self)
diff --git a/src/buildservice/logstreams.py b/src/buildservice/logstreams.py
new file mode 100644 (file)
index 0000000..6eb5d28
--- /dev/null
@@ -0,0 +1,147 @@
+###############################################################################
+#                                                                             #
+# Pakfire - The IPFire package management system                              #
+# Copyright (C) 2023 Pakfire development team                                 #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+###############################################################################
+
+import asyncio
+import collections
+import logging
+
+from . import base
+
+# Setup logging
+log = logging.getLogger("pbs.logstreamer")
+
+BUFFER_MAX_SIZE = 256
+
+class LogStreams(base.Object):
+       streams = {}
+
+       def open(self, job):
+               stream = LogStream(self.backend, job)
+
+               # XXX Check that we are not replacing an existing stream for the same job
+
+               # Register the stream
+               self.streams[job] = stream
+
+               return stream
+
+       def _close(self, job):
+               """
+                       Closes the stream for a job
+               """
+               try:
+                       del self.streams[job]
+               except KeyError:
+                       return
+
+       async def join(self, job, consumer):
+               """
+                       Joins the stream for the given job
+               """
+               try:
+                       stream = self.streams[job]
+               except KeyError:
+                       return
+
+               # Join the stream
+               await stream.join(consumer)
+
+               return stream
+
+
+class LogStream(base.Object):
+       levels = {
+               logging.DEBUG   : "DEBUG",
+               logging.INFO    : "INFO",
+               logging.WARNING : "WARNING",
+               logging.ERROR   : "ERROR",
+       }
+
+       def init(self, job):
+               self.job = job
+
+               # Buffer for messages
+               self.buffer = collections.deque(maxlen=BUFFER_MAX_SIZE)
+
+               # Consumers
+               self.consumers = []
+
+       def __repr__(self):
+               return "<%s %s>" % (self.__class__.__name__, self.job)
+
+       async def close(self):
+               """
+                       Called to close all connections to consumers
+               """
+               # De-register the stream
+               self.backend.logstreams._close(self.job)
+
+               # Close all connections to consumers
+               if self.consumers:
+                       await asyncio.gather(
+                               *(c.close() for c in self.consumers)
+                       )
+
+       async def join(self, consumer):
+               """
+                       Called when a consumer wants to receive the stream
+               """
+               # Store a reference to the consumer
+               self.consumers.append(consumer)
+
+               # Send all messages in the buffer
+               for message in self.buffer:
+                       await consumer.message(message)
+
+               log.debug("%s has joined the stream for %s" % (consumer, self.job))
+
+       async def leave(self, consumer):
+               """
+                       Called when a consumer wants to leave the stream
+               """
+               try:
+                       self.consumers.remove(consumer)
+               except ValueError:
+                       pass
+
+               log.debug("%s has left the stream for %s" % (consumer, self.job))
+
+       async def message(self, level, message):
+               # Translate the level
+               try:
+                       level = self.levels[level]
+               except KeyError:
+                       level = "UNKNOWN"
+
+               # Queue the message line by line
+               for line in message.splitlines():
+                       # Form a message object that we will send to the consumers
+                       m = {
+                               "level"   : level,
+                               "message" : line,
+                       }
+
+                       # Append the message to the buffer
+                       self.buffer.append(m)
+
+                       # Send the message to all consumers
+                       await asyncio.gather(
+                               *(c.message(m) for c in self.consumers),
+                       )
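For orientation, a sketch (not part of the commit) of how the class above is meant to be driven; backend, job and consumer stand in for the real buildservice objects, and consumer needs the awaitable message() and close() methods that APIv1LogStreamHandler in src/web/jobs.py provides:

    import logging

    async def example(backend, job, consumer):
        # The builder's control connection opens the stream when the job starts
        stream = backend.logstreams.open(job)

        # A browser joins later; the buffered lines are replayed to it first,
        # after which it receives every new line as it arrives
        await backend.logstreams.join(job, consumer)

        # Each log message from the builder is split into lines, buffered
        # (up to BUFFER_MAX_SIZE lines) and fanned out to all consumers
        await stream.message(logging.INFO, "configure: done\nmake: started")

        # When the builder disconnects, the stream is de-registered and all
        # consumer connections are closed
        await stream.close()
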
diff --git a/src/static/css/site.scss b/src/static/css/site.scss
index 67cf3d1ec0ba4adc38685b80ae36612c0a9cb9b6..a723168f12ae28df0b81c86d5d3f11d37fdb9952 100644 (file)
@@ -33,3 +33,30 @@ $link:                                                       $primary;
 /*
        Custom CSS
 */
+
+.jobs-log-stream {
+       // Use the code font
+       font-family: $family-code;
+
+       // Keep any whitespace
+       white-space: pre;
+
+       p.DEBUG {
+               background-color: $light;
+               color: $light-invert;
+       }
+
+       p.INFO {
+               // Use the default text color
+       }
+
+       p.WARNING {
+               background-color: $warning;
+               color: $warning-invert;
+       }
+
+       p.ERROR {
+               background-color: $danger;
+               color: $danger-invert;
+       }
+}
diff --git a/src/static/js/job-log-stream.js b/src/static/js/job-log-stream.js
new file mode 100644 (file)
index 0000000..f2fb830
--- /dev/null
@@ -0,0 +1,32 @@
+$(".jobs-log-stream").each(function() {
+       // Fetch the UUID of the job
+       const uuid = $(this).data("uuid");
+
+       // Find where we are appending lines to
+       const log = $(this);
+
+       // Make the URL
+       const url = "wss://" + window.location.host + "/api/v1/jobs/" + uuid + "/log/stream";
+
+       // Try to connect to the stream
+       const stream = new WebSocket(url);
+
+       stream.addEventListener("message", (event) => {
+               // Parse message as JSON
+               const data = JSON.parse(event.data);
+
+               console.log("Message from server: ", data);
+
+               // Create a new line
+               var line = $("<p></p>");
+
+               // Set the log level
+               line.addClass(data.level);
+
+               // Set the content
+               line.text(data.message);
+
+               // Append it to the log window
+               log.append(line);
+       });
+});
diff --git a/src/templates/jobs/log-stream.html b/src/templates/jobs/log-stream.html
new file mode 100644 (file)
index 0000000..2f133ba
--- /dev/null
@@ -0,0 +1,24 @@
+{% extends "../base.html" %}
+
+{% block title %}{{ job }} - {{ _("Log") }}{% end block %}
+
+{% block container %}
+       <nav class="breadcrumb" aria-label="breadcrumbs">
+               <ul>
+                       <li>
+                               <a href="/builds">{{ _("Builds") }}</a>
+                       </li>
+                       <li>
+                               <a href="/builds/{{ job.build.uuid }}">{{ job.build }}</a>
+                       </li>
+                       <li>
+                               <a href="#" disabled>{{ job.arch }}</a>
+                       </li>
+                       <li class="is-active">
+                               <a href="#" aria-current="page">{{ _("Log") }}</a>
+                       </li>
+               </ul>
+       </nav>
+
+       {% module JobsLogStream(job) %}
+{% end block %}
diff --git a/src/templates/jobs/modules/list.html b/src/templates/jobs/modules/list.html
index 623f460bf8a3539e7ad800b462291d100db5321e..6e6fe429d3ac8c083ef865bf84ca09f272ed07c5 100644 (file)
 
                        {# Footer with some useful buttons #}
                        <footer class="card-footer">
-                               {% if job.has_log() %}
+                               {% if job.is_running() %}
+                                       <a href="/jobs/{{ job.uuid }}/log" class="card-footer-item">
+                                               {{ _("Watch Log") }}
+                                       </a>
+                               {% elif job.has_log() %}
                                        <a href="/jobs/{{ job.uuid }}/log" class="card-footer-item">
                                                {{ _("Download Log") }}
                                        </a>
diff --git a/src/templates/jobs/modules/log-stream.html b/src/templates/jobs/modules/log-stream.html
new file mode 100644 (file)
index 0000000..b2269c9
--- /dev/null
@@ -0,0 +1 @@
+<div class="jobs-log-stream" data-uuid="{{ job.uuid }}"></div>
diff --git a/src/web/__init__.py b/src/web/__init__.py
index 07b7beecf4548e1bf9f0f65e5dd9eb846eb030c3..f877a791bda26f0e06f66428795e69a087bda424 100644 (file)
@@ -57,6 +57,7 @@ class Application(tornado.web.Application):
 
                                # Jobs
                                "JobsList"           : jobs.ListModule,
+                               "JobsLogStream"      : jobs.LogStreamModule,
 
                                # Packages
                                "PackageInfo"        : packages.InfoModule,
@@ -141,6 +142,8 @@ class Application(tornado.web.Application):
                        (r"/job/([\w]{8}-[\w]{4}-[\w]{4}-[\w]{4}-[\w]{12})/buildroot", jobs.JobBuildrootHandler),
                        (r"/api/v1/jobs/([\w]{8}-[\w]{4}-[\w]{4}-[\w]{4}-[\w]{12})",
                                jobs.APIv1DetailHandler),
+                       (r"/api/v1/jobs/([\w]{8}-[\w]{4}-[\w]{4}-[\w]{4}-[\w]{12})/log/stream",
+                               jobs.APIv1LogStreamHandler),
 
                        # Builders
                        (r"/builders", builders.BuilderListHandler),
diff --git a/src/web/jobs.py b/src/web/jobs.py
index b57a46db10ce509531cce9191e2adc67b9ad7979..774e5a0a4da84e0845451a9eeff90d991eb246b3 100644 (file)
@@ -82,6 +82,13 @@ class APIv1DetailHandler(base.APIMixin, tornado.websocket.WebSocketHandler):
 
                log.debug("Connection opened for %s by %s" % (self.job, self.current_user))
 
+               # Open a new log stream
+               self.logstream = self.backend.logstreams.open(self.job)
+
+       async def on_close(self):
+               # Close the logstream
+               await self.logstream.close()
+
        async def on_message(self, message):
                message = self._decode_json_message(message)
 
@@ -94,7 +101,10 @@ class APIv1DetailHandler(base.APIMixin, tornado.websocket.WebSocketHandler):
 
                # Handle log messages
                elif t == "log":
-                       pass
+                       await self.logstream.message(
+                               level=message.get("level"),
+                               message=message.get("log"),
+                       )
 
                # Handle finished message
                elif t == "finished":
@@ -124,6 +134,39 @@ class APIv1DetailHandler(base.APIMixin, tornado.websocket.WebSocketHandler):
                await self.backend.jobqueue.dispatch_jobs()
 
 
+class APIv1LogStreamHandler(base.BackendMixin, tornado.websocket.WebSocketHandler):
+       # No authentication required
+       async def open(self, uuid):
+               job = self.backend.jobs.get_by_uuid(uuid)
+               if not job:
+                       raise tornado.web.HTTPError(404, "Could not find job %s" % uuid)
+
+               # Join the stream
+               self.stream = await self.backend.logstreams.join(job, self)
+               if not self.stream:
+                       raise tornado.web.HTTPError(400, "Could not join stream for %s" % job)
+
+               # Send messages without any delay
+               self.set_nodelay(True)
+
+       async def on_close(self):
+               """
+                       Leave the stream
+               """
+               await self.stream.leave(self)
+
+       async def message(self, message):
+               """
+                       Called when there is a new message to be sent to the client
+               """
+               try:
+                       await self.write_message(message)
+
+               # Ignore if the message could not be sent
+               except tornado.websocket.WebSocketClosedError as e:
+                       pass
+
+
 class QueueHandler(base.BaseHandler):
        def get(self):
                self.render("queue.html", queue=self.backend.jobqueue)
@@ -135,6 +178,11 @@ class LogHandler(base.BaseHandler):
                if not job:
                        raise tornado.web.HTTPError("Could not find job %s" % uuid)
 
+               # Stream the log if the job is running
+               if job.is_running():
+                       self.render("jobs/log-stream.html", job=job)
+                       return
+
                tail = self.get_argument_int("tail", None)
 
                # Should we tail the log, or stream the entire file?
@@ -208,3 +256,13 @@ class ListModule(ui_modules.UIModule):
        def render(self, jobs, show_arch_only=False, show_packages=False):
                return self.render_string("jobs/modules/list.html", jobs=jobs,
                        show_arch_only=show_arch_only, show_packages=show_packages)
+
+
+class LogStreamModule(ui_modules.UIModule):
+       def render(self, job):
+               return self.render_string("jobs/modules/log-stream.html", job=job)
+
+       def javascript_files(self):
+               return [
+                       "js/job-log-stream.min.js",
+               ]