+++ /dev/null
-#!/usr/bin/python3
-##############################################################################
-# #
-# Pakfire - The IPFire package management system #
-# Copyright (C) 2021 Pakfire development team #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or #
-# (at your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
-# GNU General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <http://www.gnu.org/licenses/>. #
-# #
-###############################################################################
-
-import asyncio
-import argparse
-import sys
-import tempfile
-
-import pakfire.config
-import pakfire.hub
-
-from pakfire.i18n import _
-
-class Cli(object):
- def __init__(self):
- # Read configuration
- self.config = pakfire.config.Config("client.conf")
-
- def parse_cli(self):
- parser = argparse.ArgumentParser(
- description = _("Pakfire Client command line interface"),
- )
- subparsers = parser.add_subparsers()
-
- # build
- build = subparsers.add_parser("build",
- help=_("Build a package remote"))
- build.add_argument("packages", nargs="+",
- help=_("Package(s) to build"))
- build.add_argument("-a", "--arch",
- help=_("Build the package(s) for the given architecture only"))
- build.add_argument("--repo",
- help=_("Create the build in this repository"))
- build.add_argument("--disable-test-builds", action="store_true",
- help=_("No test builds will be created for this build"))
- build.set_defaults(func=self._build)
-
- # check-connection
- check_connection = subparsers.add_parser("check-connection",
- help=_("Check the connection to the hub"))
- check_connection.set_defaults(func=self._check_connection)
-
- # repo
- repo = subparsers.add_parser("repo",
- help=_("Manage repositories"))
-
- repo_subparsers = repo.add_subparsers()
-
- # repo create
- repo_create = repo_subparsers.add_parser("create", help=_("Create a repository"))
- repo_create.add_argument("distro", help=_("Distribution"))
- repo_create.add_argument("name", help=_("The name of the repository"))
- repo_create.add_argument("--description", help=_("Description"))
- repo_create.set_defaults(func=self._repo_create)
-
- # repo delete
- repo_delete = repo_subparsers.add_parser("delete", help=_("Delete a repository"))
- repo_delete.add_argument("distro", help=_("Distribution"))
- repo_delete.add_argument("name", help=_("The name of the repository"))
- repo_delete.set_defaults(func=self._repo_delete)
-
- # repo list
- repo_list = repo_subparsers.add_parser("list", help=_("List all repositories"))
- repo_list.add_argument("distro", help=_("Distribution"))
- repo_list.set_defaults(func=self._repo_list)
-
- # repo show
- repo_show = repo_subparsers.add_parser("show", help=_("Show a repository"))
- repo_show.add_argument("distro", help=_("Distribution"))
- repo_show.add_argument("name", help=_("The name of the repository"))
- repo_show.set_defaults(func=self._repo_show)
-
- # upload
- upload = subparsers.add_parser("upload",
- help=_("Upload a file to the build service"))
-
- upload_subparsers = upload.add_subparsers()
-
- # upload list
- upload_list = upload_subparsers.add_parser("list", help=_("List all uploads"))
- upload_list.set_defaults(func=self._upload_list)
-
- # upload new
- upload_new = upload_subparsers.add_parser("new", help=_("Create a new upload"))
- upload_new.add_argument("file", nargs="+", help=_("Filename"))
- upload_new.set_defaults(func=self._upload_new)
-
- # upload delete
- upload_delete = upload_subparsers.add_parser("delete", help=_("Delete an upload"))
- upload_delete.add_argument("upload_id", metavar="ID", nargs="+",
- help=_("One or multiple IDs"))
- upload_delete.set_defaults(func=self._upload_delete)
-
- args = parser.parse_args()
-
- # Print usage if no action was given
- if not "func" in args:
- parser.print_usage()
- sys.exit(2)
-
- return args
-
- def __call__(self):
- # Parse command line arguments
- args = self.parse_cli()
-
- # Create connection to the hub
- hub = self.connect_to_hub()
-
- # Setup the callback
- callback = args.func(hub, args)
-
- # Call function
- try:
- ret = asyncio.run(callback)
-
- # Catch invalid inputs
- except ValueError as e:
- sys.stderr.write("%s\n" % e)
- ret = 2
-
- # Return with exit code
- sys.exit(ret or 0)
-
- def connect_to_hub(self):
- """
- Establishes a connection to the hub
- """
- huburl = self.config.get("client", "server", "https://pakfire.ipfire.org")
-
- # keytab
- keytab = self.config.get("client", "keytab")
-
- # Create a connection to the hub
- return pakfire.hub.Hub(huburl, keytab=keytab)
-
- async def _build(self, hub, ns):
- # Create a temporary directory if we need to call dist
- tmp = tempfile.TemporaryDirectory(prefix="pakfire-client-")
-
- try:
- # XXX implement calling dist
-
- # Build all packages
- for package in ns.packages:
- await hub.build(package, arches=ns.arch, repo=ns.repo,
- disable_test_builds=ns.disable_test_builds)
-
- finally:
- tmp.cleanup()
-
- async def _check_connection(self, hub, ns):
- response = await hub.test()
-
- # Print the response from the service
- try:
- print("\n".join(response["message"]))
- except KeyError:
- pass
-
- # Repositories
-
- def _print_repo(self, repo):
- print("%s:" % repo.get("name"))
-
- # Description
- description = repo.get("description")
- if description:
- print()
- for line in description.splitlines():
- print(" %s" % line)
- print()
-
- print(_("Created"), repo.get("created_at"))
- print()
-
- async def _repo_create(self, hub, ns):
- # Send request
- repo = await hub.create_repo(distro=ns.distro,
- name=ns.name, description=ns.description)
-
- # Print the result
- self._print_repo(repo)
-
- async def _repo_delete(self, hub, ns):
- # Send request
- await hub.delete_repo(distro=ns.distro, name=ns.name)
-
- async def _repo_list(self, hub, ns):
- # Send request
- repos = await hub.list_repos(distro=ns.distro)
-
- # Get repos
- repos = repos.get("repos", [])
-
- # End if nothing was found
- if not repos:
- return
-
- # Print all repositories
- for repo in sorted(repos, key=lambda r: r.get("name")):
- self._print_repo(repo)
-
- async def _repo_show(self, hub, ns):
- # Send request
- repo = await hub.get_repo(distro=ns.distro, name=ns.name)
-
- if repo:
- self._print_repo(repo)
-
- # Uploads
-
- async def _upload_list(self, hub, ns):
- uploads = await hub.list_uploads()
-
- for upload in uploads:
- print(_("Upload %s") % upload.get("id"))
-
- attributes = {
- _("Filename") : upload.get("filename"),
- _("Size") : upload.get("size"),
- _("Created at") : upload.get("created_at"),
- _("Expires at") : upload.get("expires_at"),
- }
-
- for key, val in attributes.items():
- print (" %-20s : %s" % (key, val))
-
- print() # newline
-
- async def _upload_new(self, hub, ns):
- for file in ns.file:
- upload_id = await hub.upload(file)
-
- # Tell the user
- print(_("%(file)s uploaded as %(id)s") % {
- "file" : file,
- "id" : upload_id,
- }
- )
-
- async def _upload_delete(self, hub, ns):
- """
- Delete uploads
- """
- for upload_id in ns.upload_id:
- await hub.delete_upload(upload_id)
-
- print(_("Deleted upload %s") % upload_id)
-
-
-if __name__ == "__main__":
- c = Cli()
- c()