import base64
import codecs
import random
+import tempfile
+
+if sys.version_info[0] < 3:
+ # python 2.7
+ import ConfigParser as configparser
+else:
+ # python >= 3.X
+ import configparser
# logging.raiseExceptions=False
LOG_FILE="@working_dir@/key-manager.log"
KEY_DIR="@sysconfdir@/keydir"
+CONFIG_FILE="@sysconfdir@/key-manager.conf"
+
+# trick to use the .in as a python script
+if LOG_FILE.startswith('@'):
+ LOG_FILE=os.path.join(tempfile.gettempdir(), 'key-manager.log')
+if KEY_DIR.startswith('@'):
+ KEY_DIR=os.path.join(tempfile.gettempdir(), 'keydir')
+if CONFIG_FILE.startswith('@'):
+ CONFIG_FILE=os.path.join(tempfile.gettempdir(), 'key-manager.conf')
MASTER_KEYID_SIZE=20
want_to_have_all_the_same_keys=False
MAX_NAME_LENGTH=128
volume_re=re.compile('[A-Za-z0-9:.-_]{1,128}')
+# the config from the configuration file if any
+config=None
+
+class CryptoCtx:
+ master_key_id=None
+ cipher=DEFAULT_CIPHER
+ stealth=False
+ passphrase=None
+
# raiseExceptions=False
class MyFileHandler(logging.FileHandler):
"""raise an exception when the format don't match the parameters
def check_force_cipher_env(cipher):
return os.getenv('FORCE_CIPHER', cipher)
-def generate_key(cipher, master_key):
- if cipher=='AES_128_XTS':
+def get_crypto_ctx_from_config(args, volume_name, master_keyid=None):
+ """ retrieve the master-key defined in the config file
+ return
+ None : for error
+ MasterKey object : the master-key
+ """
+ crypto_ctx=CryptoCtx()
+ crypto_ctx.cipher=args.cipher
+ if args.config:
+ try:
+ config=configparser.ConfigParser()
+ config.read(args.config)
+ except configparser.ParsingError as exc:
+ logging.error("parsing configuration file \"%s\": %s", args.config, str(exc))
+ print('error: parsing configuration file \"{}\"\n'.format(args.config))
+ return None
+ the_section=None
+ if master_keyid:
+ if config.has_section(master_keyid):
+ the_section=master_keyid
+ else:
+ logging.error("configuration file \"%s\" has no master-key \"%s\"", args.config, master_keyid)
+ print('error: configuration file \"{}\" has no master-key \"{}\"\n'.format(args.config, master_keyid))
+ return None
+ else:
+ # search for the section matching the volume
+ for section in config.sections():
+ try:
+ volume_regex=config.get(section, 'volume_regex')
+ except configparser.NoOptionError:
+ logging.debug("ignore section \"%s\"", section)
+ continue
+ try:
+ match=re.match(volume_regex, volume_name)
+ except re.error:
+ logging.error("regular expression error in configuration file \"%s\" in section \"%s\" : %s", args.config, section, str(exc))
+ print("error: regular expression error in configuration file \"{}\" in section \"{}\" : {}".format(args.config, section, str(exc)))
+ return None
+ if match:
+ the_section=section
+ break
+ if not the_section:
+ logging.error("no master-key defined for volume \"%s\"", volume_name)
+ print("error: no master-key define for volume \"{}\"".format(volume_name))
+ return None
+ crypto_ctx=CryptoCtx()
+ crypto_ctx.master_key_id=the_section
+ try:
+ crypto_ctx.cipher=config.get(the_section, 'cipher')
+ except configparser.NoOptionError:
+ crypto_ctx.cipher=args.cipher
+ try:
+ crypto_ctx.stealth=config.getboolean(the_section, 'stealth')
+ except configparser.NoOptionError:
+ pass
+ try:
+ crypto_ctx.passphrase=config.get(the_section, 'passphrase')
+ except configparser.NoOptionError:
+ pass
+ logging.info("use masterkey %r and cipher \"%s\" for volume \"%s\"", crypto_ctx.master_key_id, crypto_ctx.cipher, volume_name)
+
+ return crypto_ctx
+
+def generate_key(crypto_ctx, volume_name):
+ if crypto_ctx.cipher=='AES_128_XTS':
key_size=32
- elif cipher=='AES_256_XTS':
+ elif crypto_ctx.cipher=='AES_256_XTS':
key_size=64
- elif cipher=='NULL':
+ elif crypto_ctx.cipher=='NULL':
key_size=16
else:
- logging.error('unknown cipher %s', cipher)
- return None # unknown key
+ logging.error('unknown cipher %s', crypto_ctx.cipher)
+ return None # unknown cipher
urandom=open('/dev/urandom', 'rb')
key=urandom.read(key_size)
if want_to_have_all_the_same_keys:
key=b'A'*key_size
key_base64=codecs.decode(base64.b64encode(key))
r=dict()
- r['cipher']=cipher
+ r['cipher']=crypto_ctx.cipher
r['cipher_key']=key_base64
-
- if master_key:
- master_keyid=urandom.read(MASTER_KEYID_SIZE)
- if want_to_have_all_the_same_keys:
- key=b'B'*MASTER_KEYID_SIZE
- master_keyid_base64=codecs.decode(base64.b64encode(master_keyid))
+ r['volume_name']=volume_name
+ if crypto_ctx.master_key_id:
+ try:
+ import gnupg
+ except ImportError:
+ logging.error('module gnupg is not installed')
+ print('python module gnupg is not installed')
+ return None
+ gpg=gnupg.GPG()
+ master_keyid_base64=codecs.decode(base64.b64encode(codecs.encode(crypto_ctx.master_key_id)))
r['master_keyid']=master_keyid_base64
- enc_key=bytes_xor(key, master_keyid)
- enc_key_base64=codecs.decode(base64.b64encode(enc_key))
+ enc_key=gpg.encrypt(key, crypto_ctx.master_key_id, armor=False)
+ enc_key_base64=codecs.decode(base64.b64encode(enc_key.data))
r['enc_cipher_key']=enc_key_base64
+ return r
+def decrypt_key(crypto_ctx, volume_name, enc_cipher_key):
+ try:
+ import gnupg
+ except ImportError:
+ logging.error('module gnupg is not installed')
+ print('python module gnupg is not installed')
+ return None
+ r=dict()
+ r['cipher']=crypto_ctx.cipher
+ gpg=gnupg.GPG()
+ master_keyid_base64=codecs.decode(base64.b64encode(codecs.encode(crypto_ctx.master_key_id)))
+ r['master_keyid']=master_keyid_base64
+ passphrase=crypto_ctx.passphrase
+ cipher_key=gpg.decrypt(enc_cipher_key, passphrase=passphrase)
+ if cipher_key.ok==False:
+ logging.error('decryption error for volume "{}":'.format(volume_name, cipher_key.status))
+ print('decryption error for volume "{}":'.format(volume_name, cipher_key.status))
+ return None
+ cipher_key_base64=codecs.decode(base64.b64encode(cipher_key.data))
+ r['cipher_key']=cipher_key_base64
+ r['volume_name']=volume_name
return r
def decode_data(data):
if not volume_name:
logging.error("environment variable VOLUME_NAME missing or empty")
print('error: environment variable VOLUME_NAME missing or empty\n')
- return 1
+ return 0
if not operation:
logging.error("environment variable OPERATION missing or empty")
print('error: environment variable OPERATION missing or empty\n')
- return 1
+ return 0
if not operation in [ 'LABEL', 'READ']:
logging.error("environment variable OPERATION invalid \"%s\" for volume \"%s\"", operation, volume_name)
print("error: environment variable OPERATION invalid \"{}\" for volume \"{}\"\n".format(operation, volume_name))
- return 1
+ return 0
enc_cipher_key=os.getenv('ENC_CIPHER_KEY')
master_keyid=os.getenv('MASTER_KEYID')
logging.info('getkey op=%s volume=%s enckey=%s masterkey=%s', operation, volume_name, enc_cipher_key if enc_cipher_key else "<NONE>", master_keyid if master_keyid else "<NONE>")
key_filename=os.path.join(args.key_dir, escape_volume_name(volume_name))
if operation=='LABEL':
+ crypto_ctx=get_crypto_ctx_from_config(args, volume_name)
+ if crypto_ctx==None:
+ return 0 # error reading the config file
+ crypto_ctx.cipher=check_force_cipher_env(crypto_ctx.cipher)
if os.path.isfile(key_filename):
logging.info("delete old keyfile for volume \"%s\" : %s", volume_name, key_filename)
os.unlink(key_filename)
- cipher=check_force_cipher_env(args.cipher)
- ctx=generate_key(cipher, args.master_key)
+ ctx=generate_key(crypto_ctx, volume_name)
if ctx==None:
- return 1
- ctx['volume_name']=volume_name
+ return 0 # error while generating the key (wrong cipher or gnupg not installed)
logging.info("generate key volume=%s cipher=%s enckey=%s masterkey=%s", ctx['volume_name'], ctx['cipher'], ctx.get('enc_cipher_key', ''), ctx.get('master_keyid', ''))
- if args.master_key:
+ if crypto_ctx.stealth:
# don't keep an un-encrypted version of the cipher_key
# use the masterkey id to decrypte the enckey
exclude=set(['cipher_key'])
f.close()
output=encode_data(ctx) # including the 'cipher_key'
elif operation=='READ':
- if not os.path.isfile(key_filename):
- logging.error("error: keyfile \"%s\" not found for volume \"%s\"", key_filename, volume_name)
- print('error: no key information for volume "{}"'.format(volume_name))
+ ctx=dict()
+ if os.path.isfile(key_filename):
+ # use data in the key file
+ data=open(key_filename, 'rt').read()
+ ctx=decode_data(data)
+ if 'cipher_key' in ctx:
+ logging.info("read key volume=%s cipher=%s", ctx['volume_name'], ctx['cipher'])
+ output=encode_data(ctx)
+ elif not enc_cipher_key:
+ logging.error("no cipher key nor encrypted cipher key for volume \"%s\"", volume_name)
+ print('error: no cipher key nor encrypted cipher key for volume "{}"'.format(volume_name))
return 0
- data=open(key_filename, 'rt').read()
- ctx=decode_data(data)
- if args.master_key:
- if not enc_cipher_key:
- logging.warning("environment variable ENC_CIPHER_KEY missing or empty")
- return 'error: environment variable ENC_CIPHER_KEY missing or empty\n'
- if not master_keyid:
- logging.warning("environment variable MASTER_KEYID missing or empty")
- return 'error: environment variable MASTER_KEYID missing or empty\n'
- if enc_cipher_key!=ctx['enc_cipher_key']:
- logging.error("encoded cipher key for %s don't match, volume: %s, cache:%s", volume_name, enc_cipher_key, ctx['enc_cipher_key'])
- if master_keyid!=ctx['master_keyid']:
- logging.error("master_keyid for %s don't match, volume: %s, cache:%s", volume_name, enc_cipher_key, ctx['master_keyid'])
+ else:
enc_cipher_key_raw=base64.b64decode(codecs.encode(enc_cipher_key))
master_keyid_raw=base64.b64decode(codecs.encode(master_keyid))
- cipher_key_raw=bytes_xor(enc_cipher_key_raw, master_keyid_raw)
- ctx['cipher_key']=codecs.decode(base64.b64encode(cipher_key_raw))
- logging.info("read key volume=%s cipher=%s enc_key=%s masterkey=%s", ctx['volume_name'], ctx['cipher'], ctx['enc_cipher_key'], ctx['master_keyid'])
- output=encode_data(ctx)
- else:
- logging.info("read key volume=%s cipher=%s", ctx['volume_name'], ctx['cipher'])
+ master_keyid_ascii=codecs.decode(master_keyid_raw)
+ # maybe we can retrieve the passphrase for the master-key
+ crypto_ctx=get_crypto_ctx_from_config(args, volume_name, master_keyid_ascii)
+ if crypto_ctx==None:
+ return 0 # error no master-key
+ # maybe the master_keyid from the volume could have done the job
+ # if gnupg still remember this master-key despit it has been
+ # removed from the key-manager config file
+ crypto_ctx.cipher=check_force_cipher_env(crypto_ctx.cipher)
+ # use the master-key to decrypt the enc_cipher_key
+ ctx=decrypt_key(crypto_ctx, volume_name, enc_cipher_key_raw)
+ if ctx==None:
+ return 0 # error decrypting key
+ logging.info("read key volume=%s cipher=%s cipher_key=%s masterkey=%s", ctx['volume_name'], ctx['cipher'], ctx['cipher_key'], ctx['master_keyid'])
output=encode_data(ctx)
else:
output='error: unknown operation \"%r\"'.format(operation)
common_parser=argparse.ArgumentParser(add_help=False)
common_parser.add_argument('--key-dir', '-k', metavar='DIRECTORY', type=str, default=KEY_DIR, help='the directory where to store the keys')
+common_parser.add_argument('--config', '-C', metavar='CONFIG', type=str, help='the configuration file')
common_parser.add_argument('--log', metavar='LOGFILE', type=str, default=LOG_FILE, help='setup the logfile')
-common_parser.add_argument('--debug', '-d', action='store_true', help='enable debuging')
+common_parser.add_argument('--debug', '-d', action='store_true', help='enable debugging')
common_parser.add_argument('--verbose', '-v', action='store_true', help='be verbose')
parser=subparsers.add_parser('getkey', description="Retrieve a key", parents=[common_parser, ],
help="retrieve a key or generate one if don't exist yet")
-parser.add_argument('--master-key', '-m', action='store_true', help='generate masterkey-id and encoded key for testing (not supported in 16.0)')
-parser.add_argument('--cipher', '-c', metavar='KEY-TYPE', choices=CIPHERS, default=DEFAULT_CIPHER, help='the cipher in {}'.format(', '.join(CIPHERS)))
+parser.add_argument('--cipher', '-c', metavar='CIPHER', choices=CIPHERS, default=DEFAULT_CIPHER, help='set the default cipher in {}'.format(', '.join(CIPHERS)))
parser.set_defaults(func=getkey)
parser=subparsers.add_parser('test', description="Run some internal test of the code")
args._parser=mainparser
setup_logging(getattr(args, 'debug', None), getattr(args, 'verbose', None), getattr(args, 'log', None))
+logging.error('OPERATION=%s VOLUME=%s', os.getenv("OPERATION"), os.getenv("VOLUME_NAME"))
+# check for the key_dir directory
if hasattr(args, 'key_dir'):
if not os.path.exists(args.key_dir):
try:
logging.error('The "key" directory is not accessible for READ and WRITE: %s', args.key_dir)
mainparser.error('error: need read and write access to "{}"'.format(args.key_dir))
-sys.exit(args.func(args))
-
+# check for the config file
+if hasattr(args, 'config'):
+ if args.config==None and os.path.exists(CONFIG_FILE):
+ args.config=CONFIG_FILE # the default file exists, use it
+ logging.debug('Use config file %s', args.config)
+ if args.config!=None and (not os.path.exists(args.config) or not os.access(args.config, os.R_OK)):
+ logging.error('The config file don\'t exists or cannot be read: %s', args.config)
+ mainparser.error('The config file don\'t exists or cannot be read: %s'.format(args.config))
+sys.exit(args.func(args))