import argparse
import ast
import asyncio
+import asyncio.tools
import concurrent.futures
import contextvars
import inspect
import threading
import types
import warnings
-from asyncio.tools import (TaskTableOutputFormat,
- display_awaited_by_tasks_table,
- display_awaited_by_tasks_tree)
from _colorize import get_theme
from _pyrepl.console import InteractiveColoredConsole
"ps", help="Display a table of all pending tasks in a process"
)
ps.add_argument("pid", type=int, help="Process ID to inspect")
- formats = [fmt.value for fmt in TaskTableOutputFormat]
- formats_to_show = [fmt for fmt in formats
- if fmt != TaskTableOutputFormat.bsv.value]
- ps.add_argument("--format", choices=formats, default="table",
- metavar=f"{{{','.join(formats_to_show)}}}")
pstree = subparsers.add_parser(
"pstree", help="Display a tree of all pending tasks in a process"
)
args = parser.parse_args()
match args.command:
case "ps":
- display_awaited_by_tasks_table(args.pid, format=args.format)
+ asyncio.tools.display_awaited_by_tasks_table(args.pid)
sys.exit(0)
case "pstree":
- display_awaited_by_tasks_tree(args.pid)
+ asyncio.tools.display_awaited_by_tasks_tree(args.pid)
sys.exit(0)
case None:
pass # continue to the interactive shell
"""Tools to analyze tasks running in asyncio programs."""
-from collections import defaultdict
-import csv
+from collections import defaultdict, namedtuple
from itertools import count
-from enum import Enum, StrEnum, auto
+from enum import Enum
import sys
from _remote_debugging import RemoteUnwinder, FrameInfo
sys.exit(1)
-class TaskTableOutputFormat(StrEnum):
- table = auto()
- csv = auto()
- bsv = auto()
- # 🍌SV is not just a format. It's a lifestyle. A philosophy.
- # https://www.youtube.com/watch?v=RrsVi1P6n0w
-
-
-def display_awaited_by_tasks_table(pid, *, format=TaskTableOutputFormat.table):
+def display_awaited_by_tasks_table(pid: int) -> None:
"""Build and print a table of all pending tasks under `pid`."""
tasks = _get_awaited_by_tasks(pid)
table = build_task_table(tasks)
- format = TaskTableOutputFormat(format)
- if format == TaskTableOutputFormat.table:
- _display_awaited_by_tasks_table(table)
- else:
- _display_awaited_by_tasks_csv(table, format=format)
-
-
-_row_header = ('tid', 'task id', 'task name', 'coroutine stack',
- 'awaiter chain', 'awaiter name', 'awaiter id')
-
-
-def _display_awaited_by_tasks_table(table):
- """Print the table in a simple tabular format."""
- print(_fmt_table_row(*_row_header))
- print('-' * 180)
+ # Print the table in a simple tabular format
+ print(
+ f"{'tid':<10} {'task id':<20} {'task name':<20} {'coroutine stack':<50} {'awaiter chain':<50} {'awaiter name':<15} {'awaiter id':<15}"
+ )
+ print("-" * 180)
for row in table:
- print(_fmt_table_row(*row))
-
-
-def _fmt_table_row(tid, task_id, task_name, coro_stack,
- awaiter_chain, awaiter_name, awaiter_id):
- # Format a single row for the table format
- return (f'{tid:<10} {task_id:<20} {task_name:<20} {coro_stack:<50} '
- f'{awaiter_chain:<50} {awaiter_name:<15} {awaiter_id:<15}')
-
-
-def _display_awaited_by_tasks_csv(table, *, format):
- """Print the table in CSV format"""
- if format == TaskTableOutputFormat.csv:
- delimiter = ','
- elif format == TaskTableOutputFormat.bsv:
- delimiter = '\N{BANANA}'
- else:
- raise ValueError(f"Unknown output format: {format}")
- csv_writer = csv.writer(sys.stdout, delimiter=delimiter)
- csv_writer.writerow(_row_header)
- csv_writer.writerows(table)
+ print(f"{row[0]:<10} {row[1]:<20} {row[2]:<20} {row[3]:<50} {row[4]:<50} {row[5]:<15} {row[6]:<15}")
def display_awaited_by_tasks_tree(pid: int) -> None: