import itertools
+import io
import json
import os
import platform
import sys
+import tempfile
import threading
import time
PROCESS_TYPE_MAIN = 0
STACKWALK_DISABLED = 0
# In-memory buffer before spilling to disk
DEFAULT_SPILL_BUFFER_BYTES = 128 * 1024
_JSON_SEPARATORS = (",", ":")
_JSON_ENCODER = json.JSONEncoder(
    separators=_JSON_SEPARATORS, allow_nan=False
)


class SpillColumn:
    """Append-only column of JSON tokens, buffered in memory and spilled to disk.

    Each appended value is encoded as compact JSON followed by a newline, so
    the backing file holds exactly one token per line.
    """

    def __init__(self, directory, basename, *,
                 buffer_bytes=None):
        """Create a column backed by ``directory/basename``.

        *buffer_bytes* is the buffer size at which ``append`` flushes to disk.
        When it is None the module-level DEFAULT_SPILL_BUFFER_BYTES is read
        here, at construction time, so a changed default applies to columns
        created afterwards.
        """
        self.path = os.path.join(directory, basename)
        self.buffer = bytearray()
        self._buffer_bytes = (
            DEFAULT_SPILL_BUFFER_BYTES if buffer_bytes is None
            else buffer_bytes
        )

    def append(self, value):
        """Encode *value* as one JSON line and buffer it.

        The buffer is flushed once it reaches the configured threshold.
        Encoding errors (for example ValueError for NaN, because the encoder
        is built with ``allow_nan=False``) propagate before the buffer is
        modified.
        """
        self.buffer += (_JSON_ENCODER.encode(value) + "\n").encode("utf-8")
        if len(self.buffer) >= self._buffer_bytes:
            self.flush()

    def flush(self):
        """Append the buffered bytes to the backing file and clear the buffer.

        The file is opened even when the buffer is empty, which guarantees it
        exists for a later read.
        """
        with open(self.path, "ab") as file:
            file.write(self.buffer)
        self.buffer.clear()

    def iter_tokens(self):
        """Return an iterator over every stored token, in append order.

        Pending rows are flushed first, so tokens still sitting in the
        in-memory buffer are not lost and a column that was never flushed
        yields its rows (or nothing) instead of failing on a missing file.
        """
        self.flush()
        return self._iter_file_tokens()

    def _iter_file_tokens(self):
        """Yield each line of the backing file without its trailing newline."""
        with open(self.path, encoding="utf-8") as file:
            for line in file:
                yield line.rstrip("\n")
+
+
class GeckoThreadSpill:
    """Per-thread group of spill columns holding sample and marker rows.

    One SpillColumn is created per entry in ``_COLUMNS`` and exposed as an
    attribute of the same name; row counts are tracked separately.
    """

    # (attribute name, file basename) for every column of this thread.
    _COLUMNS = (
        ("samples_stack", "samples-stack.json"),
        ("samples_time", "samples-time.json"),
        ("markers_name", "markers-name.json"),
        ("markers_start_time", "markers-start-time.json"),
        ("markers_end_time", "markers-end-time.json"),
        ("markers_phase", "markers-phase.json"),
        ("markers_category", "markers-category.json"),
        ("markers_data", "markers-data.json"),
    )

    def __init__(self, directory, tid):
        """Create the columns under *directory*, with files named after *tid*."""
        file_prefix = f"thread-{tid}-"
        columns = {
            attr: SpillColumn(directory, file_prefix + basename)
            for attr, basename in self._COLUMNS
        }
        vars(self).update(columns)
        self.sample_count = 0
        self.marker_count = 0

    def append_sample(self, stack_index, time_ms):
        """Record one sample row and bump the sample count."""
        row = (
            (self.samples_stack, stack_index),
            (self.samples_time, time_ms),
        )
        for column, value in row:
            column.append(value)
        self.sample_count += 1

    def append_marker(self, name_idx, start_time, end_time, phase, category, data):
        """Record one marker row across all marker columns and bump the count."""
        row = (
            (self.markers_name, name_idx),
            (self.markers_start_time, start_time),
            (self.markers_end_time, end_time),
            (self.markers_phase, phase),
            (self.markers_category, category),
            (self.markers_data, data),
        )
        for column, value in row:
            column.append(value)
        self.marker_count += 1

    def prepare_read(self):
        """Flush every column so its rows are on disk before reading."""
        for column_spec in self._COLUMNS:
            getattr(self, column_spec[0]).flush()
+
class GeckoCollector(Collector):
aggregating = True
# Per-thread data structures
self.threads = {} # tid -> thread data
+ self.spill_dir = None
+ self.exported = False
# Global tables
self.libs = []
stack_frames: List of interpreter/thread frame info
timestamps_us: List of timestamps in microseconds (None for live sampling)
"""
+ if self.exported:
+ raise RuntimeError("cannot append to GeckoCollector after export")
+
# Handle live sampling (no timestamps provided)
if timestamps_us is None:
current_time = (time.monotonic() * 1000) - self.start_time
stack_index = self._process_stack(thread_data, frames)
# Add samples with timestamps
- samples = thread_data["samples"]
- samples_stack = samples["stack"]
- samples_time = samples["time"]
- samples_delay = samples["eventDelay"]
-
+ thread_spill = thread_data["_spill"]
for t in times:
- samples_stack.append(stack_index)
- samples_time.append(t)
- samples_delay.append(None)
+ thread_spill.append_sample(stack_index, t)
# Handle opcodes
if self.opcodes_enabled and frames:
def _create_thread(self, tid, is_main_thread):
"""Create a new thread structure with processed profile format."""
+ if self.spill_dir is None:
+ self.spill_dir = tempfile.TemporaryDirectory()
thread = {
"name": f"Thread-{tid}",
"tid": tid,
"processType": "default",
"processName": "Python Process",
- # Sample data - processed format with direct arrays
- "samples": {
- "stack": [],
- "time": [],
- "eventDelay": [],
- "weight": None,
- "weightType": "samples",
- "length": 0, # Will be updated on export
- },
# Stack table - processed format
"stackTable": {
"frame": [],
"functionSize": [],
"length": 0,
},
- # Markers - processed format (arrays)
- "markers": {
- "data": [],
- "name": [],
- "startTime": [],
- "endTime": [],
- "phase": [],
- "category": [],
- "length": 0,
- },
# Caches for deduplication
"_stackCache": {},
"_frameCache": {},
"_funcCache": {},
"_resourceCache": {},
+ "_spill": GeckoThreadSpill(self.spill_dir.name, tid),
}
return thread
if tid not in self.threads:
return
- thread_data = self.threads[tid]
duration = end_time - start_time
name_idx = self._intern_string(name)
- markers = thread_data["markers"]
- markers["name"].append(name_idx)
- markers["startTime"].append(start_time)
- markers["endTime"].append(end_time)
- markers["phase"].append(1) # 1 = interval marker
- markers["category"].append(category)
- markers["data"].append({
- "type": name.replace(" ", ""),
- "duration": duration,
- "tid": tid
- })
-
- def _add_opcode_interval_marker(self, tid, opcode, lineno, col_offset, funcname, start_time, end_time):
+ self.threads[tid]["_spill"].append_marker(
+ name_idx, start_time, end_time, 1, category, {
+ "type": name.replace(" ", ""),
+ "duration": duration,
+ "tid": tid,
+ }
+ )
+
+ def _add_opcode_interval_marker(self, tid, opcode, lineno, col_offset,
+ funcname, start_time, end_time):
"""Add an interval marker for opcode execution span."""
if tid not in self.threads or opcode is None:
return
- thread_data = self.threads[tid]
opcode_info = get_opcode_info(opcode)
# Use formatted opcode name (with base opcode for specialized ones)
formatted_opname = format_opcode(opcode)
name_idx = self._intern_string(formatted_opname)
- markers = thread_data["markers"]
- markers["name"].append(name_idx)
- markers["startTime"].append(start_time)
- markers["endTime"].append(end_time)
- markers["phase"].append(1) # 1 = interval marker
- markers["category"].append(CATEGORY_OPCODES)
- markers["data"].append({
- "type": "Opcode",
- "opcode": opcode,
- "opname": formatted_opname,
- "base_opname": opcode_info["base_opname"],
- "is_specialized": opcode_info["is_specialized"],
- "line": lineno,
- "column": col_offset if col_offset >= 0 else None,
- "function": funcname,
- "duration": end_time - start_time,
- })
+ self.threads[tid]["_spill"].append_marker(
+ name_idx, start_time, end_time, 1, CATEGORY_OPCODES, {
+ "type": "Opcode",
+ "opcode": opcode,
+ "opname": formatted_opname,
+ "base_opname": opcode_info["base_opname"],
+ "is_specialized": opcode_info["is_specialized"],
+ "line": lineno,
+ "column": col_offset if col_offset >= 0 else None,
+ "function": funcname,
+ "duration": end_time - start_time,
+ }
+ )
def _process_stack(self, thread_data, frames):
"""Process a stack and return the stack index."""
def export(self, filename):
"""Export the profile to a Gecko JSON file."""
-
if self.sample_count > 0 and self.last_sample_time > 0:
self.interval = self.last_sample_time / self.sample_count
spinner_thread = threading.Thread(target=spin, daemon=True)
spinner_thread.start()
+ temp_path = None
+ replaced = False
try:
- # Finalize any open markers before building profile
- self._finalize_markers()
-
- profile = self._build_profile()
-
- with open(filename, "w") as f:
- json.dump(profile, f, separators=(",", ":"))
+ self._prepare_for_serialization()
+ output_dir = os.path.dirname(os.path.abspath(filename)) or "."
+ with tempfile.NamedTemporaryFile(
+ "w", dir=output_dir, delete=False
+ ) as file:
+ temp_path = file.name
+ self._stream_profile(file)
+ os.replace(temp_path, filename)
+ replaced = True
finally:
+ self.exported = True
stop_spinner.set()
spinner_thread.join(timeout=1.0)
# Small delay to ensure the clear happens
time.sleep(0.01)
+ if temp_path is not None and not replaced:
+ try:
+ os.unlink(temp_path)
+ except FileNotFoundError:
+ pass
+ self._cleanup_spills()
print(f"Gecko profile written to {filename}")
print(
def _build_profile(self):
"""Build the complete profile structure in processed format."""
- # Convert thread data to final format
- threads = []
-
- for tid, thread_data in self.threads.items():
- # Update lengths
- samples = thread_data["samples"]
- stack_table = thread_data["stackTable"]
- frame_table = thread_data["frameTable"]
- func_table = thread_data["funcTable"]
- resource_table = thread_data["resourceTable"]
-
- samples["length"] = len(samples["stack"])
- stack_table["length"] = len(stack_table["frame"])
- frame_table["length"] = len(frame_table["func"])
- func_table["length"] = len(func_table["name"])
- resource_table["length"] = len(resource_table["name"])
- thread_data["markers"]["length"] = len(thread_data["markers"]["name"])
-
- # Clean up internal caches
- del thread_data["_stackCache"]
- del thread_data["_frameCache"]
- del thread_data["_funcCache"]
- del thread_data["_resourceCache"]
-
- threads.append(thread_data)
-
- # Main profile structure in processed format
- profile = {
+ try:
+ self._prepare_for_serialization()
+ file = io.StringIO()
+ self._stream_profile(file)
+ return json.loads(file.getvalue())
+ finally:
+ self.exported = True
+ self._cleanup_spills()
+
+ def _profile_head(self):
+ return {
"meta": {
"interval": self.interval,
"startTime": self.start_time,
},
},
"libs": self.libs,
- "threads": threads,
+ }
+
+ def _profile_tail(self):
+ return {
"pages": [],
"shared": {
"stringArray": self.global_strings,
},
}
- return profile
+ def _prepare_for_serialization(self):
+ if self.exported:
+ raise RuntimeError("GeckoCollector has already been exported")
+ self._finalize_markers()
+ for thread_data in self.threads.values():
+ thread_data["_spill"].prepare_read()
+ thread_data["stackTable"]["length"] = len(thread_data["stackTable"]["frame"])
+ thread_data["frameTable"]["length"] = len(thread_data["frameTable"]["func"])
+ thread_data["funcTable"]["length"] = len(thread_data["funcTable"]["name"])
+ thread_data["resourceTable"]["length"] = len(thread_data["resourceTable"]["name"])
+
+ def _cleanup_spills(self):
+ if self.spill_dir is not None:
+ self.spill_dir.cleanup()
+ self.spill_dir = None
+
+ def _stream_profile(self, file):
+ file.write("{")
+ first = True
+ for key, value in self._profile_head().items():
+ first = _write_json_member(file, key, value, first)
+
+ first = _write_member_name(file, "threads", first)
+ file.write("[")
+ for index, (tid, thread_data) in enumerate(self.threads.items()):
+ if index:
+ file.write(",")
+ self._stream_thread(file, tid, thread_data)
+ file.write("]")
+
+ for key, value in self._profile_tail().items():
+ first = _write_json_member(file, key, value, first)
+ file.write("}")
+
+ def _stream_thread(self, file, tid, thread_data):
+ spill = thread_data["_spill"]
+ metadata = {
+ "name": thread_data["name"],
+ "isMainThread": thread_data["isMainThread"],
+ "processStartupTime": thread_data["processStartupTime"],
+ "processShutdownTime": thread_data["processShutdownTime"],
+ "registerTime": thread_data["registerTime"],
+ "unregisterTime": thread_data["unregisterTime"],
+ "pausedRanges": thread_data["pausedRanges"],
+ "pid": thread_data["pid"],
+ "tid": thread_data["tid"],
+ "processType": thread_data["processType"],
+ "processName": thread_data["processName"],
+ }
+ file.write("{")
+ first = True
+ for key, value in metadata.items():
+ first = _write_json_member(file, key, value, first)
+
+ first = _write_member_name(file, "samples", first)
+ self._stream_samples(file, spill)
+ for key in (
+ "stackTable",
+ "frameTable",
+ "funcTable",
+ "resourceTable",
+ "nativeSymbols",
+ ):
+ first = _write_json_member(file, key, thread_data[key], first)
+ first = _write_member_name(file, "markers", first)
+ self._stream_markers(file, spill)
+ file.write("}")
+
+ def _stream_samples(self, file, spill):
+ _stream_column_table(
+ file,
+ (
+ ("stack", spill.samples_stack.iter_tokens()),
+ ("time", spill.samples_time.iter_tokens()),
+ ("eventDelay", ("null" for _ in range(spill.sample_count))),
+ ),
+ spill.sample_count,
+ (
+ ("weight", None),
+ ("weightType", "samples"),
+ ("length", spill.sample_count),
+ ),
+ )
+
+ def _stream_markers(self, file, spill):
+ _stream_column_table(
+ file,
+ (
+ ("data", spill.markers_data.iter_tokens()),
+ ("name", spill.markers_name.iter_tokens()),
+ ("startTime", spill.markers_start_time.iter_tokens()),
+ ("endTime", spill.markers_end_time.iter_tokens()),
+ ("phase", spill.markers_phase.iter_tokens()),
+ ("category", spill.markers_category.iter_tokens()),
+ ),
+ spill.marker_count,
+ (("length", spill.marker_count),),
+ )
+
+
def _write_json(file, value):
    """Write *value* to *file* as compact JSON, one encoder chunk at a time."""
    emit = file.write
    for piece in _JSON_ENCODER.iterencode(value):
        emit(piece)
+
+
def _write_member_name(file, name, first):
    """Write an object member name followed by ":".

    A leading "," is written unless *first* is true. Always returns False,
    which callers store as the new "first" flag.
    """
    needs_separator = not first
    if needs_separator:
        file.write(",")
    _write_json(file, name)
    file.write(":")
    return False
+
+
def _write_json_member(file, name, value, first):
    """Write one ``name: value`` object member and return the new first flag."""
    next_first = _write_member_name(file, name, first)
    _write_json(file, value)
    return next_first
+
+
def _stream_column_table(file, columns, expected_count, trailing_members=()):
    """Write a JSON object made of streamed array columns plus plain members.

    *columns* is an iterable of ``(name, token_iterator)`` pairs; each array
    must contain exactly *expected_count* tokens. *trailing_members* is an
    iterable of ``(name, value)`` pairs written after the arrays.
    """
    file.write("{")
    is_first = True
    for column_name, tokens in columns:
        is_first = _write_member_name(file, column_name, is_first)
        _stream_array(file, tokens, expected_count, column_name)
    for member_name, member_value in trailing_members:
        is_first = _write_json_member(
            file, member_name, member_value, is_first
        )
    file.write("}")
+
+
+def _stream_array(file, token_iter, expected_count, label="array"):
+ file.write("[")
+ count = 0
+ for token in token_iter:
+ if count:
+ file.write(",")
+ file.write(token)
+ count += 1
+ if count != expected_count:
+ raise RuntimeError(
+ f"streamed {count} {label} items, expected {expected_count}"
+ )
+ file.write("]")
try:
import _remote_debugging # noqa: F401
+ from profiling.sampling import gecko_collector
from profiling.sampling.pstats_collector import PstatsCollector
from profiling.sampling.stack_collector import (
CollapsedStackCollector,
return None
def export_gecko_profile(testcase, collector):
    """Export *collector* to a temporary JSON file and return the parsed profile.

    The temporary file is registered for cleanup on *testcase*, and the
    export's stdout/stderr output is captured.
    """
    out_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
    testcase.addCleanup(close_and_unlink, out_file)
    # We cannot overwrite an open file on Windows.
    out_file.close()
    out_path = out_file.name

    with captured_stdout(), captured_stderr():
        collector.export(out_path)

    testcase.assertGreater(os.path.getsize(out_path), 0)
    with open(out_path, encoding="utf-8") as stream:
        profile = json.load(stream)
    return profile
+
+
def assert_gecko_column_lengths(testcase, table, columns):
    """Assert that each named column of *table* has ``table["length"]`` items."""
    declared_length = table["length"]
    for column_name in columns:
        actual_length = len(table[column_name])
        failure_message = f"{column_name!r} has wrong length"
        testcase.assertEqual(actual_length, declared_length, failure_message)
+
+
def gecko_marker_names(profile, markers):
    """Return the marker names, resolved through the shared string array."""
    strings = profile["shared"]["stringArray"]
    return list(map(strings.__getitem__, markers["name"]))
+
+
def gecko_opcode_marker_data(profile):
    """Return the first thread's marker data entries whose type is "Opcode"."""
    opcode_entries = []
    for payload in profile["threads"][0]["markers"]["data"]:
        if payload.get("type") == "Opcode":
            opcode_entries.append(payload)
    return opcode_entries
+
+
class TestSampleProfilerComponents(unittest.TestCase):
"""Unit tests for individual profiler components."""
# Verify samples
samples = thread_data["samples"]
- self.assertEqual(len(samples["stack"]), 1)
- self.assertEqual(len(samples["time"]), 1)
self.assertEqual(samples["length"], 1)
+ assert_gecko_column_lengths(
+ self, samples, ("stack", "time", "eventDelay")
+ )
# Verify function table structure and content
func_table = thread_data["funcTable"]
@unittest.skipIf(is_emscripten, "threads not available")
def test_gecko_collector_export(self):
"""Test Gecko profile export functionality."""
- gecko_out = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
- self.addCleanup(close_and_unlink, gecko_out)
-
collector = GeckoCollector(1000)
test_frames1 = [
collector.collect(test_frames2)
collector.collect(test_frames3)
- # Export gecko profile
- with captured_stdout(), captured_stderr():
- collector.export(gecko_out.name)
-
- # Verify file was created and contains valid data
- self.assertTrue(os.path.exists(gecko_out.name))
- self.assertGreater(os.path.getsize(gecko_out.name), 0)
-
- # Check file contains valid JSON
- with open(gecko_out.name, "r") as f:
- profile_data = json.load(f)
+ profile_data = export_gecko_profile(self, collector)
# Should be valid Gecko profile format
self.assertIn("meta", profile_data)
self.assertIn("func2", string_array)
self.assertIn("other_func", string_array)
+ thread_data = profile_data["threads"][0]
+ assert_gecko_column_lengths(
+ self, thread_data["samples"], ("stack", "time", "eventDelay")
+ )
+
+ @unittest.skipIf(is_emscripten, "threads not available")
+ def test_gecko_collector_export_after_spill_flush(self):
+ """Test Gecko profile export after spill buffers flush to disk."""
+ old_buffer_bytes = gecko_collector.DEFAULT_SPILL_BUFFER_BYTES
+ gecko_collector.DEFAULT_SPILL_BUFFER_BYTES = 1
+ self.addCleanup(
+ setattr, gecko_collector, "DEFAULT_SPILL_BUFFER_BYTES",
+ old_buffer_bytes
+ )
+
+ collector = GeckoCollector(1000)
+ test_frames = [
+ MockInterpreterInfo(
+ 0,
+ [
+ MockThreadInfo(
+ 1,
+ [MockFrameInfo("file.py", 10, "func")],
+ status=THREAD_STATUS_HAS_GIL,
+ )
+ ],
+ )
+ ]
+ collector.collect(test_frames, timestamps_us=[1000, 2000, 3000])
+
+ profile_data = export_gecko_profile(self, collector)
+ samples = profile_data["threads"][0]["samples"]
+ self.assertEqual(samples["length"], 3)
+ assert_gecko_column_lengths(
+ self, samples, ("stack", "time", "eventDelay")
+ )
+
+ @unittest.skipIf(is_emscripten, "threads not available")
+ def test_gecko_collector_rejects_collect_after_export(self):
+ collector = GeckoCollector(1000)
+ test_frames = [
+ MockInterpreterInfo(
+ 0,
+ [
+ MockThreadInfo(
+ 1,
+ [MockFrameInfo("file.py", 10, "func")],
+ status=THREAD_STATUS_HAS_GIL,
+ )
+ ],
+ )
+ ]
+ collector.collect(test_frames)
+ export_gecko_profile(self, collector)
+
+ with self.assertRaisesRegex(RuntimeError, "after export"):
+ collector.collect(test_frames)
+
+ @unittest.skipIf(is_emscripten, "threads not available")
+ def test_gecko_collector_export_failure_keeps_existing_file(self):
+ collector = GeckoCollector(1000)
+ test_frames = [
+ MockInterpreterInfo(
+ 0,
+ [
+ MockThreadInfo(
+ 1,
+ [MockFrameInfo("file.py", 10, "func")],
+ status=THREAD_STATUS_HAS_GIL,
+ )
+ ],
+ )
+ ]
+ collector.collect(test_frames)
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ filename = os.path.join(temp_dir, "profile.json")
+ with open(filename, "w", encoding="utf-8") as file:
+ file.write("existing")
+
+ before = set(os.listdir(temp_dir))
+
+ def fail(file):
+ raise OSError("boom")
+
+ collector._stream_profile = fail
+ with captured_stdout(), captured_stderr():
+ with self.assertRaisesRegex(OSError, "boom"):
+ collector.export(filename)
+
+ with open(filename, encoding="utf-8") as file:
+ self.assertEqual(file.read(), "existing")
+ self.assertEqual(set(os.listdir(temp_dir)), before)
+
def test_gecko_collector_markers(self):
"""Test Gecko profile markers for GIL and CPU state tracking."""
collector = GeckoCollector(1000)
self.assertIn("markers", thread_data)
markers = thread_data["markers"]
- # Should have marker arrays
- self.assertIn("name", markers)
- self.assertIn("startTime", markers)
- self.assertIn("endTime", markers)
- self.assertIn("category", markers)
self.assertGreater(
markers["length"], 0, "Should have generated markers"
)
-
- # Get marker names from string table
- string_array = profile_data["shared"]["stringArray"]
- marker_names = [string_array[idx] for idx in markers["name"]]
+ assert_gecko_column_lengths(
+ self, markers,
+ ("data", "name", "startTime", "endTime", "phase", "category"),
+ )
# Verify we have different marker types
- marker_name_set = set(marker_names)
+ marker_name_set = set(gecko_marker_names(profile_data, markers))
# Should have "Has GIL" markers (when thread had GIL)
self.assertIn(
def test_gecko_opcode_state_tracking(self):
"""Test that GeckoCollector tracks opcode state changes."""
collector = GeckoCollector(sample_interval_usec=1000, opcodes=True)
+ self.addCleanup(collector._cleanup_spills)
# First sample with opcode 90 (RAISE_VARARGS)
frame1 = MockFrameInfo("test.py", 10, "func", opcode=90)
collector.collect(frames2)
# Should have emitted a marker for the first opcode
- thread_data = collector.threads[1]
- markers = thread_data["markers"]
- # At least one marker should have been added
- self.assertGreater(len(markers["name"]), 0)
+ profile = collector._build_profile()
+ markers = profile["threads"][0]["markers"]
+ assert_gecko_column_lengths(
+ self, markers,
+ ("data", "name", "startTime", "endTime", "phase", "category"),
+ )
+ opcode_markers = gecko_opcode_marker_data(profile)
+ self.assertIn(
+ {
+ "opcode": 90,
+ "line": 10,
+ "function": "func",
+ },
+ [
+ {
+ "opcode": marker["opcode"],
+ "line": marker["line"],
+ "function": marker["function"],
+ }
+ for marker in opcode_markers
+ ],
+ )
def test_gecko_opcode_markers_not_emitted_when_disabled(self):
"""Test that no opcode markers when opcodes=False."""
]
collector.collect(frames2)
- # opcode_state should not be tracked
- self.assertEqual(len(collector.opcode_state), 0)
+ profile = collector._build_profile()
+ self.assertEqual(gecko_opcode_marker_data(profile), [])
+ self.assertEqual(profile["meta"]["markerSchema"], [])
def test_gecko_opcode_with_none_opcode(self):
"""Test that None opcode doesn't cause issues."""
]
collector.collect(frames)
- # Should track the state but opcode is None
- self.assertIn(1, collector.opcode_state)
- self.assertIsNone(collector.opcode_state[1][0])
+ profile = collector._build_profile()
+ self.assertEqual(gecko_opcode_marker_data(profile), [])
class TestCollectorFrameFormat(unittest.TestCase):