# use OrderedDict to preserve order of fields in cmd-syntax
try:
descr = json.load(fp, object_pairs_hook=collections.OrderedDict)
- except:
- print('\nError while processing %s\n\n' % f)
+ except Exception as e:
+ print(f'\nError while processing {f}: {e}\n\n')
raise
if name != descr['name']:
exit("Expected name == descr['name'], but name is {name} and descr['name'] is {descr['name']}")
for dm, funcs in sorted(daemons.items()):
rst += '.. _commands-%s:\n\n' % dm
rst += 'Commands supported by `%s` daemon: ' % dm
- funcs = sorted([ ':ref:`%s <ref-%s>`' % (f['name'], f['name']) for f in funcs])
+ funcs = sorted([':ref:`%s <ref-%s>`' % (f['name'], f['name']) for f in funcs])
rst += ', '.join(funcs)
rst += '.\n\n'
for h, funcs in sorted(hooks.items()):
rst += '.. _commands-%s:\n\n' % h
rst += 'Commands supported by `%s` hook library: ' % h
- funcs = sorted([ ':ref:`%s <ref-%s>`' % (f['name'], f['name']) for f in funcs])
+ funcs = sorted([':ref:`%s <ref-%s>`' % (f['name'], f['name']) for f in funcs])
rst += ', '.join(funcs)
rst += '.\n\n'
# availability
rst += 'Availability: %s ' % func['avail']
- rst += '(:ref:`%s <commands-%s>` hook library)' % (func['hook'], func['hook']) if 'hook' in func else '(built-in)'
+ rst += f'(:ref:`{func["hook"]} <commands-{func["hook"]}>` hook library)' if 'hook' in func else '(built-in)'
rst += '\n\n'
# access
try:
access = func['access']
- except:
- print('\naccess missing in %s\n\n' % name)
+ except Exception as e:
+ print(f'\naccess missing in {name}: {e}\n\n')
raise
- if not access in ['read', 'write']:
+ if access not in ['read', 'write']:
print('\nUnknown access %s in %s\n\n' % (access, name))
raise ValueError('access must be read or write')
rst += 'Access: %s *(parameter ignored in this Kea version)* \n\n' % access
rst += '\n\n'
if 'cmd-comment' in func:
- for l in func['cmd-comment']:
- rst += "%s\n" % l
+ for line in func['cmd-comment']:
+ rst += "%s\n" % line
rst += '\n'
# response syntax
rst += '- 1 - error\n'
rst += '- 2 - unsupported\n'
rst += '- 3 - empty (command was completed successfully, but no data was affected or returned)\n'
- rst += '- 4 - conflict (command could not apply requested configuration changes because they were in conflict with the server state)\n\n'
+ rst += '- 4 - conflict (command could not apply requested configuration changes because they were '
+ rst += 'in conflict with the server state)\n\n'
return rst
# import sys
# sys.path.insert(0, os.path.abspath('.'))
-# to avoid "sphinx.errors.SphinxParallelError: RecursionError: maximum recursion depth exceeded while pickling an object"
+# to avoid sphinx.errors.SphinxParallelError: RecursionError: maximum recursion depth exceeded while pickling an object
import sys
sys.setrecursionlimit(5000)
release = candidate_release
break
version = release
-dashed_version_series='-'.join(version.split('.')[0:2])
+dashed_version_series = '-'.join(version.split('.')[0:2])
# now let's replace versions with odd minor number with dev
if int(dashed_version_series[-1]) % 2 != 0:
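
# Illustrative sketch (not part of the patch): for a version such as
# '2.5.4' the series is '2-5', whose last character '5' is odd, so the
# series is treated as a development one. The [-1] indexing assumes a
# single-digit minor number.
version = '2.5.4'
dashed_version_series = '-'.join(version.split('.')[0:2])  # '2-5'
assert int(dashed_version_series[-1]) % 2 != 0             # odd => dev series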
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
-#html_theme = 'alabaster'
+# html_theme = 'alabaster'
html_theme = 'sphinx_rtd_theme'
html_logo = 'static/kea-imageonly-100bw.png'
# further. For a list of options available for each theme, see the
# documentation.
#
-#html_theme_options = {
-# "logo": "kea-logo-100x70.png",
-#}
+# html_theme_options = {
+# "logo": "kea-logo-100x70.png",
+# }
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# -- Options for HTMLHelp output ---------------------------------------------
# Output file base name for HTML help builder.
-#htmlhelp_basename = 'KeaAdministratorReferenceManualdoc'
+# htmlhelp_basename = 'KeaAdministratorReferenceManualdoc'
# -- Options for LaTeX output ------------------------------------------------
# -- Substitutions -----------------------------------------------------------
-rst_prolog="""
+rst_prolog = """
.. |cloudsmith_repo| replace:: kea-{dashed_version_series}
""".format(dashed_version_series=dashed_version_series)
+
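# Illustrative sketch (not part of the patch): with release '2.5.4' the
# prolog above defines |cloudsmith_repo| as 'kea-2-5', making the
# substitution available in every .rst page.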
# -- Functions ---------------------------------------------------------------
# Do generation of api.rst and kea-messages.rst here in conf.py instead of Makefile.am
# The first entry on this list is the actual file to copy, the second is a unique name
# that will be used when copied over to arm/ directory.
FILES_TO_COPY = [
- [ '../../platforms.rst', 'platforms.rst' ],
- [ '../examples/template-power-user-home/info.md', 'template-power-user-home.md' ],
- [ '../examples/template-power-user-home/kea-ca-1.conf', 'template-power-user-home-ca-1.conf' ],
- [ '../examples/template-power-user-home/kea-ca-2.conf', 'template-power-user-home-ca-2.conf' ],
- [ '../examples/template-power-user-home/kea-dhcp4-1.conf', 'template-power-user-home-dhcp4-1.conf' ],
- [ '../examples/template-power-user-home/kea-dhcp4-2.conf', 'template-power-user-home-dhcp4-2.conf' ],
- [ '../examples/template-ha-mt-tls/info.md', 'template-ha-mt-tls.md' ],
- [ '../examples/template-ha-mt-tls/kea-ca-1.conf', 'template-ha-mt-tls-ca-1.conf' ],
- [ '../examples/template-ha-mt-tls/kea-ca-2.conf', 'template-ha-mt-tls-ca-2.conf' ],
- [ '../examples/template-ha-mt-tls/kea-dhcp4-1.conf', 'template-ha-mt-tls-dhcp4-1.conf' ],
- [ '../examples/template-ha-mt-tls/kea-dhcp4-2.conf', 'template-ha-mt-tls-dhcp4-2.conf' ]
+ ['../../platforms.rst', 'platforms.rst'],
+ ['../examples/template-power-user-home/info.md', 'template-power-user-home.md'],
+ ['../examples/template-power-user-home/kea-ca-1.conf', 'template-power-user-home-ca-1.conf'],
+ ['../examples/template-power-user-home/kea-ca-2.conf', 'template-power-user-home-ca-2.conf'],
+ ['../examples/template-power-user-home/kea-dhcp4-1.conf', 'template-power-user-home-dhcp4-1.conf'],
+ ['../examples/template-power-user-home/kea-dhcp4-2.conf', 'template-power-user-home-dhcp4-2.conf'],
+ ['../examples/template-ha-mt-tls/info.md', 'template-ha-mt-tls.md'],
+ ['../examples/template-ha-mt-tls/kea-ca-1.conf', 'template-ha-mt-tls-ca-1.conf'],
+ ['../examples/template-ha-mt-tls/kea-ca-2.conf', 'template-ha-mt-tls-ca-2.conf'],
+ ['../examples/template-ha-mt-tls/kea-dhcp4-1.conf', 'template-ha-mt-tls-dhcp4-1.conf'],
+ ['../examples/template-ha-mt-tls/kea-dhcp4-2.conf', 'template-ha-mt-tls-dhcp4-2.conf']
]
from shutil import copyfile
print("Copying %s to %s" % (src, dst))
copyfile(src, dst)
+
# custom setup hook
def setup(app):
app.add_crossref_type('isccmd', 'isccmd')
#!/usr/bin/env python3
-# Copyright (C) 2019-2021 Internet Systems Consortium, Inc. ("ISC")
+# Copyright (C) 2019-2024 Internet Systems Consortium, Inc. ("ISC")
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
messages[msg_id] = (section, msg_id, msg_text, msg_descr)
# start next message
- m = re.search('^%\s?([A-Z0-9_]+)\s+(.*)', line);
+ m = re.search(r'^%\s?([A-Z0-9_]+)\s+(.*)', line)
msg_id, msg_text = m.groups()
msg_descr = []
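
# Illustrative sketch (not part of the patch): the pattern above splits a
# .mes entry line of the form '% MSG_ID text...' into its identifier and
# text. The sample line is hypothetical.
import re

sample = '% DHCP4_STARTED Kea DHCPv4 server version %1 started'
assert re.search(r'^%\s?([A-Z0-9_]+)\s+(.*)', sample).groups() == \
    ('DHCP4_STARTED', 'Kea DHCPv4 server version %1 started')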
rst += msg_text + '\n\n'
- rst += ''.join([' ' + l + '\n' for l in msg_descr])
+ rst += ''.join([' ' + line + '\n' for line in msg_descr])
rst += '\n'
-
return rst
+
def generate(in_files, out_file):
messages = read_input_files(in_files)
'24.04': True,
},
'debian': {
- '8': False,
- '9': False,
+ '8': False,
+ '9': False,
'10': True,
'11': True,
'12': True,
end
"""
-RECOMMENDED_VAGRANT_VERSION='2.2.16'
+RECOMMENDED_VAGRANT_VERSION = '2.2.16'
log = logging.getLogger()
return '\033[1;31m%s\033[0;0m' % txt
return txt
+
def green(txt):
"""Return colorized (if the terminal supports it) or plain text."""
if sys.stdout.isatty():
return '\033[0;32m%s\033[0;0m' % txt
return txt
+
def blue(txt):
"""Return colorized (if the terminal supports it) or plain text."""
if sys.stdout.isatty():
revision = revision[0]
if not system or not revision:
raise Exception('fallback to /etc/os-release')
- except:
+ except Exception:
if os.path.exists('/etc/os-release'):
vals = {}
with open('/etc/os-release') as f:
- for l in f.readlines():
- if '=' in l:
- key, val = l.split('=', 1)
+ for line in f.readlines():
+ if '=' in line:
+ key, val = line.split('=', 1)
vals[key.strip()] = val.strip().replace('"', '')
for i in ['ID', 'ID_LIKE']:
if i in vals:
- system_candidates=vals[i].strip('"').split()
+ system_candidates = vals[i].strip('"').split()
for system_candidate in system_candidates:
if system_candidate in SYSTEMS:
system = system_candidate
break
else:
- continue
+ continue
break
if system is None:
raise Exception('cannot determine system')
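
# Illustrative sketch (not part of the patch): the fallback above reduces
# /etc/os-release 'KEY=value' lines to a dict, then tries ID and ID_LIKE
# against the known SYSTEMS table. The sample content is made up.
sample = 'ID="rocky"\nID_LIKE="rhel centos fedora"\nVERSION_ID="9.3"\n'
vals = {}
for line in sample.splitlines():
    if '=' in line:
        key, val = line.split('=', 1)
        vals[key.strip()] = val.strip().replace('"', '')
assert vals['ID'] == 'rocky'
assert vals['ID_LIKE'].split() == ['rhel', 'centos', 'fedora']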
for attempt in range(attempts):
if interactive:
- # Issue: [B602:subprocess_popen_with_shell_equals_true] subprocess call with shell=True identified, security issue.
+ # Issue: [B602:subprocess_popen_with_shell_equals_true] subprocess call with shell=True identified,
+ # security issue.
p = subprocess.Popen(cmd, cwd=cwd, env=env, shell=True) # nosec B602
exitcode = p.wait()
else:
- # Issue: [B602:subprocess_popen_with_shell_equals_true] subprocess call with shell=True identified, security issue.
- p = subprocess.Popen(cmd, cwd=cwd, env=env, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) # nosec B602
+ # Issue: [B602:subprocess_popen_with_shell_equals_true] subprocess call with shell=True identified,
+ # security issue.
+ p = subprocess.Popen(cmd, cwd=cwd, env=env, shell=True, # nosec B602
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
if capture:
output = ''
for line in out.splitlines():
line = line.strip()
- m = re.search('^([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+(.+)', line)
+ m = re.search(r'^([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s+(.+)', line)
if not m:
continue
status, name, version, arch, descr = m.groups()
pkg_cache = {}
# prepare cache if needed
- if not pkg_cache and system in ['centos', 'rhel', 'fedora', 'debian', 'ubuntu', 'rocky']:#, 'alpine']: # TODO: complete caching support for alpine
+ if not pkg_cache and system in ['centos', 'rhel', 'fedora', 'debian', 'ubuntu',
+ 'rocky']: # , 'alpine']: # TODO: complete caching support for alpine
if system in ['centos', 'rhel', 'fedora', 'rocky']:
pkg_cache.update(_prepare_installed_packages_cache_for_rpms())
elif system in ['debian', 'ubuntu']:
capture=True, raise_error=False)
if exitcode != 0:
if 'There is container on your system' in out and 'lxc-destroy' in out:
- m = re.search('`lxc-destroy.*?`', out)
+ m = re.search(r'`lxc-destroy.*?`', out)
if m:
# destroy some old container
cmd = m.group(0)[1:-1]
# Reason for nosec: it is clearly a https link.
with urllib.request.urlopen(url) as response: # nosec B310
data = response.read()
- except:
- log.exception('ignored exception')
+ except Exception as e:
+ log.exception(f'ignored exception: {e}')
return {}
data = json.loads(data)
return data
if provider_found:
try:
v = int(ver['number'])
- except:
+ except ValueError:
return ver['number']
if v > latest_version:
latest_version = v
return "not created"
_, out = execute("vagrant status", cwd=self.vagrant_dir, timeout=15, capture=True, quiet=True)
- m = re.search('default\s+(.+)\(', out)
+ m = re.search(r'default\s+(.+)\(', out)
if not m:
raise Exception('cannot get status in:\n%s' % out)
return m.group(1).strip()
execute("vagrant reload --no-provision --force",
cwd=self.vagrant_dir, timeout=15 * 60, dry_run=self.dry_run)
-
def package(self):
"""Package Vagrant system into Vagrant box."""
execute('vagrant halt', cwd=self.vagrant_dir, dry_run=self.dry_run, raise_error=False, attempts=3)
# reset machine-id
execute('sudo rm -f %s/rootfs/var/lib/dbus/machine-id' % lxc_container_path)
- #execute('sudo truncate -s 0 %s/rootfs/etc/machine-id' % lxc_container_path)
+ # execute('sudo truncate -s 0 %s/rootfs/etc/machine-id' % lxc_container_path)
execute('sudo rm -f %s/rootfs/etc/machine-id' % lxc_container_path)
# pack rootfs
if self.system in ['ubuntu', 'debian']:
upload_cmd += ' -X POST -H "Content-Type: multipart/form-data" --data-binary "@%s" '
- file_ext = 'deb' # include both '.deb' and '.ddeb' files
+ file_ext = 'deb' # include both '.deb' and '.ddeb' files
elif self.system in ['fedora', 'centos', 'rhel', 'rocky']:
upload_cmd += ' --upload-file %s '
cmd = 'scp -F %s -r default:/home/vagrant/aggregated_tests.xml .' % ssh_cfg_path
execute(cmd, cwd=self.vagrant_dir)
- except: # pylint: disable=bare-except
- log.exception('ignored issue with parsing unit test results')
+ except Exception as e:
+ log.exception(f'ignored issue with parsing unit test results: {e}')
return total, passed
gtest_version = '1.14.0'
gtest_path = f'/usr/src/googletest-release-{gtest_version}/googletest'
if os.path.exists(gtest_path):
- log.info(f'gtest is already installed in {gtest_path}.')
- return
+ log.info(f'gtest is already installed in {gtest_path}.')
+ return
execute('mkdir -p ~/.hammer-tmp')
cmd = 'wget --no-verbose -O ~/.hammer-tmp/gtest.tar.gz '
execute('rm -rf ~/.hammer-tmp')
-def _install_libyang_from_sources(ignore_errors = False):
+def _install_libyang_from_sources(ignore_errors=False):
"""Install libyang from sources."""
for prefix in ['/usr', '/usr/local']:
libyang_so_candidates = [f'{prefix}/lib/libyang.so', f'{prefix}/lib64/libyang.so']
libyang_header = f'{prefix}/include/libyang/version.h'
if (any(os.path.exists(i) for i in libyang_so_candidates) and os.path.exists(libyang_header) and
- execute(f"grep -F '#define LY_VERSION_MAJOR 2' '{libyang_header}'", raise_error=False) == 0):
+ execute(f"grep -F '#define LY_VERSION_MAJOR 2' '{libyang_header}'", raise_error=False) == 0):
log.info(f'libyang is already installed at {libyang_header}.')
return
- version='v2.1.4'
+ version = 'v2.1.4'
execute('rm -rf ~/.hammer-tmp')
execute('mkdir -p ~/.hammer-tmp')
execute('git clone https://github.com/CESNET/libyang.git ~/.hammer-tmp/libyang')
execute(f'git checkout {version}', cwd='~/.hammer-tmp/libyang')
execute('mkdir ~/.hammer-tmp/libyang/build')
- execute('cmake -DBUILD_TESTING=OFF -DCMAKE_C_FLAGS="-Wno-incompatible-pointer-types" ..', cwd='~/.hammer-tmp/libyang/build')
+ execute('cmake -DBUILD_TESTING=OFF -DCMAKE_C_FLAGS="-Wno-incompatible-pointer-types" ..',
+ cwd='~/.hammer-tmp/libyang/build')
execute('make -j $(nproc || gnproc || echo 1)', cwd='~/.hammer-tmp/libyang/build')
execute('sudo make install', cwd='~/.hammer-tmp/libyang/build')
system, revision = get_system_revision()
execute('rm -rf ~/.hammer-tmp')
-def _install_sysrepo_from_sources(ignore_errors = False):
+def _install_sysrepo_from_sources(ignore_errors=False):
"""Install sysrepo from sources."""
for prefix in ['/usr', '/usr/local']:
sysrepo_so_candidates = [f'{prefix}/lib/libsysrepo.so', f'{prefix}/lib64/libsysrepo.so']
sysrepo_header = f'{prefix}/include/sysrepo/version.h'
if (any(os.path.exists(i) for i in sysrepo_so_candidates) and os.path.exists(sysrepo_header) and
- execute(f"grep -F '#define SR_VERSION_MAJOR 7' '{sysrepo_header}'", raise_error=False) == 0):
+ execute(f"grep -F '#define SR_VERSION_MAJOR 7' '{sysrepo_header}'", raise_error=False) == 0):
log.info(f'sysrepo is already installed at {sysrepo_header}.')
return
- version='v2.2.12'
+ version = 'v2.2.12'
# Create repository for YANG modules and change ownership to current user.
execute('sudo mkdir -p /etc/sysrepo')
execute('rm -rf ~/.hammer-tmp')
-def _install_libyang_cpp_from_sources(ignore_errors = False):
+def _install_libyang_cpp_from_sources(ignore_errors=False):
"""Install libyang-cpp from sources."""
for prefix_lib in ['/usr/lib', '/usr/lib64', '/usr/local/lib', '/usr/local/lib64']:
libyang_cpp_so = f'{prefix_lib}/libyang-cpp.so'
libyang_cpp_pc = f'{prefix_lib}/pkgconfig/libyang-cpp.pc'
if (os.path.exists(libyang_cpp_so) and os.path.exists(libyang_cpp_pc) and
- execute(f"grep -F 'Version: 1.1.0' '{libyang_cpp_pc}'", raise_error=False) == 0):
+ execute(f"grep -F 'Version: 1.1.0' '{libyang_cpp_pc}'", raise_error=False) == 0):
log.info(f'libyang-cpp is already installed at {libyang_cpp_so}.')
return
- version='ae7d649ea75da081725c119dd553b2ef3121a6f8'
+ version = 'ae7d649ea75da081725c119dd553b2ef3121a6f8'
execute('rm -rf ~/.hammer-tmp')
execute('mkdir -p ~/.hammer-tmp')
execute('git clone https://github.com/CESNET/libyang-cpp.git ~/.hammer-tmp/libyang-cpp')
execute(f'git checkout {version}', cwd='~/.hammer-tmp/libyang-cpp')
# New cpp compiler is more picky about missing headers. (ex. Fedora 40)
- return_code = execute('sudo grep "#include <algorithm>" ~/.hammer-tmp/libyang-cpp/src/Context.cpp', raise_error=False)
+ return_code = execute('sudo grep "#include <algorithm>" ~/.hammer-tmp/libyang-cpp/src/Context.cpp',
+ raise_error=False)
if return_code == 1:
- execute('sed -i "/#include <libyang\/libyang.h>/a #include <algorithm>" ~/.hammer-tmp/libyang-cpp/src/Context.cpp')
+ execute(r'sed -i "/#include <libyang\/libyang.h>/a #include <algorithm>" '
+ '~/.hammer-tmp/libyang-cpp/src/Context.cpp')
execute('mkdir ~/.hammer-tmp/libyang-cpp/build')
execute('cmake -DBUILD_TESTING=OFF .. ', cwd='~/.hammer-tmp/libyang-cpp/build')
execute('make -j $(nproc || gnproc || echo 1)', cwd='~/.hammer-tmp/libyang-cpp/build')
execute('rm -rf ~/.hammer-tmp')
-def _install_sysrepo_cpp_from_sources(ignore_errors = False):
+def _install_sysrepo_cpp_from_sources(ignore_errors=False):
"""Install sysrepo-cpp from sources."""
for prefix_lib in ['/usr/lib', '/usr/lib64', '/usr/local/lib', '/usr/local/lib64']:
sysrepo_cpp_so = f'{prefix_lib}/libsysrepo-cpp.so'
sysrepo_cpp_pc = f'{prefix_lib}/pkgconfig/sysrepo-cpp.pc'
if (os.path.exists(sysrepo_cpp_so) and os.path.exists(sysrepo_cpp_pc) and
- execute(f"grep -F 'Version: 1.1.0' '{sysrepo_cpp_pc}'", raise_error=False) == 0):
+ execute(f"grep -F 'Version: 1.1.0' '{sysrepo_cpp_pc}'", raise_error=False) == 0):
log.info(f'sysrepo-cpp is already installed at {sysrepo_cpp_so}.')
return
- version='02634174ffc60568301c3d9b9b7cf710cff6a586'
+ version = '02634174ffc60568301c3d9b9b7cf710cff6a586'
execute('rm -rf ~/.hammer-tmp')
execute('mkdir -p ~/.hammer-tmp')
execute('rm -rf ~/.hammer-tmp')
-def _install_netconf_libraries_from_sources(ignore_errors = False):
+def _install_netconf_libraries_from_sources(ignore_errors=False):
_install_libyang_from_sources(ignore_errors)
_install_sysrepo_from_sources(ignore_errors)
_install_libyang_cpp_from_sources(ignore_errors)
# Some systems, usually old ones, might require a cerain PKCS format
# of the key. Try to regenerate it here, but don't stop if it fails.
# If the key is wrong, it will fail later anyway.
- exit_code = execute('openssl rsa -in src/lib/asiolink/testutils/ca/kea-server.key ' \
- '-out src/lib/asiolink/testutils/ca/kea-server.key', raise_error=False)
+ exit_code = execute('openssl rsa -in src/lib/asiolink/testutils/ca/kea-server.key '
+ '-out src/lib/asiolink/testutils/ca/kea-server.key', raise_error=False)
if exit_code != 0:
log.warning(f'openssl command failed with exit code {exit_code}, but continuing...')
for file in [
elif system == 'freebsd':
cmd = "echo 'SET PASSWORD = \"\";' "
- cmd += "| sudo mysql -u root --password=\"$(sudo cat /root/.mysql_secret | grep -v '^#')\" --connect-expired-password"
+ cmd += ("| sudo mysql -u root --password=\"$(sudo cat /root/.mysql_secret | grep -v '^#')\""
+ " --connect-expired-password")
execute(cmd, raise_error=False)
elif system == 'alpine':
if 'tls' in features:
# ALTER USER is the best place to put the REQUIRE but, if it is not
# supported, then downgrade to GRANT.
- exit_code = execute('''sudo mysql -u root -e "ALTER USER 'keatest_secure'@'localhost' REQUIRE X509;"''', raise_error=False)
+ exit_code = execute('''sudo mysql -u root -e "ALTER USER 'keatest_secure'@'localhost' REQUIRE X509;"''',
+ raise_error=False)
if exit_code == 0:
# If ALTER succeeds, then we still have to GRANT without REQUIRE.
execute('''sudo mysql -u root -e "GRANT ALL ON keatest.* TO 'keatest_secure'@'localhost';"''')
log.error('Command "sudo systemctl restart postgresql.service" failed. Here is the journal:')
_, output = execute('sudo journalctl -xu postgresql.service', raise_error=False)
log.error('And here are the logs:')
- _, output = execute("sudo -u postgres psql -A -t -c 'SELECT pg_current_logfile()'", capture=True, quiet=True)
+ _, output = execute("sudo -u postgres psql -A -t -c 'SELECT pg_current_logfile()'",
+ capture=True, quiet=True)
logfile = os.path.basename(output.strip())
- _, output = execute(f'sudo find /var -type f -name "{logfile}" -exec cat {{}} \;', raise_error=False)
+ _, output = execute(fr'sudo find /var -type f -name "{logfile}" -exec cat {{}} \;', raise_error=False)
sys.exit(exit_code)
# and user both set to 'all'. This is to not affect authentication of
# `postgres` user which should have a separate entry.
def _change_postgresql_auth_method(connection_type, auth_method, hba_file):
- execute("sudo sed -i.bak 's/^{}\(.*\)all\(.*\)all\(.*\) [a-z0-9]*$/{}\\1all\\2all\\3 {}/g' '{}'".format(
+ execute(r"sudo sed -i.bak 's/^{}\(.*\)all\(.*\)all\(.*\) [a-z0-9]*$/{}\1all\2all\3 {}/g' '{}'".format(
connection_type, connection_type, auth_method, hba_file), cwd='/tmp')
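
# Illustrative sketch (not part of the patch): a Python mirror of the sed
# expression above. Only entries whose database and user are both 'all'
# have their method rewritten; the dedicated postgres superuser line is
# left alone. The pg_hba.conf line is a typical default, not taken from
# any specific system.
import re

hba_line = 'local   all             all                                     peer'
rewritten = re.sub(r'^local(.*)all(.*)all(.*) [a-z0-9]*$',
                   r'local\1all\2all\3 md5', hba_line)
assert rewritten.endswith(' md5')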
var_db_postgres_data = output.rstrip()
# Create postgres internals.
- execute('sudo test ! -d {} && sudo /usr/local/etc/rc.d/postgresql oneinitdb || true'.format(var_db_postgres_data))
+ execute(f'sudo test ! -d {var_db_postgres_data} && sudo /usr/local/etc/rc.d/postgresql oneinitdb || true')
- # if the file '/var/db/postgres/data*/postmaster.opts' does not exist the 'restart' of postgresql will fail with error:
+ # if the file '/var/db/postgres/data*/postmaster.opts' does not exist the 'restart' of postgresql will fail
+ # with error:
# pg_ctl: could not read file "/var/db/postgres/data*/postmaster.opts"
# the initial start of the postgresql will create the 'postmaster.opts' file
- execute('sudo test ! -f {}/postmaster.opts && sudo service postgresql onestart || true'.format(var_db_postgres_data))
+ execute(f'sudo test ! -f {var_db_postgres_data}/postmaster.opts && sudo service postgresql onestart || true')
_enable_postgresql(system, revision)
_restart_postgresql(system, revision)
# before any other local auth method for higher priority. Let's simulate
# that by putting it just after the auth header.
if 0 != execute("sudo cat {} | grep -E '^local.*all.*postgres'".format(hba_file), raise_error=False):
- auth_header='# TYPE DATABASE USER ADDRESS METHOD'
- postgres_auth_line='local all postgres ident'
+ auth_header = '# TYPE DATABASE USER ADDRESS METHOD'
+ postgres_auth_line = 'local all postgres ident'
# The "\\" followed by newline is for BSD support.
execute("""sudo sed -i.bak '/{}/a\\
{}
elif system in ['centos', 'fedora', 'rhel', 'rocky']:
cmd = "dnf list {} -y | tr -s ' ' | cut -d ' ' -f 2 | tail -n 1"
elif system == 'freebsd':
- cmd = "pkg search {0} | grep -Eo '^{0}-[0-9_,\.]+' | sed 's/{0}-//g'"
+ cmd = r"pkg search {0} | grep -Eo '^{0}-[0-9_,\.]+' | sed 's/{0}-//g'"
elif system == 'arch':
cmd = "pacman -Qi {} | tr -s ' ' | grep -F 'Version :' | cut -d ' ' -f 3"
else:
packages.extend(['postgresql', 'postgresql-server'])
if revision == '9':
packages.append('postgresql13-devel')
+
def link_pg_config():
if not os.path.exists('/usr/bin/pg_config'):
execute('sudo ln -s /usr/pgsql-13/bin/pg_config /usr/bin/pg_config')
+
deferred_functions.append(link_pg_config)
else:
packages.append('postgresql-devel')
else:
packages.extend(['mariadb-client', 'mariadb-server', 'libmariadb-dev-compat'])
-
if 'pgsql' in features:
if revision == '16.04':
packages.extend(['postgresql-client', 'libpq-dev', 'postgresql', 'postgresql-server-dev-all'])
for f in os.listdir(rpm_dir):
if f == 'kea.spec':
continue
- execute('cp %s %s/SOURCES' % (os.path.join(rpm_dir, f), rpm_root_path), check_times=check_times, dry_run=dry_run)
- execute('cp %s %s/SPECS' % (os.path.join(rpm_dir, 'kea.spec'), rpm_root_path), check_times=check_times, dry_run=dry_run)
+ execute('cp %s %s/SOURCES' % (os.path.join(rpm_dir, f), rpm_root_path), check_times=check_times,
+ dry_run=dry_run)
+ execute('cp %s %s/SPECS' % (os.path.join(rpm_dir, 'kea.spec'), rpm_root_path), check_times=check_times,
+ dry_run=dry_run)
execute('cp %s %s/SOURCES' % (tarball_path, rpm_root_path), check_times=check_times, dry_run=dry_run)
services_list = ['kea-dhcp4.service', 'kea-dhcp6.service', 'kea-dhcp-ddns.service', 'kea-ctrl-agent.service']
# centos/rhel 7 does not support some fields in systemd unit files so they need to be commented out
if system == 'centos' and revision == '7':
for f in services_list:
- for k in ['RuntimeDirectory', 'RuntimeDirectoryPreserve', 'LogsDirectory', 'LogsDirectoryMode', 'StateDirectory', 'ConfigurationDirectory']:
- cmd = "sed -i -E 's/^(%s=.*)/#\\1/' %s" % (k, f)
+ for k in ['RuntimeDirectory', 'RuntimeDirectoryPreserve', 'LogsDirectory', 'LogsDirectoryMode',
+ 'StateDirectory', 'ConfigurationDirectory']:
+ cmd = r"sed -i -E 's/^(%s=.*)/#\1/' %s" % (k, f)
execute(cmd, cwd=rpm_dir, check_times=check_times, dry_run=dry_run)
-
# do rpm build
cmd = "rpmbuild --define 'kea_version %s' --define 'isc_version %s' -ba %s/SPECS/kea.spec"
cmd += " -D'_topdir %s'"
if 'install' in features:
# install packages
- execute('rpm -qa | grep isc-kea | xargs sudo rpm -e', check_times=check_times, dry_run=dry_run, raise_error=False)
+ execute('rpm -qa | grep isc-kea | xargs sudo rpm -e', check_times=check_times, dry_run=dry_run,
+ raise_error=False)
execute(f'sudo rpm -i {rpm_root_path}/RPMS/{arch.strip()}/*rpm', check_times=check_times, dry_run=dry_run)
# check if kea services can be started
install_pkgs('apt-transport-https', env=env, check_times=check_times)
# See if a .deb package had been previously uploaded.
- _, output = execute("curl -o /dev/null -s -w '%{{http_code}}' {}/dists/kea/Release 2>/dev/null".format(repo_url), capture=True)
+ _, output = execute("curl -o /dev/null -s -w '%{{http_code}}' {}/dists/kea/Release 2>/dev/null".format(repo_url),
+ capture=True)
http_code = output.rstrip()
release_file_exists = (http_code == '200')
if release_file_exists:
else:
repo_name = 'kea-%s-%s-%s' % (pkg_version.rsplit('.', 1)[0], system, revision)
log.error(f'{repo_url}/dists/kea/Release does not exist. '
- f'This is usually caused by no package existing in {repo_name}. '
- 'You can solve this by uploading any package.'
- 'Continuing, but the build will likely fail.')
+ f'This is usually caused by no package existing in {repo_name}. '
+          'You can solve this by uploading any package. '
+ 'Continuing, but the build will likely fail.')
# try apt update for up to 10 times if there is an error
for _ in range(10):
src_path = glob.glob('kea-src/*')[0]
# update version, etc
- execute('sed -i -e s/{VERSION}/%s/ changelog' % pkg_version, cwd='kea-src/kea-%s/debian' % pkg_version, check_times=check_times, dry_run=dry_run)
- execute('sed -i -e s/{ISC_VERSION}/%s/ changelog' % pkg_isc_version, cwd='kea-src/kea-%s/debian' % pkg_version, check_times=check_times, dry_run=dry_run)
- execute('sed -i -e s/{ISC_VERSION}/%s/ rules' % pkg_isc_version, cwd='kea-src/kea-%s/debian' % pkg_version, check_times=check_times, dry_run=dry_run)
+ execute('sed -i -e s/{VERSION}/%s/ changelog' % pkg_version, cwd='kea-src/kea-%s/debian' % pkg_version,
+ check_times=check_times, dry_run=dry_run)
+ execute('sed -i -e s/{ISC_VERSION}/%s/ changelog' % pkg_isc_version, cwd='kea-src/kea-%s/debian' % pkg_version,
+ check_times=check_times, dry_run=dry_run)
+ execute('sed -i -e s/{ISC_VERSION}/%s/ rules' % pkg_isc_version, cwd='kea-src/kea-%s/debian' % pkg_version,
+ check_times=check_times, dry_run=dry_run)
- services_list = ['isc-kea-dhcp4.isc-kea-dhcp4-server.service', 'isc-kea-dhcp6.isc-kea-dhcp6-server.service', 'isc-kea-dhcp-ddns.isc-kea-dhcp-ddns-server.service', 'isc-kea-ctrl-agent.service']
+ services_list = ['isc-kea-dhcp4.isc-kea-dhcp4-server.service', 'isc-kea-dhcp6.isc-kea-dhcp6-server.service',
+ 'isc-kea-dhcp-ddns.isc-kea-dhcp-ddns-server.service', 'isc-kea-ctrl-agent.service']
# debian 9 does not support some fields in systemd unit files so they need to be commented out
if system == 'debian' and revision == '9':
for f in services_list:
- for k in ['RuntimeDirectory', 'RuntimeDirectoryPreserve', 'LogsDirectory', 'LogsDirectoryMode', 'StateDirectory', 'ConfigurationDirectory']:
+ for k in ['RuntimeDirectory', 'RuntimeDirectoryPreserve', 'LogsDirectory', 'LogsDirectoryMode',
+ 'StateDirectory', 'ConfigurationDirectory']:
cmd = "sed -i -E 's/^(%s=.*)/#\\1/' %s" % (k, f)
execute(cmd, cwd='kea-src/kea-%s/debian' % pkg_version, check_times=check_times, dry_run=dry_run)
# do deb build
env['LIBRARY_PATH'] = f'/usr/lib/{arch.strip()}-linux-gnu'
env['LD_LIBRARY_PATH'] = f'/usr/lib/{arch.strip()}-linux-gnu'
- cmd = 'debuild --preserve-envvar=LD_LIBRARY_PATH --preserve-envvar=LIBRARY_PATH --preserve-envvar=CCACHE_DIR --prepend-path=/usr/lib/ccache -i -us -uc -b'
+ cmd = ('debuild --preserve-envvar=LD_LIBRARY_PATH --preserve-envvar=LIBRARY_PATH --preserve-envvar=CCACHE_DIR '
+ '--prepend-path=/usr/lib/ccache -i -us -uc -b')
execute(cmd, env=env, cwd=src_path, timeout=60 * 40, check_times=check_times, dry_run=dry_run)
if 'install' in features:
# install packages
execute('sudo dpkg -i kea-src/*deb', check_times=check_times, dry_run=dry_run)
# check if kea services can be started
- services_list = ['isc-kea-dhcp4-server.service', 'isc-kea-dhcp6-server.service', 'isc-kea-dhcp-ddns-server.service', 'isc-kea-ctrl-agent.service']
+ services_list = ['isc-kea-dhcp4-server.service', 'isc-kea-dhcp6-server.service',
+ 'isc-kea-dhcp-ddns-server.service', 'isc-kea-ctrl-agent.service']
_check_installed_rpm_or_debs(services_list)
tardir = os.path.dirname(tarball_path)
if not tardir:
tardir = '.'
- cmd = 'cd %s; export kea_chks=`sha512sum kea-%s.tar.gz`; cd -; sed -i -e "s/KEA_CHECKSUM/${kea_chks}/" kea-src/APKBUILD' % (tardir, pkg_version)
+ cmd = ('cd %s; export kea_chks=`sha512sum kea-%s.tar.gz`; cd -; '
+ 'sed -i -e "s/KEA_CHECKSUM/${kea_chks}/" kea-src/APKBUILD' % (tardir, pkg_version))
execute(cmd, check_times=check_times, dry_run=dry_run)
cmd = 'sed -i -e s/KEA_VERSION/%s/ kea-src/APKBUILD' % pkg_version
execute(cmd, check_times=check_times, dry_run=dry_run)
if exitcode != 0:
_install_vagrant()
else:
- m = re.search('Installed Version: ([\d\.]+)', out, re.I)
+ m = re.search(r'Installed Version: ([\d\.]+)', out, re.I)
ver = m.group(1)
vagrant = [int(v) for v in ver.split('.')]
recommended_vagrant = [int(v) for v in RECOMMENDED_VAGRANT_VERSION.split('.')]
if vagrant < recommended_vagrant:
- m = re.search('Latest Version: ([\d\.]+)', out, re.I)
+ m = re.search(r'Latest Version: ([\d\.]+)', out, re.I)
if m is None:
# Vagrant was unable to check for the latest version of Vagrant.
# Attempt to upgrade to the recommended version to fix it.
ver = m.group(1)
_install_vagrant(ver, upgrade=True)
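
# Illustrative sketch (not part of the patch): comparing versions as lists
# of ints, as done above, avoids the lexicographic trap plain strings fall into:
assert [int(v) for v in '2.2.16'.split('.')] > [int(v) for v in '2.2.9'.split('.')]
assert not ('2.2.16' > '2.2.9')  # string comparison gets this wrong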
-
exitcode = execute('vagrant plugin list | grep vagrant-lxc', raise_error=False)
if exitcode != 0:
execute('vagrant plugin install vagrant-lxc')
help='Do not allow executing commands infinitely.')
parent_parser2.add_argument('-n', '--dry-run', action='store_true', help='Print only what would be done.')
-
parser = subparsers.add_parser('ensure-hammer-deps',
help="Install Hammer dependencies on current, host system.")
parser = subparsers.add_parser('supported-systems',
"To get the list of created systems run: ./hammer.py created-systems.")
parser.add_argument('-d', '--directory', help='Path to directory with Vagrantfile.')
parser = subparsers.add_parser('package-box',
- help="Prepare system from scratch and package it into Vagrant Box. Prepared box can be "
- "later deployed to Vagrant Cloud.",
+ help="Prepare system from scratch and package it into Vagrant Box. "
+ "Prepared box can be later deployed to Vagrant Cloud.",
parents=[parent_parser1, parent_parser2])
parser.add_argument('--repository-url', default=None,
help='Repository for 3rd party dependencies and for uploading built packages.')
parser.add_argument('-u', '--reuse', action='store_true',
- help='Reuse existing system image, otherwise (default case) if there is any existing then destroy it first.')
+ help='Reuse existing system image, otherwise (default case) if there is any existing then '
+ 'destroy it first.')
parser.add_argument('-k', '--skip-upload', action='store_true',
help='Skip uploading prepared box to cloud, otherwise (default case) upload it.')
-
args = main_parser.parse_args()
return args, main_parser
"""List systems hammer can support (with supported providers)."""
for system in SYSTEMS:
print('%s:' % system)
- for release,supported in SYSTEMS[system].items():
+ for release, supported in SYSTEMS[system].items():
if not supported:
continue
providers = []
providers = ', '.join(providers)
print(' - %s: %s' % (release, providers))
+
def list_created_systems():
"""List VMs that are created on this host by Hammer."""
_, output = execute('vagrant global-status --prune', quiet=True, capture=True)
log.error(msg)
sys.exit(1)
if not SYSTEMS[system][revision]:
- log.warning(f'{system} ${revision} is no longer officially supported. ' \
- 'The script will continue in a best-effort manner.')
+    log.warning(f'{system} {revision} is no longer officially supported. '
+ 'The script will continue in a best-effort manner.')
def _prepare_ccache_dir(ccache_dir, system, revision):
file_ext = ''
if system in ['ubuntu', 'debian']:
upload_cmd += ' -X POST -H "Content-Type: multipart/form-data" --data-binary "@%s" '
- file_ext = 'deb' # include both '.deb' and '.ddeb' files
+ file_ext = 'deb' # include both '.deb' and '.ddeb' files
elif system in ['fedora', 'centos', 'rhel', 'rocky']:
upload_cmd += ' --upload-file %s '
log.info("file path: %s", fp)
cmd = upload_cmd % fp
- attempts=4
+ attempts = 4
while attempts > 0:
exitcode, output = execute(cmd, capture=True, raise_error=False)
if exitcode != 0 and '504 Gateway Time-out' in output:
args.ccache_dir, args.pkg_version, args.pkg_isc_version, args.repository_url, pkgs_dir)
# NOTE: upload the locally build packages and leave; the rest of the code is vagrant specific
if args.upload:
- upload_to_repo(args,pkgs_dir)
+ upload_to_repo(args, pkgs_dir)
return
features = set(['docs', 'perfdhcp', 'shell', 'mysql', 'pgsql', 'gssapi', 'native-pkg'])
log.info('Enabled features: %s', ' '.join(features))
- package_box(args.provider, args.system, args.revision, features, args.dry_run, args.check_times, args.reuse, args.skip_upload)
+ package_box(args.provider, args.system, args.revision, features, args.dry_run, args.check_times, args.reuse,
+ args.skip_upload)
elif args.command == "prepare-system":
prepare_system_cmd(args)
-# Copyright (C) 2017-2021 Internet Systems Consortium, Inc. ("ISC")
+# Copyright (C) 2017-2024 Internet Systems Consortium, Inc. ("ISC")
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
This file contains classes used for communication with Control Agent.
"""
+
class CARequest:
"""
This class defines the HTTP request to be sent.
if self.service is not None:
self.service = [x for x in self.service if x]
if len(self.service) > 0:
- self.content += ', "service": ["' + '","'.join(self.service) + '"]'
+ self.content += ', "service": ["' + '","'.join(self.service) + '"]'
if len(self.args) > 1:
self.content += ', "arguments": { ' + self.args + ' }'
self.content += ' }'
In particular, this method generates Content-Length and its value.
"""
- self.headers['User-Agent'] = "Kea-shell/%s"%(self.version)
+ self.headers['User-Agent'] = "Kea-shell/%s" % (self.version)
self.headers['Accept'] = '*/*'
if self.auth is not None:
- self.headers['Authorization'] = "Basic %s"%(self.auth)
+ self.headers['Authorization'] = "Basic %s" % (self.auth)
self.headers['Content-Type'] = 'application/json'
- self.headers['Content-Length'] = "%d"%(len(self.content))
+ self.headers['Content-Length'] = "%d" % (len(self.content))
class CAResponse:
import ssl
import os
-from kea_conn import CAResponse # CARequest
+from kea_conn import CAResponse # CARequest
+
def send_to_control_agent(params):
""" Sends a request to Control Agent, receives a response and returns it."""
#!@PYTHON@
-# Copyright (C) 2017-2020 Internet Systems Consortium, Inc. ("ISC")
+# Copyright (C) 2017-2024 Internet Systems Consortium, Inc. ("ISC")
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
from kea_conn import CARequest
+
class CARequestUnitTest(unittest.TestCase):
"""
This class is dedicated to testing CARequest class. That class
This method is called after each test. Currently it does nothing.
"""
+
if __name__ == '__main__':
unittest.main()
-# Copyright (C) 2013-2017 Internet Systems Consortium, Inc. ("ISC")
+# Copyright (C) 2013-2024 Internet Systems Consortium, Inc. ("ISC")
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
file_out.write("#ifndef BIND10_COMMON_DEFS_H\n" +
"#define BIND10_COMMON_DEFS_H\n" +
"\n" +
- "// \\file " + filename_out + "\n" +
-'''// \\brief Common shared constants\n
-// This file contains common definitions of constants used across the sources.
-// It includes, but is not limited to the definitions of messages sent from
-// one process to another. Since the names should be self-explanatory and
-// the variables here are used mostly to synchronize the same values across
-// multiple programs, separate documentation for each variable is not provided.
+ "// \\file " + filename_out + '''
+/// @brief Common shared constants
+///
+/// This file contains common definitions of constants used across the sources.
+/// It includes, but is not limited to the definitions of messages sent from
+/// one process to another. Since the names should be self-explanatory and
+/// the variables here are used mostly to synchronize the same values across
+/// multiple programs, separate documentation for each variable is not provided.
''')
continue
# Extract the constant. Remove the values and add "extern"
examples.
"""
-import configparser, re, time, socket, sys, base64
+import configparser
+import re
+import time
+import socket
+import sys
+import base64
from datetime import datetime
from optparse import OptionParser
dnssec_timefmt = '%Y%m%d%H%M%S'
-dict_qr = { 'query' : 0, 'response' : 1 }
-dict_opcode = { 'query' : 0, 'iquery' : 1, 'status' : 2, 'notify' : 4,
- 'update' : 5 }
+dict_qr = {'query': 0, 'response': 1}
+dict_opcode = {'query': 0, 'iquery': 1, 'status': 2, 'notify': 4,
+ 'update': 5}
rdict_opcode = dict([(dict_opcode[k], k.upper()) for k in dict_opcode.keys()])
-dict_rcode = { 'noerror' : 0, 'formerr' : 1, 'servfail' : 2, 'nxdomain' : 3,
- 'notimp' : 4, 'refused' : 5, 'yxdomain' : 6, 'yxrrset' : 7,
- 'nxrrset' : 8, 'notauth' : 9, 'notzone' : 10 }
+dict_rcode = {'noerror': 0, 'formerr': 1, 'servfail': 2, 'nxdomain': 3,
+ 'notimp': 4, 'refused': 5, 'yxdomain': 6, 'yxrrset': 7,
+ 'nxrrset': 8, 'notauth': 9, 'notzone': 10}
rdict_rcode = dict([(dict_rcode[k], k.upper()) for k in dict_rcode.keys()])
-dict_rrtype = { 'none' : 0, 'a' : 1, 'ns' : 2, 'md' : 3, 'mf' : 4, 'cname' : 5,
- 'soa' : 6, 'mb' : 7, 'mg' : 8, 'mr' : 9, 'null' : 10,
- 'wks' : 11, 'ptr' : 12, 'hinfo' : 13, 'minfo' : 14, 'mx' : 15,
- 'txt' : 16, 'rp' : 17, 'afsdb' : 18, 'x25' : 19, 'isdn' : 20,
- 'rt' : 21, 'nsap' : 22, 'nsap_tr' : 23, 'sig' : 24, 'key' : 25,
- 'px' : 26, 'gpos' : 27, 'aaaa' : 28, 'loc' : 29, 'nxt' : 30,
- 'srv' : 33, 'naptr' : 35, 'kx' : 36, 'cert' : 37, 'a6' : 38,
- 'dname' : 39, 'opt' : 41, 'apl' : 42, 'ds' : 43, 'sshfp' : 44,
- 'ipseckey' : 45, 'rrsig' : 46, 'nsec' : 47, 'dnskey' : 48,
- 'dhcid' : 49, 'nsec3' : 50, 'nsec3param' : 51, 'tlsa' : 52, 'hip' : 55,
- 'spf' : 99, 'unspec' : 103, 'tkey' : 249, 'tsig' : 250,
- 'dlv' : 32769, 'ixfr' : 251, 'axfr' : 252, 'mailb' : 253,
- 'maila' : 254, 'any' : 255, 'caa' : 257 }
+dict_rrtype = {'none': 0, 'a': 1, 'ns': 2, 'md': 3, 'mf': 4, 'cname': 5,
+ 'soa': 6, 'mb': 7, 'mg': 8, 'mr': 9, 'null': 10,
+ 'wks': 11, 'ptr': 12, 'hinfo': 13, 'minfo': 14, 'mx': 15,
+ 'txt': 16, 'rp': 17, 'afsdb': 18, 'x25': 19, 'isdn': 20,
+ 'rt': 21, 'nsap': 22, 'nsap_tr': 23, 'sig': 24, 'key': 25,
+ 'px': 26, 'gpos': 27, 'aaaa': 28, 'loc': 29, 'nxt': 30,
+ 'srv': 33, 'naptr': 35, 'kx': 36, 'cert': 37, 'a6': 38,
+ 'dname': 39, 'opt': 41, 'apl': 42, 'ds': 43, 'sshfp': 44,
+ 'ipseckey': 45, 'rrsig': 46, 'nsec': 47, 'dnskey': 48,
+ 'dhcid': 49, 'nsec3': 50, 'nsec3param': 51, 'tlsa': 52, 'hip': 55,
+ 'spf': 99, 'unspec': 103, 'tkey': 249, 'tsig': 250,
+ 'dlv': 32769, 'ixfr': 251, 'axfr': 252, 'mailb': 253,
+ 'maila': 254, 'any': 255, 'caa': 257}
rdict_rrtype = dict([(dict_rrtype[k], k.upper()) for k in dict_rrtype.keys()])
-dict_rrclass = { 'in' : 1, 'ch' : 3, 'hs' : 4, 'any' : 255 }
-rdict_rrclass = dict([(dict_rrclass[k], k.upper()) for k in \
- dict_rrclass.keys()])
-dict_algorithm = { 'rsamd5' : 1, 'dh' : 2, 'dsa' : 3, 'ecc' : 4,
- 'rsasha1' : 5 }
-dict_nsec3_algorithm = { 'reserved' : 0, 'sha1' : 1 }
-rdict_algorithm = dict([(dict_algorithm[k], k.upper()) for k in \
- dict_algorithm.keys()])
-rdict_nsec3_algorithm = dict([(dict_nsec3_algorithm[k], k.upper()) for k in \
- dict_nsec3_algorithm.keys()])
-
-header_xtables = { 'qr' : dict_qr, 'opcode' : dict_opcode,
- 'rcode' : dict_rcode }
-question_xtables = { 'rrtype' : dict_rrtype, 'rrclass' : dict_rrclass }
-
-def parse_value(value, xtable = {}):
+dict_rrclass = {'in': 1, 'ch': 3, 'hs': 4, 'any': 255}
+rdict_rrclass = dict([(dict_rrclass[k], k.upper()) for k in dict_rrclass.keys()])
+dict_algorithm = {'rsamd5': 1, 'dh': 2, 'dsa': 3, 'ecc': 4,
+ 'rsasha1': 5}
+dict_nsec3_algorithm = {'reserved': 0, 'sha1': 1}
+rdict_algorithm = dict([(dict_algorithm[k], k.upper()) for k in dict_algorithm.keys()])
+rdict_nsec3_algorithm = dict([(dict_nsec3_algorithm[k], k.upper()) for k in dict_nsec3_algorithm.keys()])
+
+header_xtables = {'qr': dict_qr, 'opcode': dict_opcode,
+ 'rcode': dict_rcode}
+question_xtables = {'rrtype': dict_rrtype, 'rrclass': dict_rrclass}
+
+
+def parse_value(value, xtable={}):
if re.search(re_hex, value):
return int(value, 16)
if re.search(re_decimal, value):
return xtable[lovalue]
return value
+
def code_totext(code, dict):
if code in dict.keys():
return dict[code] + '(' + str(code) + ')'
return str(code)
+
def encode_name(name, absolute=True):
# make sure the name is dot-terminated. duplicate dots will be ignored
# below.
name += '.'
labels = name.split('.')
wire = ''
- for l in labels:
- if len(l) > 4 and l[0:4] == 'ptr=':
+ for label in labels:
+ if len(label) > 4 and label[0:4] == 'ptr=':
# special meta-syntax for compression pointer
-            wire += '%04x' % (0xc000 | int(l[4:]))
+            wire += '%04x' % (0xc000 | int(label[4:]))
break
- if absolute or len(l) > 0:
- wire += '%02x' % len(l)
- wire += ''.join(['%02x' % ord(ch) for ch in l])
- if len(l) == 0:
+ if absolute or len(label) > 0:
+ wire += '%02x' % len(label)
+ wire += ''.join(['%02x' % ord(ch) for ch in label])
+ if len(label) == 0:
break
return wire
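
# Illustrative sketch (not part of the patch): encode_name() emits each
# label as a length octet followed by the label bytes, all in hex, with a
# zero octet terminating an absolute name:
#   encode_name('example.com') == '07' + '6578616d706c65'  # "example"
#                               + '03' + '636f6d'          # "com"
#                               + '00'                     # root label
assert encode_name('example.com') == '076578616d706c6503636f6d00'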
+
def encode_string(name, len=None):
if type(name) is int and len is not None:
return '%0.*x' % (len * 2, name)
return ''.join(['%02x' % ord(ch) for ch in name])
+
def encode_bytes(name, len=None):
if type(name) is int and len is not None:
return '%0.*x' % (len * 2, name)
return ''.join(['%02x' % ch for ch in name])
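
# Illustrative sketch (not part of the patch): '%0.*x' zero-pads an
# integer to the requested number of octets (two hex digits each), while
# strings are dumped byte by byte:
assert encode_string(0xbeef, len=4) == '0000beef'
assert encode_string('ab') == '6162'
assert encode_bytes(b'\x01\x02') == '0102'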
+
def count_namelabels(name):
if name == '.': # special case
return 0
- m = re.match('^(.*)\.$', name)
+ m = re.match(r'^(.*)\.$', name)
if m:
name = m.group(1)
return len(name.split('.'))
-def get_config(config, section, configobj, xtables = {}):
+
+def get_config(config, section, configobj, xtables={}):
try:
for field in config.options(section):
value = config.get(section, field)
return False
return True
+
def print_header(f, input_file):
f.write('''###
### This data file was auto-generated from ''' + input_file + '''
###
''')
+
class Name:
'''Implements rendering a single domain name in the test data format.
'''
name = 'example.com'
- pointer = None # no compression by default
+ pointer = None # no compression by default
+
def dump(self, f):
name = self.name
if self.pointer is not None:
f.write('%s' % name_wire)
f.write('\n')
+
class DNSHeader:
'''Implements rendering a DNS Header section in the test data format.
f.write('%04x %04x %04x %04x\n' % (self.qdcount, self.ancount,
self.nscount, self.arcount))
+
class DNSQuestion:
'''Implements rendering a DNS question in the test data format.
f.write(encode_name(self.name))
f.write(' %04x %04x\n' % (self.rrtype, self.rrclass))
+
class EDNS:
'''Implements rendering EDNS OPT RR in the test data format.
do = 0
mbz = 0
rdlen = 0
+
def dump(self, f):
f.write('\n# EDNS OPT RR\n')
f.write('# NAME=%s TYPE=%s UDPSize=%d ExtRcode=%s Version=%s DO=%d\n' %
(self.name, code_totext(dict_rrtype['opt'], rdict_rrtype),
self.udpsize, self.extrcode, self.version,
1 if self.do else 0))
-
+
code_vers = (self.extrcode << 8) | (self.version & 0x00ff)
extflags = (self.do << 15) | (self.mbz & ~0x8000)
f.write('%s %04x %04x %04x %04x\n' %
f.write('# RDLEN=%d\n' % self.rdlen)
f.write('%04x\n' % self.rdlen)
+
class RR:
'''This is a base class for various types of RR test data.
For each RR type (A, AAAA, NS, etc), we define a derived class of RR
f.write('\n# %s RDATA%s\n' % (type_txt, rdlen_spec))
f.write('%s\n' % rdlen_data)
+
class A(RR):
'''Implements rendering A RDATA (of class IN) in the test data format.
f.write('%02x%02x%02x%02x\n' % (bin_address[0], bin_address[1],
bin_address[2], bin_address[3]))
+
class AAAA(RR):
'''Implements rendering AAAA RDATA (of class IN) in the test data
format.
[f.write('%02x' % x) for x in bin_address]
f.write('\n')
+
class NS(RR):
'''Implements rendering NS RDATA in the test data format.
f.write('# NS name=%s\n' % (self.nsname))
f.write('%s\n' % nsname_wire)
+
class SOA(RR):
'''Implements rendering SOA RDATA in the test data format.
retry = 300
expire = 3600000
minimum = 1200
+
def dump(self, f):
mname_wire = encode_name(self.mname)
rname_wire = encode_name(self.rname)
self.retry, self.expire,
self.minimum))
+
class TXT(RR):
'''Implements rendering TXT RDATA in the test data format.
' ' if len(wirestring_list[i]) > 0 else '',
wirestring_list[i]))
+
class RP(RR):
'''Implements rendering RP RDATA in the test data format.
'''
mailbox = 'root.example.com'
text = 'rp-text.example.com'
+
def dump(self, f):
mailbox_wire = encode_name(self.mailbox)
text_wire = encode_name(self.text)
f.write('# MAILBOX=%s TEXT=%s\n' % (self.mailbox, self.text))
f.write('%s %s\n' % (mailbox_wire, text_wire))
+
class SSHFP(RR):
'''Implements rendering SSHFP RDATA in the test data format.
algorithm = 2
fingerprint_type = 1
fingerprint = '123456789abcdef67890123456789abcdef67890'
+
def dump(self, f):
if self.rdlen is None:
-            self.rdlen = 2 + (len(self.fingerprint) / 2)
+            self.rdlen = 2 + int(len(self.fingerprint) / 2)
self.fingerprint))
f.write('%02x %02x %s\n' % (self.algorithm, self.fingerprint_type, self.fingerprint))
+
class CAA(RR):
'''Implements rendering CAA RDATA in the test data format.
flags = 0
tag = 'issue'
value = 'ca.example.net'
+
def dump(self, f):
if self.rdlen is None:
self.rdlen = 1 + 1 + len(self.tag) + len(self.value)
else:
self.rdlen = int(self.rdlen)
self.dump_header(f, self.rdlen)
- f.write('# FLAGS=%d TAG=%s VALUE=%s\n' % \
- (self.flags, self.tag, self.value))
- f.write('%02x %02x ' % \
- (self.flags, len(self.tag)))
+ f.write('# FLAGS=%d TAG=%s VALUE=%s\n' % (self.flags, self.tag, self.value))
+ f.write('%02x %02x ' % (self.flags, len(self.tag)))
f.write(encode_string(self.tag))
f.write(encode_string(self.value))
f.write('\n')
+
class DNSKEY(RR):
'''Implements rendering DNSKEY RDATA in the test data format.
f.write('# DIGEST=%s\n' % (self.digest))
f.write('%s\n' % (encode_bytes(decoded_digest)))
+
class NSECBASE(RR):
'''Implements rendering NSEC/NSEC3 type bitmaps commonly used for
these RRs. The NSEC and NSEC3 classes will be inherited from this
'''
nbitmap = 1 # number of bitmaps
block = 0
- maplen = None # default bitmap length, auto-calculate
+ maplen = None # default bitmap length, auto-calculate
bitmap = '040000000003' # an arbitrarily chosen bitmap sample
+
def dump(self, f):
# first, construct the bitmap data
block_list = []
maplen_list.append(self.__dict__[key_maplen])
else:
maplen_list.append(self.maplen)
- if maplen_list[-1] is None: # calculate it if not specified
+ if maplen_list[-1] is None: # calculate it if not specified
maplen_list[-1] = int(len(bitmap_list[-1]) / 2)
key_block = 'block' + str(i)
if key_block in self.__dict__:
- block_list.append(self.__dict__[key_block])
+ block_list.append(self.__dict__[key_block])
else:
block_list.append(self.block)
# dump RR-type specific part (NSEC or NSEC3)
- self.dump_fixedpart(f, 2 * self.nbitmap + \
- int(len(''.join(bitmap_list)) / 2))
+ self.dump_fixedpart(f, 2 * self.nbitmap + int(len(''.join(bitmap_list)) / 2))
# dump the bitmap
for i in range(0, self.nbitmap):
f.write('%02x %02x %s\n' %
(block_list[i], maplen_list[i], bitmap_list[i]))
+
class NSEC(NSECBASE):
'''Implements rendering NSEC RDATA in the test data format.
'''
nextname = 'next.example.com'
+
def dump_fixedpart(self, f, bitmap_totallen):
name_wire = encode_name(self.nextname)
if self.rdlen is None:
int(len(name_wire) / 2)))
f.write('%s\n' % name_wire)
+
class NSEC3PARAM(RR):
'''Implements rendering NSEC3PARAM RDATA in the test data format.
' ' if len(self.salt) > 0 else '',
encode_string(self.salt)))
+
class NSEC3(NSECBASE, NSEC3PARAM):
'''Implements rendering NSEC3 RDATA in the test data format.
hashlen = 20
hash = 'h' * hashlen
+
def dump_fixedpart(self, f, bitmap_totallen):
if self.rdlen is None:
# if rdlen needs to be calculated, it must be based on the bitmap
# length, because the configured maplen can be fake.
- self.rdlen = 4 + 1 + len(self.salt) + 1 + len(self.hash) \
- + bitmap_totallen
+ self.rdlen = 4 + 1 + len(self.salt) + 1 + len(self.hash) + bitmap_totallen
self.dump_header(f, self.rdlen)
self._dump_params(f)
f.write("# Hash Len=%d, Hash='%s'\n" % (self.hashlen, self.hash))
' ' if len(self.hash) > 0 else '',
encode_string(self.hash)))
+
class RRSIG(RR):
'''Implements rendering RRSIG RDATA in the test data format.
f.write('# Tag=%d Signer=%s and Signature\n' % (self.tag, self.signer))
f.write('%04x %s %s\n' % (self.tag, name_wire, sig_wire))
+
class TKEY(RR):
'''Implements rendering TKEY RDATA in the test data format.
else:
other_data = encode_string(self.other_data, other_len)
if self.rdlen is None:
- self.rdlen = int(len(name_wire) / 2 + 16 + len(key) / 2 + \
- len(other_data) / 2)
+ self.rdlen = int(len(name_wire) / 2 + 16 + len(key) / 2 + len(other_data) / 2)
self.dump_header(f, self.rdlen)
f.write('# Algorithm=%s\n' % self.algorithm)
f.write('%s\n' % name_wire)
f.write('%04x%s\n' % (other_len,
' ' + other_data if len(other_data) > 0 else ''))
+
class TLSA(RR):
'''Implements rendering TLSA RDATA in the test data format.
selector = 0
matching_type = 1
certificate_association_data = 'd2abde240d7cd3ee6b4b28c54df034b97983a1d16e8a410e4561cb106618e971'
+
def dump(self, f):
if self.rdlen is None:
-            self.rdlen = 2 + (len(self.certificate_association_data) / 2)
+            self.rdlen = 2 + int(len(self.certificate_association_data) / 2)
else:
self.rdlen = int(self.rdlen)
self.dump_header(f, self.rdlen)
- f.write('# CERTIFICATE_USAGE=%d SELECTOR=%d MATCHING_TYPE=%d CERTIFICATE_ASSOCIATION_DATA=%s\n' %\
- (self.certificate_usage, self.selector, self.matching_type,\
+ f.write('# CERTIFICATE_USAGE=%d SELECTOR=%d MATCHING_TYPE=%d CERTIFICATE_ASSOCIATION_DATA=%s\n' %
+ (self.certificate_usage, self.selector, self.matching_type,
self.certificate_association_data))
- f.write('%02x %02x %02x %s\n' % (self.certificate_usage, self.selector, self.matching_type,\
+ f.write('%02x %02x %02x %s\n' % (self.certificate_usage, self.selector, self.matching_type,
self.certificate_association_data))
+
class TSIG(RR):
'''Implements rendering TSIG RDATA in the test data format.
error = 0
other_len = None # 6 if error is BADTIME; otherwise 0
other_data = None # use time_signed + fudge + 1 for BADTIME
- dict_macsize = { 'hmac-md5' : 16, 'hmac-sha1' : 20, 'hmac-sha256' : 32 }
+ dict_macsize = {'hmac-md5': 16, 'hmac-sha1': 20, 'hmac-sha256': 32}
# TSIG has some special defaults
def __init__(self):
mac_size = self.dict_macsize[self.algorithm]
else:
raise RuntimeError('TSIG Mac Size cannot be determined')
- mac = encode_string('x' * mac_size) if self.mac is None else \
- encode_string(self.mac, mac_size)
+ mac = encode_string('x' * mac_size) if self.mac is None else encode_string(self.mac, mac_size)
other_len = self.other_len
if other_len is None:
# 18 = BADTIME
other_len = 6 if self.error == 18 else 0
other_data = self.other_data
if other_data is None:
- other_data = '%012x' % (self.time_signed + self.fudge + 1) \
- if self.error == 18 else ''
+ other_data = '%012x' % (self.time_signed + self.fudge + 1) if self.error == 18 else ''
else:
other_data = encode_string(self.other_data, other_len)
if self.rdlen is None:
- self.rdlen = int(len(name_wire) / 2 + 16 + len(mac) / 2 + \
- len(other_data) / 2)
+ self.rdlen = int(len(name_wire) / 2 + 16 + len(mac) / 2 + len(other_data) / 2)
self.dump_header(f, self.rdlen)
f.write('# Algorithm=%s Time-Signed=%d Fudge=%d\n' %
(self.algorithm, self.time_signed, self.fudge))
f.write('# MAC Size=%d MAC=(see hex)\n' % mac_size)
f.write('%04x%s\n' % (mac_size, ' ' + mac if len(mac) > 0 else ''))
f.write('# Original-ID=%d Error=%d\n' % (self.original_id, self.error))
- f.write('%04x %04x\n' % (self.original_id, self.error))
+ f.write('%04x %04x\n' % (self.original_id, self.error))
f.write('# Other-Len=%d Other-Data=(see hex)\n' % other_len)
f.write('%04x%s\n' % (other_len,
' ' + other_data if len(other_data) > 0 else ''))
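
# Illustrative sketch (not part of the patch): for error 18 (BADTIME) the
# Other Data field above defaults to a fabricated 6-octet server time,
# other_data = '%012x' % (time_signed + fudge + 1), zero-padded to twelve
# hex digits.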
+
class MINFO(RR):
'''Implements rendering MINFO RDATA in the test data format.
'''
rmailbox = 'rmailbox.example.com'
emailbox = 'emailbox.example.com'
+
def dump(self, f):
rmailbox_wire = encode_name(self.rmailbox)
emailbox_wire = encode_name(self.emailbox)
f.write('# RMAILBOX=%s EMAILBOX=%s\n' % (self.rmailbox, self.emailbox))
f.write('%s %s\n' % (rmailbox_wire, emailbox_wire))
+
class AFSDB(RR):
'''Implements rendering AFSDB RDATA in the test data format.
'''
subtype = 1
server = 'afsdb.example.com'
+
def dump(self, f):
server_wire = encode_name(self.server)
if self.rdlen is None:
f.write('# SUBTYPE=%d SERVER=%s\n' % (self.subtype, self.server))
f.write('%04x %s\n' % (self.subtype, server_wire))
+
# Build section-class mapping
-config_param = { 'name' : (Name, {}),
- 'header' : (DNSHeader, header_xtables),
- 'question' : (DNSQuestion, question_xtables),
- 'edns' : (EDNS, {}) }
+config_param = {'name': (Name, {}),
+ 'header': (DNSHeader, header_xtables),
+ 'question': (DNSQuestion, question_xtables),
+ 'edns': (EDNS, {})}
for rrtype in dict_rrtype.keys():
# For any supported RR types add the tuple of (RR_CLASS, {}).
# We expect KeyError as not all the types are supported, and simply
except KeyError:
pass
+
def get_config_param(section):
s = section
- m = re.match('^([^:]+)/\d+$', section)
+ m = re.match(r'^([^:]+)/\d+$', section)
if m:
s = m.group(1)
return config_param[s]
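
# Illustrative sketch (not part of the patch): repeated sections carry a
# '/N' suffix which the regexp strips before the lookup, so 'soa/2' and
# 'soa' both resolve to config_param['soa'], i.e. (SOA, {}).
assert get_config_param('soa/2') == get_config_param('soa')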
-usage = '''usage: %prog [options] input_file'''
+
+usage = 'usage: %prog [options] input_file'
+
if __name__ == "__main__":
parser = OptionParser(usage=usage)
outputfile = options.output
if not outputfile:
- m = re.match('(.*)\.[^.]+$', configfile)
+ m = re.match(r'(.*)\.[^.]+$', configfile)
if m:
outputfile = m.group(1)
else:
if m is not None:
version = max(version, int(m[0]) if len(m) else 0)
if version == 0:
- print(f"ERROR: expected schema version upgrade statement of format \"SET version = '\\d+', minor = '\\d+';\" in file \"{file}\", but not found.", file=sys.stderr)
+ print("ERROR: expected schema version upgrade statement of format "
+ f"\"SET version = '\\d+', minor = '\\d+';\" in file \"{file}\", but not found.", file=sys.stderr)
sys.exit(2)
append = False
result = []
- first_delimiter = r'<<EOF$' if is_upgrade_script else fr'^-- This line starts the schema upgrade to version {version}'
+ first_delimiter = (r'<<EOF$' if is_upgrade_script else
+ fr'^-- This line starts the schema upgrade to version {version}')
second_delimiter = r'^EOF$' if is_upgrade_script else r' Notes:$'
first_delimiter_found = False
second_delimiter_found = False
# get a diff with changes that are on that branch only
# i.e. all unmerged code.
# Issue: [B603:subprocess_without_shell_equals_true] subprocess call - check for execution of untrusted input.
- cmd = ["git", "diff", "master..." + branch_info.name ]
+ cmd = ["git", "diff", "master..." + branch_info.name]
diff = check_output(cmd)
if len(diff) == 0:
# No diff? Then all changes from that branch are on master as well.
# let's get the last contributor with extra formatting
# see man git-log and search for PRETTY FORMATS.
# %ai = date, %ae = author e-mail, %an = author name
- cmd = [ "git" , "log", "-n", "1", "--pretty=\"%ai,%ae,%an\"",
- branch_info.name ]
- # Issue: [B603:subprocess_without_shell_equals_true] subprocess call - check for execution of untrusted input.
+ cmd = ["git", "log", "-n", "1", "--pretty=\"%ai,%ae,%an\"", branch_info.name]
+ # Issue: [B603:subprocess_without_shell_equals_true] subprocess call - check for execution of untrusted
+ # input.
offender = check_output(cmd)
offender = offender.strip(b"\n\"")
offender = offender.replace(b"@", b"(at)")
# Obfuscating a dot does not work too well for folks that use
# initials
- #offender = offender.replace(b".", b"(dot)")
+ # offender = offender.replace(b".", b"(dot)")
branch_info.last_commit = offender.decode("utf-8")
out.append(branch_info)
return out
+
def branch_print(branches, csv, print_merged, print_notmerged, print_stats):
""" prints out list of branches with specified details (using
human-readable (or CSV) format. It is possible to specify,
if not print_merged:
continue
if csv:
- print("%s,merged,%s" % (branch.name, branch.last_commit) )
+ print("%s,merged,%s" % (branch.name, branch.last_commit))
else:
merged_str = merged_str + " " + branch.name
else:
if not print_notmerged:
continue
if csv:
- print("%s,notmerged,%s" % (branch.name, branch.last_commit) )
+ print("%s,notmerged,%s" % (branch.name, branch.last_commit))
else:
notmerged_str = notmerged_str + " " + branch.name
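
# (Editor's note, illustrative output; branch name and author are made up.)
# Since last_commit is itself a comma-separated "%ai,%ae,%an" triple, a CSV
# line looks like:
#
#   trac1234,notmerged,2015-03-01 12:00:00 +0100,jdoe(at)example.com,John Doe
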
return options
+
def main():
usage = """%prog
Lists all obsolete (fully merged into master) branches.
branch_print(branch_list, csv, merged, unmerged, stats)
+
if __name__ == '__main__':
- main()
+ main()
from sqlalchemy.sql import select
import sys
+
def convert_to_db(entity_name, make_singular=True):
sql_name = entity_name.replace('-', '_').lower()
if not make_singular:
return sql_name
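
# (Editor's note.) convert_to_db() maps a Kea configuration entity name to
# its SQL counterpart, e.g. 'option-def' -> 'option_def'; when
# make_singular is True the name is additionally singularized (that logic
# is not shown here).
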
+
class State:
- def __init__(self, config, database, path = None, token_name = None):
+ def __init__(self, config, database, path=None, token_name=None):
self.config = config
self.database = database
if path is not None:
if token_name is not None:
self._path += [token_name]
- def copy(self, token_name = None):
+ def copy(self, token_name=None):
return State(self.config, self.database, self._path.copy(), token_name)
def sql_prefix(self):
return self.config.get_mapped_table_name('{0}_{1}'.format(self.sql_prefix(), self.sql_parent_name()))
def sql_table_name(self):
- return self.config.get_mapped_table_name('{0}_{1}'.format(self.sql_prefix(), convert_to_db(self.get_current_name(), True)))
+ return self.config.get_mapped_table_name('{0}_{1}'.format(self.sql_prefix(),
+ convert_to_db(self.get_current_name(), True)))
def get_parent_name(self):
return self._path[-2]
def get_path_len(self):
return len(self._path)
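
# (Editor's note.) State objects are copied as the traversal descends into
# the configuration, so each node carries the full JSON path leading to it;
# e.g. copying a state at path ['Dhcp4'] with token_name 'subnet4' yields a
# state at path ['Dhcp4', 'subnet4'].
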
+
class ConfigFile:
def __init__(self, filename):
self.filename = filename
ignored_parameters = self.config['ignored_parameters']
return name in ignored_parameters
+
class KeaDatabase:
def __init__(self):
engine = db.create_engine('mysql+mysqldb://root@localhost/keatest')
return result[0] > 0
def has_column(self, table_name, column_name):
- sql = db.text(
- "SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = :table_name AND COLUMN_NAME = :column_name"
- )
+ sql = db.text("SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS "
+ "WHERE TABLE_NAME = :table_name AND COLUMN_NAME = :column_name")
result = self.connection.execute(sql, {"table_name": table_name, "column_name": column_name}).fetchone()
return result[0] > 0
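
# (Editor's sketch, illustrative usage; table and column names are made up.)
# has_column() lets the traversal decide whether a schema change would be
# needed:
#
#   database = KeaDatabase()
#   if not database.has_column('dhcp4_subnet', 'renew_timer'):
#       print('column renew_timer would have to be created')
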
+
def traverse(state, parents, json_object):
if state.config.is_ignored_parameter(state.get_current_name()):
return
elif state.get_path_len() == 2 and not isinstance(json_object, list) and not isinstance(json_object, dict):
# Global primitive value, e.g. boolean. Kea has a dedicated table for them.
- comment = cprint(text='Set primitive value {0} in {1} table'.format(state.sql_current_name(), state.sql_global_table_name()), color='green')
+ comment = cprint(text='Set primitive value {0} in {1} table'.format(state.sql_current_name(),
+ state.sql_global_table_name()),
+ color='green')
else:
# Handle primitives at lower levels.
# If the primitive belongs to a hierarchy of two maps, the structure of
# the lower level map must be flattened and the respective parameters
# must be moved to the upper level map.
- comment = cprint(text='Create column {0} in the parent table'.format(state.sql_current_name()), color='red')
+ comment = cprint(text='Create column {0} in the parent table'.format(state.sql_current_name()),
+ color='red')
else:
# An exception is the parameter belonging to the top level maps, e.g.
# Dhcp4/map/primitive. This primitive goes to the dhcp4_global_parameter
# table.
- comment = cprint(text='Use global parameter {0}'.format(state.sql_current_global_name()), color='yellow')
+ comment = cprint(text='Use global parameter {0}'.format(state.sql_current_global_name()),
+ color='yellow')
elif isinstance(parents[-1], dict) and isinstance(parents[-2], list):
# A list of maps deserves its own table. For example: subnet4 or
# shared_networks, option_def etc.
if state.database.has_column(state.sql_parent_table_name(), state.sql_current_name()):
- comment = cprint(text='Column {0} in {1} table exists'.format(state.sql_current_name(), state.sql_parent_table_name()), color='green')
+ comment = cprint(text='Column {0} in {1} table exists'.format(state.sql_current_name(),
+ state.sql_parent_table_name()),
+ color='green')
else:
- comment = cprint(text='Create column {0} in {1} table'.format(state.sql_current_name(), state.sql_parent_table_name()), color='red')
+ comment = cprint(text='Create column {0} in {1} table'.format(state.sql_current_name(),
+ state.sql_parent_table_name()),
+ color='red')
elif isinstance(json_object, list):
if json_object and isinstance(json_object[0], dict):
else:
comment = cprint(text='Create table {0}'.format(state.sql_table_name()), color='red')
else:
- comment = cprint(text='Unable to determine children types because all-keys file contains no children for this object', color='red')
+ comment = cprint(text=('Unable to determine children types because all-keys file contains no children '
+ 'for this object'), color='red')
elif isinstance(json_object, dict):
if len(parents) > 1 and isinstance(parents[-2], dict):
if state.get_path_len() == 2:
- comment = cprint(text='Parameters belonging to this map should be in {0}'.format(state.sql_global_table_name()), color='yellow')
+ comment = cprint(text='Parameters belonging to this map should be in {0}'.format(
+ state.sql_global_table_name()), color='yellow')
# Format printing the current object depending on its type.
if isinstance(json_object, dict):
# along with a comment.
print('{0}/{1}: {2}'.format(state.get_path(), type(json_object).__name__, comment))
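
# (Editor's note, illustrative; the table name is assumed.) Each visited
# node prints its JSON path and type followed by the colored suggestion,
# e.g.:
#
#   Dhcp4/valid-lifetime/int: Set primitive value valid_lifetime in
#   dhcp4_global_parameter table
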
+
def main():
parser = argparse.ArgumentParser(description='Kea Developer Tools')
parser.add_argument('--all-keys-file', metavar='all_keys_file', required=True,
traverse(State(config, database), [], parsed)
+
if __name__ == '__main__':
main()
-# Copyright (C) 2011-2015 Internet Systems Consortium, Inc. ("ISC")
+# Copyright (C) 2011-2024 Internet Systems Consortium, Inc. ("ISC")
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
import sys
+
def remove_empty_leading_trailing(lines):
"""
Removes leading and trailing empty lines.
message_key = canonicalise_message_line(lines[0])
message_lines = [message_key]
- index = 1;
+ index = 1
while index < len(lines):
if lines[index].startswith("%"):
# Start of new message
-        count = count + 1
+        count += 1
# ... and the entry itself.
- for l in dictionary[msgid]:
- print(l.strip())
+ for line in dictionary[msgid]:
+ print(line.strip())
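
# (Editor's note.) Kea message files group entries by message ID: a line
# beginning with '%' introduces the ID and message text, and subsequent
# lines carry its description, e.g.:
#
#   % LOG_BAD_SEVERITY unrecognized log severity: %1
#   A logger in the code specifies a severity that is not valid.
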
def process_file(filename):