From: Michael Tremer Date: Sat, 22 Apr 2023 14:43:03 +0000 (+0000) Subject: jobs: Build a prototype to stream logs X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cd142e5276525eabc57e99cace6c6003ea3f1efa;p=pbs.git jobs: Build a prototype to stream logs Signed-off-by: Michael Tremer --- diff --git a/Makefile.am b/Makefile.am index 8ff17559..6fb71306 100644 --- a/Makefile.am +++ b/Makefile.am @@ -98,6 +98,7 @@ buildservice_PYTHON = \ src/buildservice/jobqueue.py \ src/buildservice/jobs.py \ src/buildservice/keys.py \ + src/buildservice/logstreams.py \ src/buildservice/messages.py \ src/buildservice/mirrors.py \ src/buildservice/misc.py \ @@ -219,7 +220,8 @@ dist_templates_events_modules_DATA = \ templates_events_modulesdir = $(templates_eventsdir)/modules dist_templates_jobs_DATA = \ - src/templates/jobs/abort.html + src/templates/jobs/abort.html \ + src/templates/jobs/log-stream.html templates_jobsdir = $(templatesdir)/jobs @@ -229,7 +231,8 @@ dist_templates_jobs_messages_DATA = \ templates_jobs_messagesdir = $(templates_jobsdir)/messages dist_templates_jobs_modules_DATA = \ - src/templates/jobs/modules/list.html + src/templates/jobs/modules/list.html \ + src/templates/jobs/modules/log-stream.html templates_jobs_modulesdir = $(templates_jobsdir)/modules @@ -323,12 +326,17 @@ CLEANFILES += \ static_js_DATA = \ src/static/js/jquery.min.js \ + src/static/js/job-log-stream.min.js \ src/static/js/pbs.js \ src/static/js/prettify.js static_jsdir = $(staticdir)/js +EXTRA_DIST += \ + src/static/js/job-log-stream.js + CLEANFILES += \ + src/static/js/job-log-stream.min.js \ src/static/js/jquery.min.js dist_static_fonts_DATA = \ diff --git a/src/buildservice/__init__.py b/src/buildservice/__init__.py index 6a2e8326..044c3544 100644 --- a/src/buildservice/__init__.py +++ b/src/buildservice/__init__.py @@ -24,6 +24,7 @@ from . import events from . import jobqueue from . import jobs from . import keys +from . import logstreams from . import messages from . import mirrors from . import packages @@ -70,6 +71,7 @@ class Backend(object): self.events = events.Events(self) self.jobqueue = jobqueue.JobQueue(self) self.keys = keys.Keys(self) + self.logstreams = logstreams.LogStreams(self) self.messages = messages.Messages(self) self.mirrors = mirrors.Mirrors(self) self.packages = packages.Packages(self) diff --git a/src/buildservice/logstreams.py b/src/buildservice/logstreams.py new file mode 100644 index 00000000..6eb5d289 --- /dev/null +++ b/src/buildservice/logstreams.py @@ -0,0 +1,147 @@ +############################################################################### +# # +# Pakfire - The IPFire package management system # +# Copyright (C) 2023 Pakfire development team # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +############################################################################### + +import asyncio +import collections +import logging + +from . import base + +# Setup logging +log = logging.getLogger("pbs.logstreamer") + +BUFFER_MAX_SIZE = 256 + +class LogStreams(base.Object): + streams = {} + + def open(self, job): + stream = LogStream(self.backend, job) + + # XXX Check that we are not replacing an existing stream for the same job + + # Register the stream + self.streams[job] = stream + + return stream + + def _close(self, job): + """ + Closes the stream for a job + """ + try: + self.streams.remove(job) + except ValueError: + return + + async def join(self, job, consumer): + """ + Joins the stream for the given job + """ + try: + stream = self.streams[job] + except KeyError: + return + + # Join the stream + await stream.join(consumer) + + return stream + + +class LogStream(base.Object): + levels = { + logging.DEBUG : "DEBUG", + logging.INFO : "INFO", + logging.WARNING : "WARNING", + logging.ERROR : "ERROR", + } + + def init(self, job): + self.job = job + + # Buffer for messages + self.buffer = collections.deque(maxlen=BUFFER_MAX_SIZE) + + # Consumers + self.consumers = [] + + def __repr__(self): + return "<%s %s>" % (self.__class__.__name__, self.job) + + async def close(self): + """ + Called to close all connections to consumers + """ + # De-register the stream + self.backend.logstreams._close(self) + + # Close all connections to consumers + if self.consumers: + asyncio.gather( + *(c.close() for c in self.consumers) + ) + + async def join(self, consumer): + """ + Called when a consumer wants to receive the stream + """ + # Store a reference to the consumer + self.consumers.append(consumer) + + # Send all messages in the buffer + for message in self.buffer: + await consumer.message(message) + + log.debug("%s has joined the stream for %s" % (consumer, self.job)) + + async def leave(self, consumer): + """ + Called when a consumer wants to leave the stream + """ + try: + self.consumers.remove(consumer) + except IndexError: + pass + + log.debug("%s has left the stream for %s" % (consumer, self.job)) + + async def message(self, level, message): + # Translate the level + try: + level = self.levels[level] + except KeyError: + level = "UNKNOWN" + + # Queue the message line by line + for line in message.splitlines(): + # Form a message object that we will send to the consumers + m = { + "level" : level, + "message" : line, + } + + # Append the message to the buffer + self.buffer.append(m) + + # Send the message to all consumers + await asyncio.gather( + *(c.message(m) for c in self.consumers), + ) diff --git a/src/static/css/site.scss b/src/static/css/site.scss index 67cf3d1e..a723168f 100644 --- a/src/static/css/site.scss +++ b/src/static/css/site.scss @@ -33,3 +33,30 @@ $link: $primary; /* Custom CSS */ + +.jobs-log-stream { + // Use the code font + font-family: $family-code; + + // Keep any whitespace + white-space: pre; + + p.DEBUG { + background-color: $light; + color: $light-invert; + } + + p.INFO { + // Use the default text color + } + + p.WARNING { + background-color: $warning; + color: $warning-invert; + } + + p.ERROR { + background-color: $danger; + color: $danger-invert; + } +} diff --git a/src/static/js/job-log-stream.js b/src/static/js/job-log-stream.js new file mode 100644 index 00000000..f2fb830e --- /dev/null +++ b/src/static/js/job-log-stream.js @@ -0,0 +1,32 @@ +$(".jobs-log-stream").each(function() { + // Fetch the UUID of the job + const uuid = $(this).data("uuid"); + + // Find where we are appending lines to + const log = $(this); + + // Make the URL + const url = "wss://" + window.location.host + "/api/v1/jobs/" + uuid + "/log/stream"; + + // Try to connect to the stream + const stream = new WebSocket(url); + + stream.addEventListener("message", (event) => { + // Parse message as JSON + const data = JSON.parse(event.data); + + console.log("Message from server: ", data); + + // Create a new line + var line = $("

"); + + // Set the log level + line.addClass(data.level); + + // Set the content + line.text(data.message); + + // Append it to the log window + log.append(line); + }); +}); diff --git a/src/templates/jobs/log-stream.html b/src/templates/jobs/log-stream.html new file mode 100644 index 00000000..2f133ba3 --- /dev/null +++ b/src/templates/jobs/log-stream.html @@ -0,0 +1,24 @@ +{% extends "../base.html" %} + +{% block title %}{{ job }} - {{ _("Log") }}{% end block %} + +{% block container %} + + + {% module JobsLogStream(job) %} +{% end block %} diff --git a/src/templates/jobs/modules/list.html b/src/templates/jobs/modules/list.html index 623f460b..6e6fe429 100644 --- a/src/templates/jobs/modules/list.html +++ b/src/templates/jobs/modules/list.html @@ -79,7 +79,11 @@ {# Footer with some useful buttons #}