+++ /dev/null
-import os
-import sys
-import time
-from subprocess import Popen, PIPE, STDOUT
-import signal
-import socket
-import errno
-import shutil
-
-
-class LaunchError(Exception):
- """ Exception class to signal startup error"""
- pass
-
-class AdminError(Exception):
- """ Exception class to handle admin errors"""
- pass
-
-
-class Launcher:
-
- def __init__(self, path):
- self._buildDir = path
- self._confDir = '%s/tests/kdc_realm/input_conf' % self._buildDir
- confFile ='%s/test_setup.conf' % self._confDir
- confParams = self._testSetup(confFile)
- self._sandboxDir = '%s/%s' % (self._buildDir,confParams['sandboxDir'])
- self._sandboxTier1 = '%s/%s' % (self._sandboxDir, 'tier1')
- self._sandboxTier2 = '%s/%s' % (self._sandboxDir, 'tier2')
- self._configurations = self._readServerConfiguration('%s/%s' % (self._confDir,confParams['testKDCconf']))
- self._configurations_1 = self._readServerConfiguration('%s/%s' % (self._confDir,confParams['testKDCconf_1']))
- self._principals = self._readTestInputs('%s/%s' % (self._confDir,confParams['principals']))
- os.environ["LD_LIBRARY_PATH"] = '%s/lib' % self._buildDir
- self._pidRefKDC = 0
- self._pidMap = dict()
- self._initialized = False
- self._tier1Init = False
- self._tier2Init = False
- self._vars = {'srcdir': self._buildDir,
- 'tier1':self._sandboxTier1,
- 'tier2':self._sandboxTier2,
- 'localFQDN':socket.getfqdn()}
-
- def _launchKDC(self, tierId, args, env):
- """
- Launching KDC server
- """
- cmd = '%s/kdc/krb5kdc' % self._buildDir
- handle = Popen([cmd, args], env=env)
- time.sleep(1)
- # make sure that process is running
- rc = handle.poll()
- if rc is None:
- print 'KDC server has been launched: pid=%s, tier=%s' % (handle.pid, tierId)
- self._pidMap[handle.pid] = 1
- return handle.pid
- else:
- raise LaunchError, 'Failed to launch kdc server'
-
-
- def _prepSandbox(self):
- for tierId in range(1,3):
- tierdir = '%s/tier%i' % (self._sandboxDir, tierId)
- if os.path.exists(tierdir):
- shutil.rmtree(tierdir)
- os.makedirs(tierdir, 0777)
-
-
- def _kill(self, pid = None):
- """
- Kill specific process or group saved in pidMap
- """
- if pid is None:
- target = self._pidMap.keys()
- else:
- target = [pid]
- for p in target:
- if p in self._pidMap:
- del self._pidMap[p]
- try:
- os.kill(p, signal.SIGKILL)
- except OSError:
- pass
-
-
- def _createDB(self, env):
- """
- Creating DB
- """
- cmd = '%s/kadmin/dbutil/kdb5_util' % self._buildDir
- p = Popen([cmd, 'create', '-s'], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- (out, err) = p.communicate('a\na\n')
- if p.returncode != 0:
- err_msg = 'Failed to create DB: %s' % err
- raise LaunchError, err_msg
-
-
- def _launchClient(self, args, env, princType):
- """
- kinit & kvno
- """
- self._addPrinc(args, env)
- p = Popen(['kinit', args], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- (out, err) = p.communicate('a\n')
- if int(p.wait()) == 0:
- self._initialized = True
- else:
- err_msg = 'Failed to kinit client: %s' % err
- raise AdminError, err_msg
-
- # testHost', 'mybox.mit.edu is a srv defined in referral KDC. Get its kvno
- cmd = '%s/clients/kvno/kvno' % self._buildDir
- if princType == 0:
- handle = Popen([cmd, '-C', '-S', 'testHost', 'mybox.mit.edu'],
- env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- if princType == 1:
- handle = Popen([cmd, '-C', '-u', 'testHost/mybox.mit.edu'],
- env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- (out, err) = handle.communicate()
- handle.wait()
- print 'kvno return code: %s' % handle.returncode
-
- # Cleanup cached info
- p = Popen(['kdestroy'], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- (out, err) = p.communicate()
- if int(p.wait()) != 0:
- err_msg = 'Failed to kdestroy cashed tickets: %s' % err
- raise AdminError, err_msg
-
- return handle.returncode
-
-
- def _addPrinc(self, args, env):
- """
- Add Principal
- """
- msg = 'addprinc -pw a %s' % args
- p = Popen(['kadmin.local' ], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- (out, err) = p.communicate(msg)
- if int(p.wait()) != 0:
- err_msg = 'Failed to add principal %s' % err_msg
- raise AdminError, err_msg
-
-
- def _crossRealm(self, r_local, r_remote, env):
- """
- Croos-realm setup
- """
- msg = 'addprinc -pw a krbtgt/%s@%s' % (r_remote, r_local)
- p = Popen(['kadmin.local' ], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- (out, err) = p.communicate(msg)
- if int(p.wait()) != 0:
- err_msg = 'Failed to set cross-realm: %s' % err
- raise AdminError, err_msg
-
- msg = 'addprinc -pw a krbtgt/%s@%s' % (r_local, r_remote)
- p = Popen(['kadmin.local' ], env = env, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- (out, err) = p.communicate(msg)
- if int(p.wait()) != 0:
- err_msg = 'Failed to set cross-realm: %s' % err
- raise AdminError, err_msg
-
-
- def _launchRefKDC(self,test_env):
- """
- Launch referral KDC
- """
- test_env["KRB5_CONFIG"] = '%s/krb5.conf' % self._sandboxTier1
- test_env["KRB5_KDC_PROFILE"] = '%s/kdc.conf' % self._sandboxTier1
- server_args = '-n'
- if self._tier1Init == False:
- # Create adequate to the environment config files
- self._createFileFromTemplate('%s' % test_env["KRB5_CONFIG"],
- '%s/%s' % (self._confDir,'krb5_ref_template.conf'),
- self._vars)
- self._createFileFromTemplate('%s' % test_env["KRB5_KDC_PROFILE"],
- '%s/%s' % (self._confDir, 'kdc_ref_template.conf'),
- self._vars)
-
- # create DB for KDC to be referred to
- pid = self._createDB(test_env)
-
- # launch KDC to be referred to
- self._pidRefKDC = self._launchKDC(1, server_args, test_env)
-
- # The tests run against 'testHost/mybox.mit.edu' srv.
- args = 'testHost/mybox.mit.edu'
- self._addPrinc(args, test_env)
- self._crossRealm('Z.COM', 'Y.COM', test_env)
- self._tier1Init = True
-
-
- def _launchTestingPair(self, srvParam,clntParam, princType):
- # launch KDC
- server_env = os.environ.copy()
- server_env["KRB5_KDC_PROFILE"] = '%s/kdc.conf' % self._sandboxTier2
- server_env["KRB5_CONFIG"] = '%s/krb5_KDC.conf' % self._sandboxTier2
- server_args = '-n'
- self._createFileFromTemplate('%s' % server_env["KRB5_CONFIG"],
- '%s/%s' % (self._confDir,srvParam),
- self._vars)
- self._createFileFromTemplate('%s' % server_env["KRB5_KDC_PROFILE"],
- '%s/%s' % (self._confDir,'kdc_pri_template.conf'),
- self._vars)
- if self._tier2Init == False:
- pid = self._createDB(server_env)
- self._crossRealm('Y.COM', 'Z.COM', server_env)
- self._tier2Init = True
-
- server = self._launchKDC( 2, server_args, server_env)
-
- # launch client
- client_env = os.environ.copy()
- client_env["KRB5_CONFIG"] = '%s/krb5_CL.conf' % self._sandboxTier2
- self._createFileFromTemplate('%s' % client_env["KRB5_CONFIG"],
- '%s/%s' % (self._confDir, 'krb5_priCL_template.conf'),
- self._vars)
- client_env["KRB5_KDC_PROFILE"] = server_env["KRB5_KDC_PROFILE"]
- rc = self._launchClient(clntParam, client_env, princType)
- self._kill(server)
- return rc
-
-
- def run(self, args):
- """
- run the test
- """
- test_env = os.environ.copy()
- test_env["SRCDIR"] = '%s' % self._buildDir
-
- # create sandbox file directory if it does not exist
- self._prepSandbox()
-
- if self._tier1Init == False:
- self._launchRefKDC(test_env)
-
- result = dict()
- for princs in self._principals:
- for conf in self._configurations:
- rc = self._launchTestingPair( conf['confName'], princs % self._vars, 0)
- result[conf['confName']] = {'expected':conf['expected'], 'actual':rc}
- print 'Test code for configuration %s principal %s type KRB5_NT_SRV_HST: %s' % (conf, princs, rc)
- self.printTestResults(result)
- for conf in self._configurations_1:
- rc = self._launchTestingPair( conf['confName'], princs % self._vars, 1)
- result[conf['confName']] = {'expected':conf['expected'], 'actual':rc}
- print 'Test code for configuration %s principal %si type KRB5_NT_UNKNOWN: %s' % (conf, princs, rc)
- self.printTestResults(result)
- return result
-
-
- def _readTestInputs(self, path):
- f = open(path, 'r')
- result = []
- for line in f:
- result.append(line.rstrip())
- f.close()
- return result
-
-
- def _readServerConfiguration(self, path):
- f = open(path, 'r')
- result = []
- for line in f:
- fields = (line.rstrip()).split(',')
- result.append({'confName':fields[0],'expected':fields[1]})
- f.close()
- return result
-
-
- def _testSetup(self, path):
- print path
- f = open(path, 'r')
- result = dict()
- for line in f:
- try:
- (a,v) = line.rstrip().split('=')
- result[a]=v
- except:
- print 'bad format for config file, line: %s' % line
- return None
- f.close()
- return result
-
-
- def _createFileFromTemplate(self, outpath, template, vars):
- fin = open(template, 'r')
- result = fin.read() % vars
- fin.close()
- fout = open(outpath, 'w')
- fout.write(result)
- fout.close()
-
-
- def _getDNS(self):
- print socket.getfqdn()
-
-
- def printTestResults(self, testResults):
- success_count = 0
- fail_count = 0
- print '\n'
- print '------------------- Test Results ------------------------'
- for (conf_name, result) in testResults.iteritems():
- if int(result['expected']) == int(result['actual']):
- print 'Test for configuration %s has succeeded' % conf_name
- success_count += 1
- else:
- print 'Test for configuration %s has failed' % conf_name
- fail_count += 1
-
- print '------------------- Summary -----------------------------'
- print 'Of %i tests %i failed, %i succeeded' % (len(testResults),
- fail_count,
- success_count)
- print '---------------------------------------------------------'
-
-
- def clean(self):
- self._kill()
-
-
-if __name__ == '__main__':
- src_path = "%s/.." % os.environ["PWD"]
- print "SOURCE PATH ==>" , src_path
- test = None
- try:
- test = Launcher(src_path)
- result = test.run('main')
- test.clean()
-
- except:
- if test is not None:
- test.clean()
- raise