"""Memory watchdog: periodically read the memory usage of the main test process
and print it out, until terminated."""
-# stdin should refer to the process' /proc/<PID>/statm: we don't pass the
-# process' PID to avoid a race condition in case of - unlikely - PID recycling.
-# If the process crashes, reading from the /proc entry will fail with ESRCH.
import sys
import time
-from test.support import get_pagesize
-
-
-while True:
- page_size = get_pagesize()
- sys.stdin.seek(0)
- statm = sys.stdin.read()
- data = int(statm.split()[5])
- sys.stdout.write(" ... process data size: {data:.1f}G\n"
- .format(data=data * page_size / (1024 ** 3)))
- sys.stdout.flush()
- time.sleep(1)
+from test.libregrtest.utils import get_process_memory_usage
+
+
+ONE_GIB = (1024 ** 3)
+
+
+def watchdog(pid):
+ while True:
+ mem = get_process_memory_usage(pid)
+ if mem is None:
+ # get_process_memory_usage() is not supported on the platform,
+ # or something went wrong. Exit since the next call is likely to
+ # fail the same way.
+ return
+
+ # Prefer sys.stdout.write() to print() to use a single write() syscall.
+ # print(msg) calls write(msg.encode()) and then write(b"\n").
+ sys.stdout.write(f" ... process data size: {mem / ONE_GIB:.1f} GiB\n")
+ sys.stdout.flush()
+ time.sleep(1)
+
+def main():
+ if len(sys.argv) != 2:
+ print(f"usage: python {sys.argv[0]} pid")
+ sys.exit(1)
+ pid = int(sys.argv[1])
+
+ try:
+ watchdog(pid)
+ except KeyboardInterrupt:
+ pass
+
+if __name__ == "__main__":
+ main()
"""
def __init__(self):
- self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
self.started = False
def start(self):
- try:
- f = open(self.procfile, 'r')
- except OSError as e:
- logging.getLogger(__name__).warning('/proc not available for stats: %s', e, exc_info=e)
- sys.stderr.flush()
- return
-
import subprocess
- with f:
- watchdog_script = findfile("memory_watchdog.py")
- self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
- stdin=f,
- stderr=subprocess.DEVNULL)
+ watchdog_script = findfile("memory_watchdog.py")
+ cmd = [sys.executable, watchdog_script, str(os.getpid())]
+ self.mem_watchdog = subprocess.Popen(cmd)
self.started = True
def stop(self):
- if self.started:
- self.mem_watchdog.terminate()
- self.mem_watchdog.wait()
+ if not self.started:
+ return
+ self.mem_watchdog.terminate()
+ self.mem_watchdog.wait()
def bigmemtest(size, memuse, dry_run=True):
if real_max_memuse and verbose:
print()
- print(" ... expected peak memory use: {peak:.1f}G"
- .format(peak=size * memuse / (1024 ** 3)))
+ peak = (size * memuse) / (1024 ** 3)
+ print(f" ... expected peak memory use: {peak:.1f} GiB")
watchdog = _MemoryWatchdog()
watchdog.start()
else: