--- /dev/null
+import json
+import os
+import platform
+import threading
+import time
+
+from .collector import Collector, THREAD_STATE_RUNNING
+
+
+# Categories matching Firefox Profiler expectations
+GECKO_CATEGORIES = [
+ {"name": "Other", "color": "grey", "subcategories": ["Other"]},
+ {"name": "Python", "color": "yellow", "subcategories": ["Other"]},
+ {"name": "Native", "color": "blue", "subcategories": ["Other"]},
+ {"name": "Idle", "color": "transparent", "subcategories": ["Other"]},
+]
+
+# Category indices (must match the order of GECKO_CATEGORIES)
+CATEGORY_OTHER = 0
+CATEGORY_PYTHON = 1
+CATEGORY_NATIVE = 2
+CATEGORY_IDLE = 3
+
+# Subcategory indices
+DEFAULT_SUBCATEGORY = 0
+
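+# Versions of the Gecko profile format and of the Firefox Profiler's
+# "processed" profile format that this collector emits.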
+GECKO_FORMAT_VERSION = 32
+GECKO_PREPROCESSED_VERSION = 57
+
+# Resource type constants
+RESOURCE_TYPE_LIBRARY = 1
+
+# Frame constants
+FRAME_ADDRESS_NONE = -1
+FRAME_INLINE_DEPTH_ROOT = 0
+
+# Process constants
+PROCESS_TYPE_MAIN = 0
+STACKWALK_DISABLED = 0
+
+
+class GeckoCollector(Collector):
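+    """Collector that builds profiles in the Firefox Profiler's
+    "processed" format: per-thread struct-of-arrays tables plus a
+    shared string table, loadable at https://profiler.firefox.com/.
+    """
+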
+ def __init__(self, *, skip_idle=False):
+ self.skip_idle = skip_idle
+ self.start_time = time.time() * 1000 # milliseconds since epoch
+
+ # Global string table (shared across all threads)
+ self.global_strings = ["(root)"] # Start with root
+ self.global_string_map = {"(root)": 0}
+
+ # Per-thread data structures
+ self.threads = {} # tid -> thread data
+
+ # Global tables
+ self.libs = []
+
+ # Sampling interval tracking
+ self.sample_count = 0
+ self.last_sample_time = 0
+ self.interval = 1.0 # Will be calculated from actual sampling
+
+ def collect(self, stack_frames):
+ """Collect a sample from stack frames."""
+ current_time = (time.time() * 1000) - self.start_time
+
+        # Maintain a running average of the sampling interval
+        # (milliseconds per sample); export() recomputes the final value.
+        if self.sample_count > 0:
+            self.interval = current_time / self.sample_count
+        self.last_sample_time = current_time
+
+ for interpreter_info in stack_frames:
+ for thread_info in interpreter_info.threads:
+ if (
+ self.skip_idle
+ and thread_info.status != THREAD_STATE_RUNNING
+ ):
+ continue
+
+ frames = thread_info.frame_info
+ if not frames:
+ continue
+
+ tid = thread_info.thread_id
+
+ # Initialize thread if needed
+ if tid not in self.threads:
+ self.threads[tid] = self._create_thread(tid)
+
+ thread_data = self.threads[tid]
+
+ # Process the stack
+ stack_index = self._process_stack(thread_data, frames)
+
+ # Add sample - cache references to avoid dictionary lookups
+ samples = thread_data["samples"]
+ samples["stack"].append(stack_index)
+ samples["time"].append(current_time)
+ samples["eventDelay"].append(None)
+
+ self.sample_count += 1
+
+ def _create_thread(self, tid):
+ """Create a new thread structure with processed profile format."""
+ # Determine if this is the main thread
+ try:
+ is_main = tid == threading.main_thread().ident
+ except (RuntimeError, AttributeError):
+ is_main = False
+
+ thread = {
+ "name": f"Thread-{tid}",
+ "isMainThread": is_main,
+ "processStartupTime": 0,
+ "processShutdownTime": None,
+ "registerTime": 0,
+ "unregisterTime": None,
+ "pausedRanges": [],
+ "pid": str(os.getpid()),
+ "tid": tid,
+ "processType": "default",
+ "processName": "Python Process",
+ # Sample data - processed format with direct arrays
+ "samples": {
+ "stack": [],
+ "time": [],
+ "eventDelay": [],
+ "weight": None,
+ "weightType": "samples",
+ "length": 0, # Will be updated on export
+ },
+ # Stack table - processed format
+ "stackTable": {
+ "frame": [],
+ "category": [],
+ "subcategory": [],
+ "prefix": [],
+ "length": 0, # Will be updated on export
+ },
+ # Frame table - processed format
+ "frameTable": {
+ "address": [],
+ "category": [],
+ "subcategory": [],
+ "func": [],
+ "innerWindowID": [],
+ "implementation": [],
+ "optimizations": [],
+ "line": [],
+ "column": [],
+ "inlineDepth": [],
+ "nativeSymbol": [],
+ "length": 0, # Will be updated on export
+ },
+ # Function table - processed format
+ "funcTable": {
+ "name": [],
+ "isJS": [],
+ "relevantForJS": [],
+ "resource": [],
+ "fileName": [],
+ "lineNumber": [],
+ "columnNumber": [],
+ "length": 0, # Will be updated on export
+ },
+ # Resource table - processed format
+ "resourceTable": {
+ "lib": [],
+ "name": [],
+ "host": [],
+ "type": [],
+ "length": 0, # Will be updated on export
+ },
+ # Native symbols table (empty for Python)
+ "nativeSymbols": {
+ "libIndex": [],
+ "address": [],
+ "name": [],
+ "functionSize": [],
+ "length": 0,
+ },
+ # Markers - processed format
+ "markers": {
+ "data": [],
+ "name": [],
+ "startTime": [],
+ "endTime": [],
+ "phase": [],
+ "category": [],
+ "length": 0,
+ },
+ # Caches for deduplication
+ "_stackCache": {},
+ "_frameCache": {},
+ "_funcCache": {},
+ "_resourceCache": {},
+ }
+
+ return thread
+
+ def _is_python(self, filename: str) -> bool:
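+        # Synthetic filenames are wrapped in angle brackets and treated
+        # as native frames, except "<stdin>" and "<string>", which hold
+        # real Python source.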
+ return not filename.startswith("<") or filename in ["<stdin>", "<string>"]
+
+ def _get_category(self, filename: str) -> int:
+ return CATEGORY_PYTHON if self._is_python(filename) else CATEGORY_NATIVE
+
+ def _intern_string(self, s):
+ """Intern a string in the global string table."""
+ if s in self.global_string_map:
+ return self.global_string_map[s]
+ idx = len(self.global_strings)
+ self.global_strings.append(s)
+ self.global_string_map[s] = idx
+ return idx
+
+ def _process_stack(self, thread_data, frames):
+ """Process a stack and return the stack index."""
+ if not frames:
+ return None
+
+ # Cache references to avoid repeated dictionary lookups
+ stack_cache = thread_data["_stackCache"]
+ stack_table = thread_data["stackTable"]
+ stack_frames = stack_table["frame"]
+ stack_prefix = stack_table["prefix"]
+ stack_category = stack_table["category"]
+ stack_subcategory = stack_table["subcategory"]
+
+ # Build stack bottom-up (from root to leaf)
+ prefix_stack_idx = None
+
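+        # Each stackTable row is a (frame, prefix) pair where "prefix"
+        # is the row index of the caller's node, so shared call-stack
+        # prefixes are stored once and a sample references its leaf row.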
+ for frame_tuple in reversed(frames):
+ # frame_tuple is (filename, lineno, funcname)
+ filename, lineno, funcname = frame_tuple
+
+ # Get or create function
+ func_idx = self._get_or_create_func(
+ thread_data, filename, funcname, lineno
+ )
+
+ # Get or create frame
+ frame_idx = self._get_or_create_frame(
+ thread_data, func_idx, lineno
+ )
+
+ # Check stack cache
+ stack_key = (frame_idx, prefix_stack_idx)
+ if stack_key in stack_cache:
+ prefix_stack_idx = stack_cache[stack_key]
+ else:
+ # Create new stack entry
+ stack_idx = len(stack_frames)
+ stack_frames.append(frame_idx)
+ stack_prefix.append(prefix_stack_idx)
+
+ # Determine category
+ category = self._get_category(filename)
+ stack_category.append(category)
+ stack_subcategory.append(DEFAULT_SUBCATEGORY)
+
+ stack_cache[stack_key] = stack_idx
+ prefix_stack_idx = stack_idx
+
+ return prefix_stack_idx
+
+ def _get_or_create_func(self, thread_data, filename, funcname, lineno):
+ """Get or create a function entry."""
+ func_cache = thread_data["_funcCache"]
+ func_key = (filename, funcname)
+
+ if func_key in func_cache:
+ return func_cache[func_key]
+
+ # Cache references for func table
+ func_table = thread_data["funcTable"]
+ func_names = func_table["name"]
+ func_is_js = func_table["isJS"]
+ func_relevant = func_table["relevantForJS"]
+ func_resources = func_table["resource"]
+ func_filenames = func_table["fileName"]
+ func_line_numbers = func_table["lineNumber"]
+ func_column_numbers = func_table["columnNumber"]
+
+ func_idx = len(func_names)
+
+ # Intern strings in global table
+ name_idx = self._intern_string(funcname)
+
+ # Determine if Python
+ is_python = self._is_python(filename)
+
+ # Create resource
+ resource_idx = self._get_or_create_resource(thread_data, filename)
+
+ # Add function
+ func_names.append(name_idx)
+ func_is_js.append(is_python)
+ func_relevant.append(is_python)
+ func_resources.append(resource_idx)
+
+        if is_python:
+            filename_idx = self._intern_string(os.path.basename(filename))
+            func_filenames.append(filename_idx)
+            func_line_numbers.append(lineno)
+        else:
+            func_filenames.append(None)
+            func_line_numbers.append(None)
+        # Column info is unavailable, but an entry must be appended for
+        # every function so the parallel table columns stay aligned.
+        func_column_numbers.append(None)
+
+ func_cache[func_key] = func_idx
+ return func_idx
+
+ def _get_or_create_resource(self, thread_data, filename):
+ """Get or create a resource entry."""
+ resource_cache = thread_data["_resourceCache"]
+
+ if filename in resource_cache:
+ return resource_cache[filename]
+
+ # Cache references for resource table
+ resource_table = thread_data["resourceTable"]
+ resource_libs = resource_table["lib"]
+ resource_names = resource_table["name"]
+ resource_hosts = resource_table["host"]
+ resource_types = resource_table["type"]
+
+ resource_idx = len(resource_names)
+        # os.path.basename also handles Windows separators and returns
+        # non-path names (e.g. "<string>") unchanged.
+        resource_name = os.path.basename(filename)
+ name_idx = self._intern_string(resource_name)
+
+ resource_libs.append(None)
+ resource_names.append(name_idx)
+ resource_hosts.append(None)
+ resource_types.append(RESOURCE_TYPE_LIBRARY)
+
+ resource_cache[filename] = resource_idx
+ return resource_idx
+
+ def _get_or_create_frame(self, thread_data, func_idx, lineno):
+ """Get or create a frame entry."""
+ frame_cache = thread_data["_frameCache"]
+ frame_key = (func_idx, lineno)
+
+ if frame_key in frame_cache:
+ return frame_cache[frame_key]
+
+ # Cache references for frame table
+ frame_table = thread_data["frameTable"]
+ frame_addresses = frame_table["address"]
+ frame_inline_depths = frame_table["inlineDepth"]
+ frame_categories = frame_table["category"]
+ frame_subcategories = frame_table["subcategory"]
+ frame_funcs = frame_table["func"]
+ frame_native_symbols = frame_table["nativeSymbol"]
+ frame_inner_window_ids = frame_table["innerWindowID"]
+ frame_implementations = frame_table["implementation"]
+ frame_lines = frame_table["line"]
+ frame_columns = frame_table["column"]
+ frame_optimizations = frame_table["optimizations"]
+
+ frame_idx = len(frame_funcs)
+
+ # Determine category based on function - use cached func table reference
+ is_python = thread_data["funcTable"]["isJS"][func_idx]
+ category = CATEGORY_PYTHON if is_python else CATEGORY_NATIVE
+
+ frame_addresses.append(FRAME_ADDRESS_NONE)
+ frame_inline_depths.append(FRAME_INLINE_DEPTH_ROOT)
+ frame_categories.append(category)
+ frame_subcategories.append(DEFAULT_SUBCATEGORY)
+ frame_funcs.append(func_idx)
+ frame_native_symbols.append(None)
+ frame_inner_window_ids.append(None)
+ frame_implementations.append(None)
+ frame_lines.append(lineno if lineno else None)
+ frame_columns.append(None)
+ frame_optimizations.append(None)
+
+ frame_cache[frame_key] = frame_idx
+ return frame_idx
+
+ def export(self, filename):
+ """Export the profile to a Gecko JSON file."""
+ if self.sample_count > 0 and self.last_sample_time > 0:
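+            # Average interval over the whole run, in ms per sample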
+ self.interval = self.last_sample_time / self.sample_count
+
+ profile = self._build_profile()
+
+ with open(filename, "w") as f:
+ json.dump(profile, f, separators=(",", ":"))
+
+ print(f"Gecko profile written to {filename}")
+        print("Open in Firefox Profiler: https://profiler.firefox.com/")
+
+ def _build_profile(self):
+ """Build the complete profile structure in processed format."""
+ # Convert thread data to final format
+ threads = []
+
+ for tid, thread_data in self.threads.items():
+ # Update lengths
+ samples = thread_data["samples"]
+ stack_table = thread_data["stackTable"]
+ frame_table = thread_data["frameTable"]
+ func_table = thread_data["funcTable"]
+ resource_table = thread_data["resourceTable"]
+
+ samples["length"] = len(samples["stack"])
+ stack_table["length"] = len(stack_table["frame"])
+ frame_table["length"] = len(frame_table["func"])
+ func_table["length"] = len(func_table["name"])
+ resource_table["length"] = len(resource_table["name"])
+
+            # Drop internal caches so they are not serialized to JSON
+ del thread_data["_stackCache"]
+ del thread_data["_frameCache"]
+ del thread_data["_funcCache"]
+ del thread_data["_resourceCache"]
+
+ threads.append(thread_data)
+
+ # Main profile structure in processed format
+ profile = {
+ "meta": {
+ "interval": self.interval,
+ "startTime": self.start_time,
+ "abi": platform.machine(),
+ "misc": "Python profiler",
+ "oscpu": platform.machine(),
+ "platform": platform.system(),
+ "processType": PROCESS_TYPE_MAIN,
+ "categories": GECKO_CATEGORIES,
+ "stackwalk": STACKWALK_DISABLED,
+ "toolkit": "",
+ "version": GECKO_FORMAT_VERSION,
+ "preprocessedProfileVersion": GECKO_PREPROCESSED_VERSION,
+ "appBuildID": "",
+ "physicalCPUs": os.cpu_count() or 0,
+ "logicalCPUs": os.cpu_count() or 0,
+ "CPUName": "",
+ "product": "Python",
+ "symbolicated": True,
+ "markerSchema": [],
+ "importedFrom": "Tachyon Sampling Profiler",
+ "extensions": {
+ "id": [],
+ "name": [],
+ "baseURL": [],
+ "length": 0,
+ },
+ },
+ "libs": self.libs,
+ "threads": threads,
+ "pages": [],
+ "shared": {
+ "stringArray": self.global_strings,
+ "sources": {"length": 0, "uuid": [], "filename": []},
+ },
+ }
+
+ return profile
import contextlib
import io
+import json
import marshal
import os
import shutil
CollapsedStackCollector,
FlamegraphCollector,
)
+from profiling.sampling.gecko_collector import GeckoCollector
from test.support.os_helper import unlink
from test.support import force_not_colorized_test_class, SHORT_TIMEOUT
self.assertIn('"value":', content)
self.assertIn('"children":', content)
+ def test_gecko_collector_basic(self):
+ """Test basic GeckoCollector functionality."""
+ collector = GeckoCollector()
+
+ # Test empty state
+ self.assertEqual(len(collector.threads), 0)
+ self.assertEqual(collector.sample_count, 0)
+ self.assertEqual(len(collector.global_strings), 1) # "(root)"
+
+ # Test collecting sample data
+ test_frames = [
+ MockInterpreterInfo(
+ 0,
+ [MockThreadInfo(
+ 1,
+ [("file.py", 10, "func1"), ("file.py", 20, "func2")],
+ )]
+ )
+ ]
+ collector.collect(test_frames)
+
+ # Should have recorded one thread and one sample
+ self.assertEqual(len(collector.threads), 1)
+ self.assertEqual(collector.sample_count, 1)
+ self.assertIn(1, collector.threads)
+
+ profile_data = collector._build_profile()
+
+ # Verify profile structure
+ self.assertIn("meta", profile_data)
+ self.assertIn("threads", profile_data)
+ self.assertIn("shared", profile_data)
+
+ # Check shared string table
+ shared = profile_data["shared"]
+ self.assertIn("stringArray", shared)
+ string_array = shared["stringArray"]
+ self.assertGreater(len(string_array), 0)
+
+ # Should contain our functions in the string array
+ self.assertIn("func1", string_array)
+ self.assertIn("func2", string_array)
+
+ # Check thread data structure
+ threads = profile_data["threads"]
+ self.assertEqual(len(threads), 1)
+ thread_data = threads[0]
+
+ # Verify thread structure
+ self.assertIn("samples", thread_data)
+ self.assertIn("funcTable", thread_data)
+ self.assertIn("frameTable", thread_data)
+ self.assertIn("stackTable", thread_data)
+
+ # Verify samples
+ samples = thread_data["samples"]
+ self.assertEqual(len(samples["stack"]), 1)
+ self.assertEqual(len(samples["time"]), 1)
+ self.assertEqual(samples["length"], 1)
+
+ # Verify function table structure and content
+ func_table = thread_data["funcTable"]
+ self.assertIn("name", func_table)
+ self.assertIn("fileName", func_table)
+ self.assertIn("lineNumber", func_table)
+ self.assertEqual(func_table["length"], 2) # Should have 2 functions
+
+ # Verify actual function content through string array indices
+        func_names = []
+        for idx in func_table["name"]:
+            if isinstance(idx, int) and 0 <= idx < len(string_array):
+                func_names.append(string_array[idx])
+            else:
+                func_names.append(str(idx))
+
+ self.assertIn("func1", func_names, f"func1 not found in {func_names}")
+ self.assertIn("func2", func_names, f"func2 not found in {func_names}")
+
+ # Verify frame table
+ frame_table = thread_data["frameTable"]
+ self.assertEqual(frame_table["length"], 2) # Should have frames for both functions
+ self.assertEqual(len(frame_table["func"]), 2)
+
+ # Verify stack structure
+ stack_table = thread_data["stackTable"]
+ self.assertGreater(stack_table["length"], 0)
+ self.assertGreater(len(stack_table["frame"]), 0)
+
+ def test_gecko_collector_export(self):
+ """Test Gecko profile export functionality."""
+ gecko_out = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
+ self.addCleanup(close_and_unlink, gecko_out)
+
+ collector = GeckoCollector()
+
+ test_frames1 = [
+ MockInterpreterInfo(0, [MockThreadInfo(1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])])
+ ]
+ test_frames2 = [
+ MockInterpreterInfo(0, [MockThreadInfo(1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])])
+ ] # Same stack
+ test_frames3 = [MockInterpreterInfo(0, [MockThreadInfo(1, [("other.py", 5, "other_func")])])]
+
+ collector.collect(test_frames1)
+ collector.collect(test_frames2)
+ collector.collect(test_frames3)
+
+ # Export gecko profile
+ with (captured_stdout(), captured_stderr()):
+ collector.export(gecko_out.name)
+
+ # Verify file was created and contains valid data
+ self.assertTrue(os.path.exists(gecko_out.name))
+ self.assertGreater(os.path.getsize(gecko_out.name), 0)
+
+ # Check file contains valid JSON
+ with open(gecko_out.name, "r") as f:
+ profile_data = json.load(f)
+
+ # Should be valid Gecko profile format
+ self.assertIn("meta", profile_data)
+ self.assertIn("threads", profile_data)
+ self.assertIn("shared", profile_data)
+
+ # Check meta information
+ self.assertIn("categories", profile_data["meta"])
+ self.assertIn("interval", profile_data["meta"])
+
+ # Check shared string table
+ self.assertIn("stringArray", profile_data["shared"])
+ self.assertGreater(len(profile_data["shared"]["stringArray"]), 0)
+
+ # Should contain our functions
+ string_array = profile_data["shared"]["stringArray"]
+ self.assertIn("func1", string_array)
+ self.assertIn("func2", string_array)
+ self.assertIn("other_func", string_array)
+
def test_pstats_collector_export(self):
collector = PstatsCollector(
sample_interval_usec=1000000
def test_valid_output_formats(self):
"""Test that all valid output formats are accepted."""
- valid_formats = ["pstats", "collapsed", "flamegraph"]
+ valid_formats = ["pstats", "collapsed", "flamegraph", "gecko"]
tempdir = tempfile.TemporaryDirectory(delete=False)
self.addCleanup(shutil.rmtree, tempdir.name)