###############################################################################
import asyncio
+import base64
import datetime
+import io
import json
import logging
+import mimetypes
import tornado.httpclient
import urllib.parse
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
+class BadRequestError(Exception):
+ pass
+
class Bugzilla(base.Object):
- def init(self):
+ def init(self, api_key=None):
+ if api_key is None:
+ api_key = self.settings.get("bugzilla-api-key")
+
+ # Store the API key
+ self.api_key = api_key
+
# Set up HTTP Client
self.client = tornado.httpclient.AsyncHTTPClient()
"""
return self.settings.get("bugzilla-url")
+ async def whoami(self):
+ """
+ Returns the email address of the logged in user
+ """
+ response = await self._request("GET", "/rest/whoami")
+
+ # Return the email address
+ return response.get("name")
+
async def get_products(self):
"""
Returns a dictionary with all products and versions
return products
+ @property
+ async def fields(self):
+ """
+ Fetch all fields
+ """
+ return await self._request("GET", "/rest/field/bug")
+
def make_url(self, *args, **kwargs):
"""
Composes a URL based on the base URL
return url
- async def _request(self, method, url, **kwargs):
+ async def _request(self, method, url, data=None):
# Headers
headers = {
# Authenticate all requests
- "X-BUGZILLA-API-KEY" : self.settings.get("bugzilla-api-key"),
+ "X-BUGZILLA-API-KEY" : self.api_key,
}
# Make the URL
url = self.make_url(url)
+ if data is None:
+ data = {}
+
+ # Fallback authentication because some API endpoints
+ # do not accept the API key in the header
+ data |= { "api_key" : self.api_key }
+
# Encode body
- body = urllib.parse.urlencode(kwargs)
+ body = urllib.parse.urlencode(data)
# For GET requests, append query arguments
if method == "GET":
)
# Send the request and wait for a response
- res = await self.client.fetch(req)
+ try:
+ res = await self.client.fetch(req)
+
+ # Catch any HTTP Errors
+ except tornado.httpclient.HTTPClientError as e:
+ try:
+ error = json.loads(e.response.body)
+ except json.DecodeError:
+ error = None
+
+ # Catch bad requests
+ if e.code == 400:
+ raise BadRequestError(error) from e
+
+ # Raise any other exceptions
+ raise e
log.debug("Response received in %.2fms" % (res.request_time * 1000))
"""
Creates a URL to a bug report
"""
- return self.make_url("/show_bug.cgi", id=id)
+ return self.make_url("/show_bug.cgi", data={ "id" : id })
def enter_url(self, component, product=None, version=None):
"""
}
# Send request
- response = await self._request("GET", "/rest/bug", **kwargs)
+ response = await self._request("GET", "/rest/bug", data=kwargs)
# Parse the response
- bugs = [Bug(self.backend, data) for data in response.get("bugs")]
+ bugs = [Bug(self.backend, self, data) for data in response.get("bugs")]
# Sort and return in reverse order
return sorted(bugs, reverse=True)
response = await self._request("GET", "/rest/bug/%s" % bug)
for data in response.get("bugs"):
- return Bug(self.backend, data)
+ return Bug(self.backend, self, data)
+
+ async def create_bug(self, product, component, version, summary, **kwargs):
+ data = {
+ # Mandatory fields
+ "product" : product,
+ "component" : component,
+ "version" : version,
+ "summary" : summary,
+ } | kwargs
+
+ # Create the bug
+ response = await self._request("POST", "/rest/bug", data=data)
+
+ # Fetch the bug ID
+ id = response.get("id")
+ if not id:
+ raise RuntimeError("No bug ID received")
+
+ # Return the Bug object
+ return await self.get_bug(id)
class Bug(base.Object):
- def init(self, data):
+ def init(self, bugzilla, data):
+ self.bugzilla = bugzilla
self.data = data
def __repr__(self):
@property
def url(self):
- return self.backend.bugzilla.make_url("/show_bug.cgi", id=self.id)
+ return self.bugzilla.make_url("/show_bug.cgi", id=self.id)
@property
def created_at(self):
@property
def keywords(self):
return self.data.get("keywords")
+
+ async def attach(self, filename, data, summary, content_type=None, is_patch=False):
+ """
+ Attaches something to an existing bug
+ """
+ # Guess the mimetype if none was provided
+ if content_type is None:
+ content_type, encoding = mimetypes.guess_type(filename)
+
+ # Assemble the request
+ data = {
+ # Mandatory fields
+ "ids" : self.id,
+ "summary" : summary,
+ "file_name" : filename,
+ "data" : self._encode_to_base64(data),
+ "content_type" : content_type,
+ }
+
+ # Is this a patch?
+ if is_patch:
+ data |= { "is_patch" : is_patch }
+
+ # Send request
+ await self.bugzilla._request("POST", "/rest/bug/%s/attachment" % self.id, data=data)
+
+ @staticmethod
+ def _encode_to_base64(data):
+ """
+ This function takes a file-like object, a bytes-like object or string
+ and returns it encoded as base64.
+ """
+ # Read all data from file handles
+ if isinstance(data, io.IOBase):
+ data = data.read()
+
+ # Convert to bytes()
+ if not isinstance(data, bytes):
+ data = data.encode()
+
+ # Encode to base64
+ data = base64.b64encode(data)
+
+ # Convert to string
+ return data.decode()
self.redirect("/builds/%s" % build.uuid)
+class BugHandler(base.BaseHandler):
+ @tornado.web.authenticated
+ async def get(self, uuid):
+ build = self.backend.builds.get_by_uuid(uuid)
+ if not build:
+ raise tornado.web.HTTPError(404, "Could not find build %s" % uuid)
+
+ # Fetch fields
+ fields = await self.backend.bugzilla.fields
+
+ self.render("builds/bug.html", build=build, fields=fields)
+
+ @tornado.web.authenticated
+ async def post(self, uuid):
+ build = self.backend.builds.get_by_uuid(uuid)
+ if not build:
+ raise tornado.web.HTTPError(404, "Could not find build %s" % uuid)
+
+ # Is the user connected to Bugzilla?
+ if not self.current_user.bugzilla:
+ raise tornado.web.HTTPError(400, "%s is not connected to Bugzilla" \
+ % self.current_user)
+
+ kwargs = {
+ # Summary & Description
+ "summary" : self.get_argument("summary"),
+ "description" : self.get_argument("description", None),
+ } | build.bugzilla_fields
+
+ # Create the bug
+ bug = await self.current_user.bugzilla.create_bug(**kwargs)
+
+ # Send the attachments
+ for job in build.jobs:
+ if not self.get_argument_bool("attach_log_%s" % job.uuid):
+ continue
+
+ # Open the logfile
+ try:
+ log = await job.open_log()
+ except FileNotFoundError as e:
+ log.warning("Could not open log file for %s" % job)
+ continue
+
+ # Attach it to the bug
+ await bug.attach(summary="Log file for %s" % job, filename="%s.log" % job,
+ data=log, content_type="text/plain")
+
+ self.render("builds/bug-created.html", build=build, bug=bug)
+
+
class GroupShowHandler(base.BaseHandler):
def get(self, uuid):
group = self.backend.builds.groups.get_by_uuid(uuid)