from __future__ import annotations
import shlex
-from typing import Dict
+from typing import Dict, Tuple
from pytest_mh.conn import ProcessLogLevel, ProcessResult
from ..hosts.shadow import ShadowHost
+from ..misc.errors import ExpectScriptError
from .base import BaseLinuxRole
__all__ = [
"Shadow",
]
+DEFAULT_INTERACTIVE_TIMEOUT: int = 60
+"""Default timeout for interactive sessions."""
+
class Shadow(BaseLinuxRole[ShadowHost]):
"""
self.host.discard_file("/etc/gshadow")
return cmd
+
+ def newgrp(self, *args, run_as: str = "root") -> Tuple[ProcessResult, int]:
+ """
+ Log in to a new group.
+
+ The newgrp command is used to change the current group ID during a login session.
+ Returns the process result and the group ID after the change.
+ """
+ args_dict = self._parse_args(args)
+
+ self.logger.info(f'Changing {run_as} to group "{args_dict["name"]}" on {self.host.hostname}')
+
+ # Use expect to handle the interactive newgrp session
+ result = self.host.conn.expect(
+ rf"""
+ set timeout {DEFAULT_INTERACTIVE_TIMEOUT}
+ set prompt "\[#\$>\] $"
+
+ if {{ "{run_as}" eq "root" }} {{
+ spawn newgrp {args[0]}
+ }} else {{
+ spawn su - {run_as}
+ expect {{
+ -re $prompt {{send "newgrp {args[0]}\n"}}
+ timeout {{puts "expect result: Timeout waiting for su prompt"; exit 201}}
+ eof {{puts "expect result: Unexpected end of file"; exit 202}}
+ }}
+ }}
+
+ expect {{
+ -re $prompt {{send "id -g\n"}}
+ timeout {{puts "expect result: Timeout waiting for newgrp prompt"; exit 201}}
+ eof {{puts "expect result: Unexpected end of file"; exit 202}}
+ }}
+
+ expect {{
+ -re "(\[0-9\]+)" {{
+ set gid $expect_out(1,string)
+ send "exit\n"
+ puts "newgrp_gid:$gid"
+ }}
+ timeout {{puts "expect result: Timeout waiting for id output"; exit 201}}
+ eof {{puts "expect result: Unexpected end of file"; exit 202}}
+ }}
+
+ if {{ "{run_as}" ne "root" }} {{
+ expect {{
+ -re $prompt {{
+ send "exit\n"
+ }}
+ timeout {{puts "expect result: Timeout waiting for original su prompt"; exit 201}}
+ }}
+ }}
+
+ expect {{
+ eof {{exit 0}}
+ timeout {{exit 201}}
+ }}
+ """,
+ verbose=False,
+ )
+
+ if result.rc > 200:
+ raise ExpectScriptError(result.rc)
+
+ gid_line = None
+ for line in result.stdout_lines:
+ if "newgrp_gid:" in line:
+ gid_line = line
+ break
+
+ if gid_line is None:
+ raise ValueError("Current GID is required for newgrp")
+
+ current_gid = int(gid_line.split(":")[1])
+
+ return result, current_gid