# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+# pylint: disable=broad-exception-caught
+
"""Hammer - Kea development environment management tool."""
from __future__ import print_function
'3.19': True,
'3.20': True,
},
- 'arch': {}
+ 'arch': {},
}
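+# Each SYSTEMS entry maps a system name to its known revisions; the boolean
+# marks whether that revision is still officially supported by hammer.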
-# pylint: disable=C0326
IMAGE_TEMPLATES = {
# fedora
'fedora-27-lxc': {'bare': 'lxc-fedora-27', 'kea': 'godfryd/kea-fedora-27'},
system = platform.system()
if system == 'Linux':
system, revision = None, None
- try:
- system, revision, _ = platform.dist() # pylint: disable=deprecated-method
- if system == 'debian':
- revision = revision.split('.')[0]
- elif system == 'redhat':
- system = 'rhel'
- revision = revision[0]
- elif system == 'rocky':
- revision = revision[0]
- elif system == 'centos':
- revision = revision[0]
- if not system or not revision:
- raise Exception('fallback to /etc/os-release')
- except Exception:
- if os.path.exists('/etc/os-release'):
- vals = {}
- with open('/etc/os-release') as f:
- for line in f.readlines():
- if '=' in line:
- key, val = line.split('=', 1)
- vals[key.strip()] = val.strip().replace('"', '')
-
- for i in ['ID', 'ID_LIKE']:
- if i in vals:
- system_candidates = vals[i].strip('"').split()
- for system_candidate in system_candidates:
- if system_candidate in SYSTEMS:
- system = system_candidate
- break
- else:
- continue
+ if not os.path.exists('/etc/os-release'):
+ raise UnexpectedError('/etc/os-release does not exist. Cannot determine system or its revision.')
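+    # /etc/os-release consists of KEY=value lines, for example:
+    #   ID=ubuntu
+    #   ID_LIKE=debian
+    #   VERSION_ID="22.04"
+    # ID/ID_LIKE yield the system name and VERSION_ID/BUILD_ID the revision.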
+ vals = {}
+ with open('/etc/os-release', encoding='utf-8') as f:
+ for line in f.readlines():
+ if '=' in line:
+ key, val = line.split('=', 1)
+ vals[key.strip()] = val.strip().replace('"', '')
+
+ for i in ['ID', 'ID_LIKE']:
+ if i in vals:
+ system_candidates = vals[i].strip('"').split()
+ for system_candidate in system_candidates:
+ if system_candidate in SYSTEMS:
+ system = system_candidate
break
- if system is None:
- raise Exception('cannot determine system')
+ else:
+ continue
+ break
+ if system is None:
+ raise UnexpectedError('cannot determine system')
- for i in ['VERSION_ID', 'BUILD_ID']:
- if i in vals:
- revision = vals[i]
- break
- if revision is None:
- raise Exception('cannot determine revision')
+ for i in ['VERSION_ID', 'BUILD_ID']:
+ if i in vals:
+ revision = vals[i]
+ break
+ if revision is None:
+ raise UnexpectedError('cannot determine revision')
- if system in ['alpine', 'rhel', 'rocky']:
- revision = revision.rsplit('.', 1)[0]
- else:
- raise Exception('cannot determine system or its revision')
+ if system in ['alpine', 'rhel', 'rocky']:
+ revision = revision.rsplit('.', 1)[0]
elif system == 'FreeBSD':
system = system.lower()
revision = platform.release()
class ExecutionError(Exception):
"""Exception thrown when execution encountered an error."""
- pass
+
+
+class UnexpectedError(Exception):
+ """Exception thrown when an unexpected error occurred that hammer does not know how to recover from."""
def execute(cmd, timeout=60, cwd=None, env=None, raise_error=True, dry_run=False, log_file_path=None,
cmd = cmd.replace('sudo', 'sudo -E')
if log_file_path:
- log_file = open(log_file_path, "wb")
+        # keep the handle open for the whole retry loop; the captured output
+        # is written to it below, so a context manager cannot be used here
+        log_file = open(log_file_path, "wb")  # pylint: disable=consider-using-with
for attempt in range(attempts):
if interactive:
# Issue: [B602:subprocess_popen_with_shell_equals_true] subprocess call with shell=True identified,
# security issue.
- p = subprocess.Popen(cmd, cwd=cwd, env=env, shell=True) # nosec B602
- exitcode = p.wait()
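+            # in interactive mode the child inherits the terminal, so there
+            # is nothing to capture; communicate() simply waits for exit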
+            with subprocess.Popen(cmd, cwd=cwd, env=env, shell=True) as pipe:  # nosec B602
+ pipe.communicate()
+ exitcode = pipe.returncode
else:
# Issue: [B602:subprocess_popen_with_shell_equals_true] subprocess call with shell=True identified,
# security issue.
- p = subprocess.Popen(cmd, cwd=cwd, env=env, shell=True, # nosec B602
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
-
- if capture:
- output = ''
- t0 = time.time()
- # Repeat until the process has stopped and there are no more lines
- # to read, or until the timeout is up.
- while True:
- line = p.stdout.readline()
- if line:
- line_decoded = line.decode(encoding='ascii', errors='ignore').rstrip() + '\r'
- if not quiet:
- log.info(line_decoded)
- if capture:
- output += line_decoded
- if log_file_path:
- log_file.write(line)
- t1 = time.time()
- if p.poll() is not None and not line or (timeout is not None and timeout < t1 - t0):
- break
-
- # If no exitcode yet, ie. process is still running then it means that timeout occurred.
- # In such case terminate the process and raise an exception.
- if p.poll() is None:
- # kill using sudo to be able to kill other sudo commands
- execute('sudo kill -s TERM %s' % p.pid)
- time.sleep(5)
- # if still running, kill harder
- if p.poll() is None:
- execute('sudo kill -s KILL %s' % p.pid)
- msg = "Execution timeout, %d > %d seconds elapsed (start: %d, stop %d), cmd: '%s'"
- msg = msg % (t1 - t0, timeout, t0, t1, cmd)
- raise ExecutionError(msg)
- exitcode = p.returncode
+            with subprocess.Popen(cmd, cwd=cwd, env=env, shell=True,  # nosec B602
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as pipe:
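+                # communicate() drains stdout in full, which avoids the
+                # pipe-buffer deadlock that wait() can cause, and raises
+                # TimeoutExpired once the timeout elapses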
+                try:
+                    stdout, _ = pipe.communicate(timeout=timeout)
+                except subprocess.TimeoutExpired as e:
+                    pipe.kill()
+                    # reap the killed child to avoid leaving a zombie process
+                    pipe.communicate()
+                    raise ExecutionError(f'Execution timeout: {e}, cmd: {cmd}') from e
+ exitcode = pipe.returncode
+                if capture:
+                    output = stdout.decode('utf-8', errors='replace')
+                if not quiet:
+                    print(stdout.decode('utf-8', errors='replace'))
+ if log_file_path is not None:
+ log_file.write(stdout)
if exitcode == 0:
break
- elif attempt < attempts - 1:
+
+ if attempt < attempts - 1:
-            txt = 'command failed, retry, attempt %d/%d' % (attempt, attempts)
+            txt = 'command failed, retry, attempt %d/%d' % (attempt + 1, attempts)
if log_file_path:
txt_to_file = '\n\n[HAMMER] %s\n\n\n' % txt
continue
status, name, version, arch, descr = m.groups()
name = name.split(':')[0]
- pkg_cache[name] = dict(status=status, version=version, arch=arch, descr=descr)
+ pkg_cache[name] = {
+ 'status': status,
+ 'version': version,
+ 'arch': arch,
+ 'descr': descr,
+ }
return pkg_cache
for line in out.splitlines():
name = line.strip()
- pkg_cache[name] = dict(status='ii')
+ pkg_cache[name] = {'status': 'ii'}
return pkg_cache
for line in out.splitlines():
name = line.strip()
- pkg_cache[name] = dict(status='ii')
+ pkg_cache[name] = {'status': 'ii'}
return pkg_cache
return IMAGE_TEMPLATES[key][variant]
-def _get_full_repo_url(repository_url, system, revision, pkg_version):
+def _get_full_repo_url(repository_url, system, revision):
if not repository_url:
return None
repo_name = 'kea-%s-%s' % (system, revision)
return repo_url
-class VagrantEnv(object):
+class VagrantEnv:
"""Helper class that makes interacting with Vagrant easier.
It creates Vagrantfile according to specified system. It exposes basic Vagrant functions
self.python = None
self.key = key = "%s-%s-%s" % (system, revision, provider)
- self.image_tpl = image_tpl = get_image_template(key, image_template_variant)
+ self.image_tpl = get_image_template(key, image_template_variant)
self.repo_dir = os.getcwd()
sys_dir = "%s-%s" % (system, revision)
box_version=box_version,
hostname=hostname)
- with open(vagrantfile_path, "w") as f:
+ with open(vagrantfile_path, "w", encoding='utf-8') as f:
f.write(vagrantfile)
log.info('Prepared vagrant system %s in %s', self.name, self.vagrant_dir)
with urllib.request.urlopen(url) as response: # nosec B310
data = response.read()
except Exception as e:
- log.exception(f'ignored exception: {e}')
+ log.exception('ignored exception: %s', e)
return {}
data = json.loads(data)
return data
meta_file = os.path.join(self.vagrant_dir, '.vagrant/machines/default', self.provider, 'box_meta')
if not os.path.exists(meta_file):
return {}
- with open(meta_file) as f:
+ with open(meta_file, encoding='utf-8') as f:
data = f.read()
data = json.loads(data)
return data
_, out = execute("vagrant status", cwd=self.vagrant_dir, timeout=15, capture=True, quiet=True)
m = re.search(r'default\s+(.+)\(', out)
if not m:
- raise Exception('cannot get status in:\n%s' % out)
+ raise UnexpectedError('cannot get status in:\n%s' % out)
return m.group(1).strip()
def bring_up_latest_box(self):
# correct files ownership
execute('sudo chown `id -un`:`id -gn` *', cwd=lxc_box_dir)
# and other metadata
- with open(os.path.join(lxc_box_dir, 'metadata.json'), 'w') as f:
+ with open(os.path.join(lxc_box_dir, 'metadata.json'), 'w', encoding='utf-8') as f:
now = datetime.datetime.now()
f.write('{\n')
f.write(' "provider": "lxc",\n')
execute('scp -F %s -r default:~/kea-pkg/* .' % ssh_cfg_path, cwd=pkgs_dir)
if upload:
- repo_url = _get_full_repo_url(repository_url, self.system, self.revision, pkg_version)
+ repo_url = _get_full_repo_url(repository_url, self.system, self.revision)
if repo_url is None:
raise ValueError('repo_url is None')
upload_cmd = 'curl -v --netrc -f'
execute(cmd, cwd=self.vagrant_dir)
results_file = os.path.join(self.vagrant_dir, 'unit-test-results.json')
if os.path.exists(results_file):
- with open(results_file) as f:
+ with open(results_file, encoding='utf-8') as f:
txt = f.read()
results = json.loads(txt)
total = results['grand_total']
cmd = 'scp -F %s -r default:/home/vagrant/aggregated_tests.xml .' % ssh_cfg_path
execute(cmd, cwd=self.vagrant_dir)
except Exception as e:
- log.exception(f'ignored issue with parsing unit test results: {e}')
+ log.exception('ignored issue with parsing unit test results: %s', e)
return total, passed
execute('vagrant ssh-config > %s' % ssh_cfg_path, cwd=self.vagrant_dir)
return ssh_cfg_path
- def execute(self, cmd, timeout=None, raise_error=True, log_file_path=None, quiet=False, env=None,
+ def execute(self, cmd, timeout=None, raise_error=True, log_file_path=None, quiet=False, env=None, capture=False,
attempts=1, sleep_time_after_attempt=None):
"""Execute provided command inside Vagrant system."""
if not env:
return execute('vagrant ssh -c "%s"' % cmd, env=env, cwd=self.vagrant_dir, timeout=timeout,
raise_error=raise_error, dry_run=self.dry_run, log_file_path=log_file_path,
- quiet=quiet, check_times=self.check_times,
+ quiet=quiet, check_times=self.check_times, capture=capture,
attempts=attempts, sleep_time_after_attempt=sleep_time_after_attempt)
def prepare_system(self):
exitcode = self.execute(cmd, raise_error=False)
if exitcode != 0:
env = os.environ.copy()
- with open(os.path.expanduser('~/rhel-creds.txt')) as f:
+ with open(os.path.expanduser('~/rhel-creds.txt'), encoding='utf-8') as f:
env['RHEL_USER'] = f.readline().strip()
env['RHEL_PASSWD'] = f.readline().strip()
self.execute('sudo subscription-manager register --user $RHEL_USER --password "$RHEL_PASSWD"', env=env)
gtest_version = '1.14.0'
gtest_path = f'/usr/src/googletest-release-{gtest_version}/googletest'
if os.path.exists(gtest_path):
- log.info(f'gtest is already installed in {gtest_path}.')
+ log.info('gtest is already installed in %s.', gtest_path)
return
execute('mkdir -p ~/.hammer-tmp')
libyang_header = f'{prefix}/include/libyang/version.h'
if (any(os.path.exists(i) for i in libyang_so_candidates) and os.path.exists(libyang_header) and
execute(f"grep -F '#define LY_VERSION_MAJOR 2' '{libyang_header}'", raise_error=False) == 0):
- log.info(f'libyang is already installed at {libyang_header}.')
+ log.info('libyang is already installed at %s.', libyang_header)
return
version = 'v2.1.4'
cwd='~/.hammer-tmp/libyang/build')
execute('make -j $(nproc || gnproc || echo 1)', cwd='~/.hammer-tmp/libyang/build')
execute('sudo make install', cwd='~/.hammer-tmp/libyang/build')
- system, revision = get_system_revision()
+ system, _ = get_system_revision()
if system != 'alpine':
execute('sudo ldconfig')
except Exception as e:
sysrepo_header = f'{prefix}/include/sysrepo/version.h'
if (any(os.path.exists(i) for i in sysrepo_so_candidates) and os.path.exists(sysrepo_header) and
execute(f"grep -F '#define SR_VERSION_MAJOR 7' '{sysrepo_header}'", raise_error=False) == 0):
- log.info(f'sysrepo is already installed at {sysrepo_header}.')
+ log.info('sysrepo is already installed at %s.', sysrepo_header)
return
version = 'v2.2.12'
execute('cmake -DBUILD_TESTING=OFF -DREPO_PATH=/etc/sysrepo ..', cwd='~/.hammer-tmp/sysrepo/build')
execute('make -j $(nproc || gnproc || echo 1)', cwd='~/.hammer-tmp/sysrepo/build')
execute('sudo make install', cwd='~/.hammer-tmp/sysrepo/build')
- system, revision = get_system_revision()
+ system, _ = get_system_revision()
if system != 'alpine':
execute('sudo ldconfig')
except Exception as e:
libyang_cpp_pc = f'{prefix_lib}/pkgconfig/libyang-cpp.pc'
if (os.path.exists(libyang_cpp_so) and os.path.exists(libyang_cpp_pc) and
execute(f"grep -F 'Version: 1.1.0' '{libyang_cpp_pc}'", raise_error=False) == 0):
- log.info(f'libyang-cpp is already installed at {libyang_cpp_so}.')
+ log.info('libyang-cpp is already installed at %s.', libyang_cpp_so)
return
version = 'ae7d649ea75da081725c119dd553b2ef3121a6f8'
execute('git clone https://github.com/CESNET/libyang-cpp.git ~/.hammer-tmp/libyang-cpp')
execute(f'git checkout {version}', cwd='~/.hammer-tmp/libyang-cpp')
# New cpp compiler is more picky about missing headers. (ex. Fedora 40)
- return_code = execute('sudo grep "#include <algorithm>" ~/.hammer-tmp/libyang-cpp/src/Context.cpp',
- raise_error=False)
- if return_code == 1:
- execute(r'sed -i "/#include <libyang\/libyang.h>/a #include <algorithm>" '
- '~/.hammer-tmp/libyang-cpp/src/Context.cpp')
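+    # add the missing include as an inline patch fed to git apply through a
+    # heredoc, replacing the previous grep-and-sed approach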
+ execute("""git apply <<EOF
+diff --git a/src/Context.cpp b/src/Context.cpp
+index b2fe887..add11cc 100644
+--- a/src/Context.cpp
++++ b/src/Context.cpp
+@@ -13,2 +13,3 @@
+ #include <libyang/libyang.h>
++#include <algorithm>
+ #include <span>
+EOF
+""", cwd='~/.hammer-tmp/libyang-cpp')
execute('mkdir ~/.hammer-tmp/libyang-cpp/build')
execute('cmake -DBUILD_TESTING=OFF .. ', cwd='~/.hammer-tmp/libyang-cpp/build')
execute('make -j $(nproc || gnproc || echo 1)', cwd='~/.hammer-tmp/libyang-cpp/build')
execute('sudo make install', cwd='~/.hammer-tmp/libyang-cpp/build')
- system, revision = get_system_revision()
+ system, _ = get_system_revision()
if system != 'alpine':
execute('sudo ldconfig')
except Exception as e:
sysrepo_cpp_pc = f'{prefix_lib}/pkgconfig/sysrepo-cpp.pc'
if (os.path.exists(sysrepo_cpp_so) and os.path.exists(sysrepo_cpp_pc) and
execute(f"grep -F 'Version: 1.1.0' '{sysrepo_cpp_pc}'", raise_error=False) == 0):
- log.info(f'sysrepo-cpp is already installed at {sysrepo_cpp_so}.')
+ log.info('sysrepo-cpp is already installed at %s.', sysrepo_cpp_so)
return
version = '02634174ffc60568301c3d9b9b7cf710cff6a586'
execute('cmake -DBUILD_TESTING=OFF .. ', cwd='~/.hammer-tmp/sysrepo-cpp/build')
execute('make -j $(nproc || gnproc || echo 1)', cwd='~/.hammer-tmp/sysrepo-cpp/build')
execute('sudo make install', cwd='~/.hammer-tmp/sysrepo-cpp/build')
- system, revision = get_system_revision()
+ system, _ = get_system_revision()
if system != 'alpine':
execute('sudo ldconfig')
except Exception as e:
exit_code = execute('openssl rsa -in src/lib/asiolink/testutils/ca/kea-server.key '
'-out src/lib/asiolink/testutils/ca/kea-server.key', raise_error=False)
if exit_code != 0:
- log.warning(f'openssl command failed with exit code {exit_code}, but continuing...')
+ log.warning('openssl command failed with exit code %d, but continuing...', exit_code)
for file in [
'./src/lib/asiolink/testutils/ca/kea-ca.crt',
'./src/lib/asiolink/testutils/ca/kea-client.crt',
execute(cmd)
if system == 'debian' and revision == '9':
- log.info('FIX FOR ISSUE kea#389: {} {}'.format(system, revision))
+ log.info('FIX FOR ISSUE kea#389: %s %s', system, revision)
cmd = "sh -c \"cat <<EOF | sudo mysql -u root\n"
cmd += "use keatest;\n"
cmd += "set global innodb_large_prefix=on;\n"
execute(cmd)
-def _enable_postgresql(system, revision):
+def _enable_postgresql(system):
if system == 'alpine':
execute('sudo rc-update add postgresql')
elif system == 'freebsd':
execute('sudo systemctl enable postgresql.service')
-def _restart_postgresql(system, revision):
+def _restart_postgresql(system):
if system == 'freebsd':
# redirecting output from start script to /dev/null otherwise the postgresql rc.d script will hang
# calling restart instead of start allow hammer.py to pass even if postgresql is already installed
exit_code = execute('sudo systemctl restart postgresql.service', raise_error=False)
if exit_code != 0:
log.error('Command "sudo systemctl restart postgresql.service" failed. Here is the journal:')
- _, output = execute('sudo journalctl -xu postgresql.service', raise_error=False)
+ execute('sudo journalctl -xu postgresql.service', raise_error=False)
log.error('And here are the logs:')
_, output = execute("sudo -u postgres psql -A -t -c 'SELECT pg_current_logfile()'",
capture=True, quiet=True)
logfile = os.path.basename(output.strip())
- _, output = execute(fr'sudo find /var -type f -name "{logfile}" -exec cat {{}} \;', raise_error=False)
+ execute(fr'sudo find /var -type f -name "{logfile}" -exec cat {{}} \;', raise_error=False)
sys.exit(exit_code)
# and user both set to 'all'. This is to not affect authentication of
# `postgres` user which should have a separate entry.
def _change_postgresql_auth_method(connection_type, auth_method, hba_file):
- execute(r"sudo sed -i.bak 's/^{}\(.*\)all\(.*\)all\(.*\) [a-z0-9]*$/{}\1all\2all\3 {}/g' '{}'".format(
- connection_type, connection_type, auth_method, hba_file), cwd='/tmp')
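+    # pg_hba.conf entries look like:
+    #   host    all    all    127.0.0.1/32    ident
+    # the sed below rewrites only the trailing auth-method column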
+ execute(fr"sudo sed -i.bak 's/^{connection_type}\(.*\)all\(.*\)all\(.*\) [a-z0-9]*$"
+ fr"/{connection_type}\1all\2all\3 {auth_method}/g' '{hba_file}'",
+ cwd='/tmp')
-def _configure_pgsql(system, revision, features):
+def _configure_pgsql(system, features):
""" Configure PostgreSQL DB """
if system == 'freebsd':
# the initial start of the postgresql will create the 'postmaster.opts' file
execute(f'sudo test ! -f {var_db_postgres_data}/postmaster.opts && sudo service postgresql onestart || true')
- _enable_postgresql(system, revision)
- _restart_postgresql(system, revision)
+ _enable_postgresql(system)
+ _restart_postgresql(system)
# Change auth-method to 'md5' on all connections.
cmd = "sudo -u postgres psql -t -c 'SHOW hba_file' | xargs"
{}
' '{}'""".format(auth_header, postgres_auth_line, hba_file))
- _restart_postgresql(system, revision)
+ _restart_postgresql(system)
cmd = """sh -c \"cat <<EOF | sudo -u postgres psql postgres
DROP DATABASE IF EXISTS keatest;
Returns the version available in the package manager's repository for the requested package.
:param package: the name of the package whose version is retrieved
"""
- system, revision = get_system_revision()
+ system, _ = get_system_revision()
if system == 'alpine':
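+        # 'apk search --exact PKG' prints 'PKG-VERSION'; the sed strips the
+        # 'PKG-' prefix, leaving just the version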
cmd = "apk search --exact {0} | sed 's/{0}-//g'"
elif system in ['debian', 'ubuntu']:
if version < minimum:
message = f"ERROR: {package} has version {version}, but must be >= {minimum}"
log.error(message)
- raise Exception(message)
+ raise UnexpectedError(message)
def prepare_system_local(features, check_times, ignore_errors_for, just_configure):
"""Prepare local system for Kea development based on requested features."""
system, revision = get_system_revision()
- log.info(f'Preparing deps for {system} {revision}...')
+ log.info('Preparing deps for %s %s...', system, revision)
if not just_configure:
install_packages_local(system, revision, features, check_times, ignore_errors_for)
_configure_mysql(system, revision, features)
if 'pgsql' in features:
- _configure_pgsql(system, revision, features)
+ _configure_pgsql(system, features)
log.info('Preparing deps completed successfully.')
failures = int(root.get('failures'))
disabled = int(root.get('disabled'))
errors = int(root.get('errors'))
- results[fn] = dict(total=total, failures=failures, disabled=disabled, errors=errors)
+ results[fn] = {
+ 'total': total,
+ 'failures': failures,
+ 'disabled': disabled,
+ 'errors': errors,
+ }
grand_total += total
grand_not_passed += failures + errors
result = green(result)
log.info('Unit test results: %s', result)
- with open('unit-test-results.json', 'w') as f:
+ with open('unit-test-results.json', 'w', encoding='utf-8') as f:
f.write(json.dumps(results))
# store aggregated results in XML
def _build_rpm(system, revision, features, tarball_path, env, check_times, dry_run,
- pkg_version, pkg_isc_version, repo_url):
+ pkg_version, pkg_isc_version):
# unpack kea sources tarball
_, arch = execute('arch', capture=True)
def _build_deb(system, revision, features, tarball_path, env, check_times, dry_run,
- pkg_version, pkg_isc_version, repository_url, repo_url):
+ pkg_version, pkg_isc_version, repo_url):
_, arch = execute('arch', capture=True)
if system == 'debian' and revision == '9':
_, output = execute("curl -o /dev/null -s -w '%{{http_code}}' {}/dists/kea/Release 2>/dev/null".format(repo_url),
capture=True)
http_code = output.rstrip()
- release_file_exists = (http_code == '200')
+ release_file_exists = http_code == '200'
if release_file_exists:
- log.info(f'{repo_url}/dists/kea/Release exists.')
+ log.info('%s/dists/kea/Release exists.', repo_url)
else:
repo_name = 'kea-%s-%s-%s' % (pkg_version.rsplit('.', 1)[0], system, revision)
- log.error(f'{repo_url}/dists/kea/Release does not exist. '
- f'This is usually caused by no package existing in {repo_name}. '
+ log.error('%s/dists/kea/Release does not exist. '
+ 'This is usually caused by no package existing in %s. '
'You can solve this by uploading any package.'
- 'Continuing, but the build will likely fail.')
+ 'Continuing, but the build will likely fail.', repo_url, repo_name)
# try apt update for up to 10 times if there is an error
for _ in range(10):
_check_installed_rpm_or_debs(services_list)
-def _build_alpine_apk(system, revision, features, tarball_path, env, check_times, dry_run,
- pkg_version, pkg_isc_version, repo_url):
+def _build_alpine_apk(revision, features, tarball_path, check_times, dry_run,
+ pkg_version, pkg_isc_version):
_, arch = execute('arch', capture=True)
# unpack tarball
execute('sudo rm -rf kea-src packages', check_times=check_times, dry_run=dry_run)
# enable ccache if requested
env = _prepare_ccache_if_needed(system, ccache_dir, env)
- repo_url = _get_full_repo_url(repository_url, system, revision, pkg_version)
+ repo_url = _get_full_repo_url(repository_url, system, revision)
if repo_url is None:
raise ValueError('repo_url is None')
if system in ['fedora', 'centos', 'rhel', 'rocky']:
_build_rpm(system, revision, features, tarball_path, env, check_times, dry_run,
- pkg_version, pkg_isc_version, repo_url)
+ pkg_version, pkg_isc_version)
elif system in ['ubuntu', 'debian']:
_build_deb(system, revision, features, tarball_path, env, check_times, dry_run,
- pkg_version, pkg_isc_version, repository_url, repo_url)
+ pkg_version, pkg_isc_version, repo_url)
elif system in ['alpine']:
- _build_alpine_apk(system, revision, features, tarball_path, env, check_times, dry_run,
- pkg_version, pkg_isc_version, repo_url)
+ _build_alpine_apk(revision, features, tarball_path, check_times, dry_run,
+ pkg_version, pkg_isc_version)
elif system in ['arch']:
pass
except ExecutionError as e:
error = e
msg = ' - ' + red(str(e))
- except Exception as e: # pylint: disable=broad-except
+ except Exception as e:
log.exception('Building erred')
error = e
msg = ' - ' + red(str(e))
def list_supported_systems():
"""List systems hammer can support (with supported providers)."""
- for system in SYSTEMS:
- print('%s:' % system)
- for release, supported in SYSTEMS[system].items():
+    for system, revisions in SYSTEMS.items():
+        print(f'{system}:')
+        for release, supported in revisions.items():
if not supported:
continue
providers = []
if k in IMAGE_TEMPLATES:
providers.append(p)
providers = ', '.join(providers)
- print(' - %s: %s' % (release, providers))
+ print(f' - {release}: {providers}')
def list_created_systems():
for i in args.with_randomly:
if _coin_toss():
features.add(i)
- log.info(f'Feature enabled through coin toss: {i}')
+ log.info('Feature enabled through coin toss: %s', i)
else:
features.discard(i)
- log.info(f'Feature disabled through coin toss: {i}')
+ log.info('Feature disabled through coin toss: %s', i)
if hasattr(args, 'ccache_dir') and args.ccache_dir:
features.add('ccache')
def _check_system_revision(system, revision):
if revision == 'all':
return
- if system not in SYSTEMS.keys():
+ if system not in SYSTEMS:
msg = "hammer.py error: argument -s/--system: invalid choice: '%s' (choose from '%s')"
msg = msg % (revision, "', '".join(SYSTEMS.keys()))
log.error(msg)
sys.exit(1)
- revs = SYSTEMS[system].keys()
- if revision not in revs:
+ if revision not in SYSTEMS[system]:
msg = "hammer.py error: argument -r/--revision: invalid choice: '%s' (choose from '%s')"
- msg = msg % (revision, "', '".join(revs))
+ msg = msg % (revision, "', '".join(SYSTEMS[system].keys()))
log.error(msg)
sys.exit(1)
if not SYSTEMS[system][revision]:
- log.warning(f'{system} ${revision} is no longer officially supported. '
- 'The script will continue in a best-effort manner.')
+ log.warning('%s %s is no longer officially supported. '
+ 'The script will continue in a best-effort manner.', system, revision)
def _prepare_ccache_dir(ccache_dir, system, revision):
def upload_to_repo(args, pkgs_dir):
# NOTE: note the differences (if any) in system/revision vs args.system/revision
system, revision = get_system_revision()
- repo_url = _get_full_repo_url(args.repository_url, system, revision, args.pkg_version)
+ repo_url = _get_full_repo_url(args.repository_url, system, revision)
if repo_url is None:
raise ValueError('repo_url is None')
upload_cmd = 'curl -v --netrc -f'
log.info("Asset already exists in the repository. Skipping upload.")
break
elif exitcode != 0:
- raise Exception('Upload failed: %s' % output)
+ raise UnexpectedError('Upload failed: %s' % output)
else:
break
examples.
"""
+import argparse
+import base64
import configparser
import re
-import time
import socket
import sys
-import base64
+import time
from datetime import datetime
-from optparse import OptionParser
re_hex = re.compile(r'^0x[0-9a-fA-F]+')
re_decimal = re.compile(r'^\d+$')
dict_qr = {'query': 0, 'response': 1}
dict_opcode = {'query': 0, 'iquery': 1, 'status': 2, 'notify': 4,
'update': 5}
-rdict_opcode = dict([(dict_opcode[k], k.upper()) for k in dict_opcode.keys()])
dict_rcode = {'noerror': 0, 'formerr': 1, 'servfail': 2, 'nxdomain': 3,
'notimp': 4, 'refused': 5, 'yxdomain': 6, 'yxrrset': 7,
'nxrrset': 8, 'notauth': 9, 'notzone': 10}
-rdict_rcode = dict([(dict_rcode[k], k.upper()) for k in dict_rcode.keys()])
dict_rrtype = {'none': 0, 'a': 1, 'ns': 2, 'md': 3, 'mf': 4, 'cname': 5,
'soa': 6, 'mb': 7, 'mg': 8, 'mr': 9, 'null': 10,
'wks': 11, 'ptr': 12, 'hinfo': 13, 'minfo': 14, 'mx': 15,
'spf': 99, 'unspec': 103, 'tkey': 249, 'tsig': 250,
'dlv': 32769, 'ixfr': 251, 'axfr': 252, 'mailb': 253,
'maila': 254, 'any': 255, 'caa': 257}
-rdict_rrtype = dict([(dict_rrtype[k], k.upper()) for k in dict_rrtype.keys()])
dict_rrclass = {'in': 1, 'ch': 3, 'hs': 4, 'any': 255}
-rdict_rrclass = dict([(dict_rrclass[k], k.upper()) for k in dict_rrclass.keys()])
-dict_algorithm = {'rsamd5': 1, 'dh': 2, 'dsa': 3, 'ecc': 4,
- 'rsasha1': 5}
+dict_algorithm = {'rsamd5': 1, 'dh': 2, 'dsa': 3, 'ecc': 4, 'rsasha1': 5}
dict_nsec3_algorithm = {'reserved': 0, 'sha1': 1}
-rdict_algorithm = dict([(dict_algorithm[k], k.upper()) for k in dict_algorithm.keys()])
-rdict_nsec3_algorithm = dict([(dict_nsec3_algorithm[k], k.upper()) for k in dict_nsec3_algorithm.keys()])
+
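+# reverse tables mapping numeric codes back to upper-case mnemonics,
+# e.g. rdict_rcode[3] == 'NXDOMAIN'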
+rdict_opcode = {v: k.upper() for k, v in dict_opcode.items()}
+rdict_rcode = {v: k.upper() for k, v in dict_rcode.items()}
+rdict_rrtype = {v: k.upper() for k, v in dict_rrtype.items()}
+rdict_rrclass = {v: k.upper() for k, v in dict_rrclass.items()}
+rdict_algorithm = {v: k.upper() for k, v in dict_algorithm.items()}
+rdict_nsec3_algorithm = {v: k.upper() for k, v in dict_nsec3_algorithm.items()}
header_xtables = {'qr': dict_qr, 'opcode': dict_opcode,
'rcode': dict_rcode}
question_xtables = {'rrtype': dict_rrtype, 'rrclass': dict_rrclass}
-def parse_value(value, xtable={}):
+def parse_value(value, xtable=None):
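+    # a None default avoids sharing one mutable dict across calls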
+ if xtable is None:
+ xtable = {}
if re.search(re_hex, value):
return int(value, 16)
if re.search(re_decimal, value):
return value
-def code_totext(code, dict):
- if code in dict.keys():
- return dict[code] + '(' + str(code) + ')'
+def code_totext(code, dictionary):
+ if code in dictionary:
+ return dictionary[code] + '(' + str(code) + ')'
return str(code)
for label in labels:
if len(label) > 4 and label[0:4] == 'ptr=':
# special meta-syntax for compression pointer
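+            # (RFC 1035: the top two bits set to one flag a 14-bit
+            # compression offset, hence the OR with 0xc000)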
- wire += '%04x' % (0xc000 | int(l[4:]))
+ wire += '%04x' % (0xc000 | int(label[4:]))
break
if absolute or len(label) > 0:
wire += '%02x' % len(label)
return wire
-def encode_string(name, len=None):
- if type(name) is int and len is not None:
- return '%0.*x' % (len * 2, name)
+def encode_string(name, length=None):
+ if isinstance(name, int) and length is not None:
+ return '%0.*x' % (length * 2, name)
return ''.join(['%02x' % ord(ch) for ch in name])
-def encode_bytes(name, len=None):
- if type(name) is int and len is not None:
- return '%0.*x' % (len * 2, name)
+def encode_bytes(name, length=None):
+ if isinstance(name, int) and length is not None:
+ return '%0.*x' % (length * 2, name)
return ''.join(['%02x' % ch for ch in name])
return len(name.split('.'))
-def get_config(config, section, configobj, xtables={}):
+def get_config(config, section, configobj, xtables=None):
+ if xtables is None:
+ xtables = {}
try:
for field in config.options(section):
value = config.get(section, field)
- if field in xtables.keys():
+ if field in xtables:
xtable = xtables[field]
else:
xtable = {}
self.dump_header(f, self.rdlen)
f.write('# Address=%s\n' % (self.address))
bin_address = socket.inet_pton(socket.AF_INET6, self.address)
- [f.write('%02x' % x) for x in bin_address]
+ for x in bin_address:
+ f.write('%02x' % x)
f.write('\n')
block = 0
maplen = None # default bitmap length, auto-calculate
bitmap = '040000000003' # an arbitrarily chosen bitmap sample
+ nextname = None
def dump(self, f):
# first, construct the bitmap data
f.write('%02x %02x %s\n' %
(block_list[i], maplen_list[i], bitmap_list[i]))
+ def dump_fixedpart(self, f, bitmap_totallen):
+ name_wire = encode_name(self.nextname)
+ if self.rdlen is None:
+ # if rdlen needs to be calculated, it must be based on the bitmap
+ # length, because the configured maplen can be fake.
+ self.rdlen = int(len(name_wire) / 2) + bitmap_totallen
+ self.dump_header(f, self.rdlen)
+ f.write('# Next Name=%s (%d bytes)\n' % (self.nextname,
+ int(len(name_wire) / 2)))
+ f.write('%s\n' % name_wire)
+
class NSEC(NSECBASE):
'''Implements rendering NSEC RDATA in the test data format.
nextname = 'next.example.com'
- def dump_fixedpart(self, f, bitmap_totallen):
- name_wire = encode_name(self.nextname)
- if self.rdlen is None:
- # if rdlen needs to be calculated, it must be based on the bitmap
- # length, because the configured maplen can be fake.
- self.rdlen = int(len(name_wire) / 2) + bitmap_totallen
- self.dump_header(f, self.rdlen)
- f.write('# Next Name=%s (%d bytes)\n' % (self.nextname,
- int(len(name_wire) / 2)))
- f.write('%s\n' % name_wire)
-
class NSEC3PARAM(RR):
'''Implements rendering NSEC3PARAM RDATA in the test data format.
self.rdlen = int(18 + len(name_wire) / 2 + len(str(sig_wire)) / 2)
self.dump_header(f, self.rdlen)
- if type(self.covered) is str:
+ if isinstance(self.covered, str):
self.covered = dict_rrtype[self.covered.lower()]
- if type(self.algorithm) is str:
+ if isinstance(self.algorithm, str):
self.algorithm = dict_algorithm[self.algorithm.lower()]
if self.labels is None:
self.labels = count_namelabels(self.signer)
name_wire = encode_name(self.algorithm)
mac_size = self.mac_size
if mac_size is None:
- if self.algorithm in self.dict_macsize.keys():
+ if self.algorithm in self.dict_macsize:
mac_size = self.dict_macsize[self.algorithm]
else:
raise RuntimeError('TSIG Mac Size cannot be determined')
'header': (DNSHeader, header_xtables),
'question': (DNSQuestion, question_xtables),
'edns': (EDNS, {})}
-for rrtype in dict_rrtype.keys():
+for rrtype in dict_rrtype:
# For any supported RR types add the tuple of (RR_CLASS, {}).
# We expect KeyError as not all the types are supported, and simply
# ignore them.
-usage = 'usage: %prog [options] input_file'
+usage = 'usage: %(prog)s [options] input_file'
-if __name__ == "__main__":
- parser = OptionParser(usage=usage)
- parser.add_option('-o', '--output', action='store', dest='output',
- default=None, metavar='FILE',
- help='output file name [default: prefix of input_file]')
- (options, args) = parser.parse_args()
+def main():
+    parser = argparse.ArgumentParser(usage=usage)
+    parser.add_argument('-o', '--output', action='store', dest='output',
+                        default=None, metavar='FILE',
+                        help='output file name [default: prefix of input_file]')
+    parser.add_argument('input_file', help='input configuration file')
+    args = parser.parse_args()
-    if len(args) == 0:
-        parser.error('input file is missing')
-    configfile = args[0]
+    # argparse enforces the required positional, so no manual check is needed
+    configfile = args.input_file
-    outputfile = options.output
+    outputfile = args.output
if not outputfile:
m = re.match(r'(.*)\.[^.]+$', configfile)
if m:
raise ValueError('output file is not specified and input file is not in the form of "output_file.suffix"')
-    # DeprecationWarning: use ConfigParser directly
-    config = configparser.SafeConfigParser()
+    config = configparser.ConfigParser()
config.read(configfile)
- output = open(outputfile, 'w')
+ output = open(outputfile, 'w', encoding='utf-8') # pylint: disable=consider-using-with
    print_header(output, configfile)
# First try the 'custom' mode; if it fails assume the query mode.
try:
obj.dump(output)
output.close()
+
+
+if __name__ == "__main__":
+ main()