self._conf_file = os.path.join(self._caddy_dir, 'Caddyfile')
self._error_log = os.path.join(self._caddy_dir, 'caddy.log')
self._tmp_dir = os.path.join(self._caddy_dir, 'tmp')
+ self._error_fd = None
self._process = None
self._http_port = 0
self._https_port = 0
def port(self) -> int:
return self._https_port
+ def close_log(self):
+ if self._error_fd:
+ self._error_fd.close()
+ self._error_fd = None
+
def clear_logs(self):
self._rmf(self._error_log)
args = [
self._caddy, 'run'
]
- caddyerr = open(self._error_log, 'a')
- self._process = subprocess.Popen(args=args, cwd=self._caddy_dir, stderr=caddyerr)
+ self._error_fd = open(self._error_log, 'a')
+ self._process = subprocess.Popen(args=args, cwd=self._caddy_dir, stderr=self._error_fd)
if self._process.returncode is not None:
return False
return not wait_live or self.wait_live(timeout=timedelta(seconds=Env.SERVER_TIMEOUT))
except Exception:
self._process.kill()
self._process = None
+ self.close_log()
return not wait_dead or self.wait_dead(timeout=timedelta(seconds=5))
+ self.close_log()
return True
def restart(self):
def dump_logs(self):
lines = []
lines.append('>>--stdout ----------------------------------------------\n')
- with open(self._stdoutfile) as cstdout:
- lines.extend(cstdout.readlines())
+ with open(self._stdoutfile) as fd:
+ lines.extend(fd.readlines())
lines.append('>>--stderr ----------------------------------------------\n')
- with open(self._stderrfile) as cstderr:
- lines.extend(cstderr.readlines())
+ with open(self._stderrfile) as fd:
+ lines.extend(fd.readlines())
lines.append('<<-------------------------------------------------------\n')
return ''.join(lines)
if self._proc:
raise Exception('tcpdump still running')
lines = []
- for line in open(self._stdoutfile):
- m = re.match(r'.* IP 127\.0\.0\.1\.(\d+) [<>] 127\.0\.0\.1\.(\d+):.*', line)
- if m:
- sport = int(m.group(1))
- dport = int(m.group(2))
- if ports is None or sport in ports or dport in ports:
- lines.append(line)
+ with open(self._stdoutfile) as fd:
+ for line in fd:
+ m = re.match(r'.* IP 127\.0\.0\.1\.(\d+) [<>] 127\.0\.0\.1\.(\d+):.*', line)
+ if m:
+ sport = int(m.group(1))
+ dport = int(m.group(2))
+ if ports is None or sport in ports or dport in ports:
+ lines.append(line)
return lines
@property
def stderr(self) -> List[str]:
if self._proc:
raise Exception('tcpdump still running')
- return open(self._stderrfile).readlines()
+ with open(self._stderrfile) as fd:
+ return fd.readlines()
def sample(self):
# not sure how to make that detection reliable for all platforms
cwd=self._run_dir, shell=False,
env=self._run_env)
profile = RunProfile(p.pid, started_at, self._run_dir)
- if intext is not None and False:
- p.communicate(input=intext.encode(), timeout=1)
+ #if intext is not None and False:
+ # p.communicate(input=intext.encode(), timeout=1)
if self._with_perf:
perf = PerfProfile(p.pid, self._run_dir)
perf.start()
self._dante_log = os.path.join(self._dante_dir, 'dante.log')
self._error_log = os.path.join(self._dante_dir, 'error.log')
self._pid_file = os.path.join(self._dante_dir, 'dante.pid')
+ self._error_fd = None
self._process = None
self.clear_logs()
def port(self) -> int:
return self._port
+ def close_log(self):
+ if self._error_fd:
+ self._error_fd.close()
+ self._error_fd = None
+
def clear_logs(self):
self._rmf(self._error_log)
self._rmf(self._dante_log)
self._process.terminate()
self._process.wait(timeout=2)
self._process = None
- return not wait_dead or True
+ self.close_log()
return True
def restart(self):
'-p', f'{self._pid_file}',
'-d', '0',
]
- procerr = open(self._error_log, 'a')
- self._process = subprocess.Popen(args=args, stderr=procerr)
+ self._error_fd = open(self._error_log, 'a')
+ self._process = subprocess.Popen(args=args, stderr=self._error_fd)
if self._process.returncode is not None:
return False
return self.wait_live(timeout=timedelta(seconds=Env.SERVER_TIMEOUT))
self._conf_file = os.path.join(self._log_dir, 'dnsd.cmd')
self._pid_file = os.path.join(self._log_dir, 'dnsd.pid')
self._error_log = os.path.join(self._log_dir, 'dnsd.err.log')
+ self._error_fd = None
self._process = None
self.clear_logs()
def port(self) -> int:
return self._port
+ def close_log(self):
+ if self._error_fd:
+ self._error_fd.close()
+ self._error_fd = None
+
def clear_logs(self):
self._rmf(self._log_file)
self._rmf(self._error_log)
self._process.terminate()
self._process.wait(timeout=2)
self._process = None
- return not wait_dead or True
+ self.close_log()
return True
def restart(self):
'--logfile', f'{self._log_file}',
'--pidfile', f'{self._pid_file}',
]
- procerr = open(self._error_log, 'a')
- self._process = subprocess.Popen(args=args, stderr=procerr)
+ self._error_fd = open(self._error_log, 'a')
+ self._process = subprocess.Popen(args=args, stderr=self._error_fd)
if self._process.returncode is not None:
return False
return self.wait_live(timeout=timedelta(seconds=Env.SERVER_TIMEOUT))
self._port = 0 # defaults to h3_port
self._cred_name = cred_name
self._loaded_cred_name = None
+ self._error_fd = None
self._process = None
self._tmp_dir = os.path.join(self.env.gen_dir, self._name)
self._run_dir = os.path.join(self._tmp_dir, "run")
def h2_port(self) -> Optional[int]:
return getattr(self, "_h2_port", None)
+ def close_log(self):
+ if self._error_fd:
+ self._error_fd.close()
+ self._error_fd = None
+
def clear_logs(self):
self._rmf(self._error_log)
self._rmf(self._stderr)
self._loaded_cred_name = self._cred_name
self.write_config()
args = [self._cmd, "-c", self._conf_file]
- ngerr = open(self._stderr, "a")
- self._process = subprocess.Popen(args=args, stderr=ngerr)
+ self._error_fd = open(self._stderr, "a")
+ self._process = subprocess.Popen(args=args, stderr=self._error_fd)
if self._process.returncode is not None:
return False
if wait_live:
self._process.kill()
self._process.wait(timeout=2)
self._process = None
+ self.close_log()
return not wait_dead or self.wait_for_state(
live=False, timeout=timedelta(seconds=5)
)
+ self.close_log()
return True
def kill(self, wait_dead=True):
if self._process:
self._process.kill()
+ self.close_log()
return True
+ self.close_log()
return False
def restart(self):
self._error_log = os.path.join(self._run_dir, 'nghttpx.log')
self._stderr = os.path.join(self._run_dir, 'nghttpx.stderr')
self._tmp_dir = os.path.join(self._run_dir, 'tmp')
+ self._error_fd = None
self._process: Optional[subprocess.Popen] = None
self._cred_name = self._def_cred_name = cred_name
self._loaded_cred_name = ''
def exists(self):
return self._cmd and os.path.exists(self._cmd)
+ def close_log(self):
+ if self._error_fd:
+ self._error_fd.close()
+ self._error_fd = None
+
def clear_logs(self):
self._rmf(self._error_log)
self._rmf(self._stderr)
self._process.terminate()
self._process.wait(timeout=2)
self._process = None
+ self.close_log()
return not wait_dead or self.wait_dead(timeout=timedelta(seconds=5))
+ self.close_log()
return True
def restart(self):
'--frontend-http3-max-connection-window-size=100M',
# f'--frontend-quic-debug-log',
])
- ngerr = open(self._stderr, 'a')
- self._process = subprocess.Popen(args=args, stderr=ngerr)
+ self._error_fd = open(self._stderr, 'a')
+ self._process = subprocess.Popen(args=args, stderr=self._error_fd)
if self._process.returncode is not None:
return False
return not wait_live or self.wait_live(timeout=timedelta(seconds=Env.SERVER_TIMEOUT))
creds.pkey_file,
creds.cert_file,
]
- ngerr = open(self._stderr, 'a')
- self._process = subprocess.Popen(args=args, stderr=ngerr)
+ self._error_fd = open(self._stderr, 'a')
+ self._process = subprocess.Popen(args=args, stderr=self._error_fd)
if self._process.returncode is not None:
return False
return not wait_live or self.wait_live(timeout=timedelta(seconds=Env.SERVER_TIMEOUT))
]
self._user_key_files = []
self._user_pub_files = []
+ self._error_fd = None
self._process = None
self.clear_logs()
pubkey = fp.read()
fd.write(pubkey)
+ def close_log(self):
+ if self._error_fd:
+ self._error_fd.close()
+ self._error_fd = None
+
def clear_logs(self):
self._rmf(self._sshd_log)
def dump_log(self):
lines = ['>>--sshd log ----------------------------------------------\n']
- lines.extend(open(self._sshd_log))
+ with open(self._sshd_log) as fd:
+ lines.extend(fd.readlines())
lines.extend(['>>--curl log ----------------------------------------------\n'])
- lines.extend(open(os.path.join(self._tmp_dir, 'curl.stderr')))
+ with open(os.path.join(self._tmp_dir, 'curl.stderr')) as fd:
+ lines.extend(fd.readlines())
lines.append('<<-------------------------------------------------------\n')
return ''.join(lines)
self._process.terminate()
self._process.wait(timeout=2)
self._process = None
- return not wait_dead or True
+ self.close_log()
return True
def restart(self):
run_env = os.environ.copy()
# does not have any effect, sadly
# run_env['HOME'] = f'{self._home_dir}'
- procerr = open(self._sshd_log, 'a')
- self._process = subprocess.Popen(args=args, stderr=procerr, env=run_env)
+ self._error_fd = open(self._sshd_log, 'a')
+ self._process = subprocess.Popen(args=args, stderr=self._error_fd, env=run_env)
if self._process.returncode is not None:
return False
return self.wait_live(timeout=timedelta(seconds=Env.SERVER_TIMEOUT))
self._conf_file = os.path.join(self._vsftpd_dir, 'test.conf')
self._pid_file = os.path.join(self._vsftpd_dir, 'vsftpd.pid')
self._error_log = os.path.join(self._vsftpd_dir, 'vsftpd.log')
+ self._error_fd = None
self._process = None
self.clear_logs()
def port(self) -> int:
return self._port
+ def close_log(self):
+ if self._error_fd:
+ self._error_fd.close()
+ self._error_fd = None
+
def clear_logs(self):
self._rmf(self._error_log)
self._process.terminate()
self._process.wait(timeout=2)
self._process = None
+ self.close_log()
return not wait_dead or self.wait_dead(timeout=timedelta(seconds=5))
+ self.close_log()
return True
def restart(self):
self._cmd,
f'{self._conf_file}',
]
- procerr = open(self._error_log, 'a')
- self._process = subprocess.Popen(args=args, stderr=procerr)
+ self._error_fd = open(self._error_log, 'a')
+ self._process = subprocess.Popen(args=args, stderr=self._error_fd)
if self._process.returncode is not None:
return False
return not wait_live or self.wait_live(timeout=timedelta(seconds=Env.SERVER_TIMEOUT))