result = cmd._run("samba-tool %s %s" % (name, sub), *args)
return (result, cmd.outf.getvalue(), cmd.errf.getvalue())
- def assertCmdSuccess(self, val, msg=""):
- self.assertIsNone(val, msg)
+ def assertCmdSuccess(self, exit, out, err, msg=""):
+ self.assertIsNone(exit, msg="exit[%s] stdout[%s] stderr[%s]: %s" % (
+ exit, out, err, msg))
def assertCmdFail(self, val, msg=""):
self.assertIsNotNone(val, msg)
"""Run fsmo show to see if it errors"""
(result, out, err) = self.runsubcmd("fsmo", "show")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
# Check that the output is sensible
def test_gpo_list(self):
"""Run gpo list against the server and make sure it looks accurate"""
(result, out, err) = self.runsubcmd("gpo", "listall", "-H", "ldap://%s" % os.environ["SERVER"])
- self.assertCmdSuccess(result, "Ensuring gpo listall ran successfully")
+ self.assertCmdSuccess(result, out, err, "Ensuring gpo listall ran successfully")
def test_fetchfail(self):
"""Run against a non-existent GPO, and make sure it fails (this hard-coded UUID is very unlikely to exist"""
def test_fetch(self):
"""Run against a real GPO, and make sure it passes"""
(result, out, err) = self.runsubcmd("gpo", "fetch", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "--tmpdir", self.tempdir)
- self.assertCmdSuccess(result, "Ensuring gpo fetched successfully")
+ self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
shutil.rmtree(os.path.join(self.tempdir, "policy"))
def test_show(self):
"""Show a real GPO, and make sure it passes"""
(result, out, err) = self.runsubcmd("gpo", "show", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"])
- self.assertCmdSuccess(result, "Ensuring gpo fetched successfully")
+ self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
def test_show_as_admin(self):
"""Show a real GPO, and make sure it passes"""
(result, out, err) = self.runsubcmd("gpo", "show", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]))
- self.assertCmdSuccess(result, "Ensuring gpo fetched successfully")
+ self.assertCmdSuccess(result, out, err, "Ensuring gpo fetched successfully")
def test_aclcheck(self):
"""Check all the GPOs on the remote server have correct ACLs"""
(result, out, err) = self.runsubcmd("gpo", "aclcheck", "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]))
- self.assertCmdSuccess(result, "Ensuring gpo checked successfully")
+ self.assertCmdSuccess(result, out, err, "Ensuring gpo checked successfully")
def setUp(self):
"""set up a temporary GPO to work with"""
"-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]),
"--tmpdir", self.tempdir)
shutil.rmtree(os.path.join(self.tempdir, "policy"))
- self.assertCmdSuccess(result, "Ensuring gpo created successfully")
+ self.assertCmdSuccess(result, out, err, "Ensuring gpo created successfully")
try:
self.gpo_guid = "{%s}" % out.split("{")[1].split("}")[0]
except IndexError:
def tearDown(self):
"""remove the temporary GPO to work with"""
(result, out, err) = self.runsubcmd("gpo", "del", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]))
- self.assertCmdSuccess(result, "Ensuring gpo deleted successfully")
+ self.assertCmdSuccess(result, out, err, "Ensuring gpo deleted successfully")
super(GpoCmdTestCase, self).tearDown()
for group in self.groups:
(result, out, err) = self._create_group(group)
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "There shouldn't be any error message")
self.assertIn("Added group %s" % group["name"], out)
# try to delete all the groups we just added
for group in self.groups:
(result, out, err) = self.runsubcmd("group", "delete", group["name"])
- self.assertCmdSuccess(result,
+ self.assertCmdSuccess(result, out, err,
"Failed to delete group '%s'" % group["name"])
found = self._find_group(group["name"])
self.assertIsNone(found,
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","There shouldn't be any error message")
self.assertIn("Added group %s" % group["name"], out)
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Error running list")
+ self.assertCmdSuccess(result, out, err, "Error running list")
search_filter = "(objectClass=group)"
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Error running listmembers")
+ self.assertCmdSuccess(result, out, err, "Error running listmembers")
search_filter = "(|(primaryGroupID=513)(memberOf=CN=Domain Users,CN=Users,%s))" % self.samdb.domain_dn()
def test_ntvfs(self):
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-ntvfs")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-s3fs")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertEquals(out,"","Shouldn't be any output messages")
def test_ntvfs_check(self):
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-ntvfs")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
# Now check they were set correctly
(result, out, err) = self.runsubcmd("ntacl", "sysvolcheck")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertEquals(out,"","Shouldn't be any output messages")
(result, out, err) = self.runsubcmd("ntacl", "sysvolreset",
"--use-s3fs")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertEquals(out,"","Shouldn't be any output messages")
# Now check they were set correctly
(result, out, err) = self.runsubcmd("ntacl", "sysvolcheck")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertEquals(out,"","Shouldn't be any output messages")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-ntvfs")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-s3fs")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertEquals(out,"","Shouldn't be any output messages")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-ntvfs")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err)
# Now check they were set correctly
(result, out, err) = self.runsubcmd("ntacl", "get", tempf,
"--use-ntvfs", "--as-sddl")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertEquals(self.acl+"\n", out, "Output should be the ACL")
(result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf,
"--use-s3fs")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(out,"","Shouldn't be any output messages")
self.assertEquals(err,"","Shouldn't be any error messages")
# Now check they were set correctly
(result, out, err) = self.runsubcmd("ntacl", "get", tempf,
"--use-s3fs", "--as-sddl")
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertEquals(self.acl+"\n", out,"Output should be the ACL")
def test_name(self):
"""Run processes command"""
(result, out, err) = self.runcmd("processes", "--name", "samba")
- self.assertCmdSuccess(result, "Ensuring processes ran successfully")
+ self.assertCmdSuccess(result, out, err, "Ensuring processes ran successfully")
def test_all(self):
"""Run processes command"""
(result, out, err) = self.runcmd("processes")
- self.assertCmdSuccess(result, "Ensuring processes ran successfully")
+ self.assertCmdSuccess(result, out, err, "Ensuring processes ran successfully")
def test_single_by_account_name(self):
(result, out, err) = self.runsubcmd("rodc", "preload", "sambatool1",
"--server", os.environ["DC_SERVER"])
- self.assertCmdSuccess(result, "ensuring rodc prefetch ran successfully")
+ self.assertCmdSuccess(result, out, err, "ensuring rodc prefetch ran successfully")
self.assertEqual(out, "Replicating DN CN=sambatool1,CN=Users,%s\n" % self.base_dn)
self.assertEqual(err, "")
def test_single_by_dn(self):
(result, out, err) = self.runsubcmd("rodc", "preload", "cn=sambatool2,cn=users,%s" % self.base_dn,
"--server", os.environ["DC_SERVER"])
- self.assertCmdSuccess(result, "ensuring rodc prefetch ran successfully")
+ self.assertCmdSuccess(result, out, err, "ensuring rodc prefetch ran successfully")
self.assertEqual(out, "Replicating DN CN=sambatool2,CN=Users,%s\n" % self.base_dn)
def test_multi_by_account_name(self):
(result, out, err) = self.runsubcmd("rodc", "preload", "sambatool1", "sambatool2",
"--server", os.environ["DC_SERVER"])
- self.assertCmdSuccess(result, "ensuring rodc prefetch ran successfully")
+ self.assertCmdSuccess(result, out, err, "ensuring rodc prefetch ran successfully")
self.assertEqual(out, "Replicating DN CN=sambatool1,CN=Users,%s\nReplicating DN CN=sambatool2,CN=Users,%s\n" % (self.base_dn, self.base_dn))
def test_multi_by_dn(self):
(result, out, err) = self.runsubcmd("rodc", "preload", "cn=sambatool3,cn=users,%s" % self.base_dn, "cn=sambatool4,cn=users,%s" % self.base_dn,
"--server", os.environ["DC_SERVER"])
- self.assertCmdSuccess(result, "ensuring rodc prefetch ran successfully")
+ self.assertCmdSuccess(result, out, err, "ensuring rodc prefetch ran successfully")
self.assertEqual(out, "Replicating DN CN=sambatool3,CN=Users,%s\nReplicating DN CN=sambatool4,CN=Users,%s\n" % (self.base_dn, self.base_dn))
def test_multi_in_file(self):
open(tempf, 'w').write("sambatool1\nsambatool2")
(result, out, err) = self.runsubcmd("rodc", "preload", "--file", tempf,
"--server", os.environ["DC_SERVER"])
- self.assertCmdSuccess(result, "ensuring rodc prefetch ran successfully")
+ self.assertCmdSuccess(result, out, err, "ensuring rodc prefetch ran successfully")
self.assertEqual(out, "Replicating DN CN=sambatool1,CN=Users,%s\nReplicating DN CN=sambatool2,CN=Users,%s\n" % (self.base_dn, self.base_dn))
os.unlink(tempf)
"nonexistentuser2",
"--server", os.environ["DC_SERVER"],
"--ignore-errors")
- self.assertCmdSuccess(result, "ensuring rodc prefetch ran successfully")
+ self.assertCmdSuccess(result, out, err, "ensuring rodc prefetch ran successfully")
self.assertEqual(out, "Replicating DN CN=sambatool5,CN=Users,%s\n" % self.base_dn)
def test_multi_with_missing_name_failure(self):
"sambatool6", "sambatool5",
"--server", os.environ["DC_SERVER"],
"--ignore-errors")
- self.assertCmdSuccess(result, "ensuring rodc prefetch ran successfully")
+ self.assertCmdSuccess(result, out, err, "ensuring rodc prefetch ran successfully")
self.assertEqual(out, "Replicating DN CN=sambatool6,CN=Users,%s\nReplicating DN CN=sambatool5,CN=Users,%s\n" % (self.base_dn, self.base_dn))
def test_multi_without_group_failure(self):
result, out, err = self.runsubcmd("sites", "create", sitename,
"-H", self.dburl, self.creds_string)
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
dnsites = ldb.Dn(self.samdb, "CN=Sites,%s" % self.config_dn)
dnsite = ldb.Dn(self.samdb, "CN=%s,%s" % (sitename, dnsites))
cidr, sitename,
"-H", self.dburl,
self.creds_string)
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
ret = self.samdb.search(base=self.config_dn,
scope=ldb.SCOPE_SUBTREE,
def test_timeget(self):
"""Run time against the server and make sure it looks accurate"""
(result, out, err) = self.runcmd("time", os.environ["SERVER"])
- self.assertCmdSuccess(result, "Ensuring time ran successfully")
+ self.assertCmdSuccess(result, out, err, "Ensuring time ran successfully")
timefmt = strptime(out, "%a %b %d %H:%M:%S %Y %Z\n")
servertime = int(mktime(timefmt))
for user in self.users:
(result, out, err) = user["createUserFn"](user)
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
# try to delete all the 4 users we just added
for user in self.users:
(result, out, err) = self.runsubcmd("user", "delete", user["name"])
- self.assertCmdSuccess(result, "Can we delete users")
+ self.assertCmdSuccess(result, out, err, "Can we delete users")
found = self._find_user(user["name"])
self.assertIsNone(found)
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
"--newpassword=%s" % newpasswd,
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- # self.assertCmdSuccess(result, "Ensure setpassword runs")
+ self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
self.assertEquals(err,"","setpassword with url")
self.assertMatch(out, "Changed password OK", "setpassword with url")
"--cache-ldb-initialize",
"--attributes=%s" % attributes,
"--decrypt-samba-gpg")
- self.assertCmdSuccess(result, "Ensure syncpasswords --cache-ldb-initialize runs")
+ self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
self.assertEqual(err,"","getpassword without url")
cache_attrs = {
"objectClass": { "value": "userSyncPasswords" },
"syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
(result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
- self.assertCmdSuccess(result, "Ensure syncpasswords --no-wait runs")
+ self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
self.assertEqual(err,"","syncpasswords --no-wait")
self.assertMatch(out, "dirsync_loop(): results 0",
"syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
(result, out, err) = self.runsubcmd("user", "setpassword",
user["name"],
"--newpassword=%s" % newpasswd)
- self.assertCmdSuccess(result, "Ensure setpassword runs")
+ self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
self.assertEquals(err,"","setpassword without url")
self.assertMatch(out, "Changed password OK", "setpassword without url")
(result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
- self.assertCmdSuccess(result, "Ensure syncpasswords --no-wait runs")
+ self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
self.assertEqual(err,"","syncpasswords --no-wait")
self.assertMatch(out, "dirsync_loop(): results 0",
"syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
user["name"],
"--attributes=%s" % attributes,
"--decrypt-samba-gpg")
- self.assertCmdSuccess(result, "Ensure getpassword runs")
+ self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
self.assertEqual(err,"","getpassword without url")
self.assertMatch(out, "Got password OK", "getpassword without url")
self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
"--must-change-at-next-login",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- # self.assertCmdSuccess(result, "Ensure setpassword runs")
+ self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
self.assertEquals(err,"","setpassword with forced change")
self.assertMatch(out, "Changed password OK", "setpassword with forced change")
"--days=2",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Can we run setexpiry with names")
+ self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
for user in self.users:
"--days=4",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Can we run setexpiry with a filter")
+ self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
for user in self.users:
found = self._find_user(user["name"])
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Error running list")
+ self.assertCmdSuccess(result, out, err, "Error running list")
search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
(ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- msg = "command should return %s" % err
- self.assertCmdSuccess(result, msg)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- msg = "command should return %s" % err
- self.assertCmdSuccess(result, msg)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
self.assertIn("User '%s' created successfully" % user["name"], out)
(result, out, err) = self.runsubcmd("user", "delete", user["name"],
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Should delete user with bad password.")
+ self.assertCmdSuccess(result, out, err, "Should delete user with bad password.")
(result, out, err) = self.runsubcmd("user", "add", user["name"], good_password,
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Should succeed adding a user with good password.")
+ self.assertCmdSuccess(result, out, err, "Should succeed adding a user with good password.")
# Set password
(result, out, err) = self.runsubcmd("user", "setpassword", user["name"],
"--newpassword=%s" % good_password,
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
- self.assertCmdSuccess(result, "Should succeed setting a user's password to a good one.")
+ self.assertCmdSuccess(result, out, err, "Should succeed setting a user's password to a good one.")
# Password=
"--newpassword=%s" % good_password + 'XYZ',
"--ipaddress", os.environ["DC_SERVER_IP"],
"-U%s%%%s" % (user["name"], good_password))
- self.assertCmdSuccess(result, "A user setting their own password to a good one should succeed.")
+ self.assertCmdSuccess(result, out, err, "A user setting their own password to a good one should succeed.")
def _randomUser(self, base={}):
"""create a user with random attribute values, you can specify base attributes"""
"-H", "ldap://%s:389" % DC,
cmd_line_auth)
- self.assertCmdSuccess(result)
+ self.assertCmdSuccess(result, out, err)
self.assertEquals(err,"","Shouldn't be any error messages")
if noop == False:
self.assertTrue("FSMO transfer of '%s' role successful" % role in out)