Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 0 additions & 63 deletions pathwaysutils/experimental/profiling.py

This file was deleted.

53 changes: 36 additions & 17 deletions pathwaysutils/profiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import asyncio
from collections.abc import Mapping
import dataclasses
import datetime
import json
import logging
import os
Expand Down Expand Up @@ -112,8 +113,8 @@ def _is_default_profile_options(
and profiler_options.python_tracer_level
== default_options.python_tracer_level
and profiler_options.duration_ms == default_options.duration_ms
and not getattr(profiler_options, "advanced_configuration", None)
and not getattr(profiler_options, "session_id", None)
and not profiler_options.advanced_configuration
and not profiler_options.session_id
)


Expand All @@ -132,9 +133,9 @@ def _create_profile_request(
return profile_request

advanced_config = None
if getattr(profiler_options, "advanced_configuration", None):
if profiler_options.advanced_configuration:
advanced_config = {}
for k, v in getattr(profiler_options, "advanced_configuration").items():
for k, v in profiler_options.advanced_configuration.items():
# Convert python dict to tensorflow.ProfileOptions.AdvancedConfigValue
# json-compatible dict
if isinstance(v, bool):
Expand Down Expand Up @@ -168,7 +169,7 @@ def _create_profile_request(
if pw_trace_opts:
xprof_options["pwTraceOptions"] = pw_trace_opts

if getattr(profiler_options, "session_id", None):
if profiler_options.session_id:
xprof_options["traceSessionName"] = profiler_options.session_id

profile_request["xprofTraceOptions"] = xprof_options
Expand Down Expand Up @@ -270,26 +271,44 @@ def start_trace(
"features for Pathways on Cloud and may not be fully supported."
)

if jax.version.__version_info__ < (0, 9, 2) and profiler_options is not None:
_logger.warning(
"ProfileOptions are not supported until JAX 0.9.2 and will be omitted. "
"Some options can be specified via command line flags."
)
profiler_options = None
if jax.version.__version_info__ < (0, 9, 2):
if profiler_options is not None:
_logger.warning(
"ProfileOptions are not supported until JAX 0.9.2 and will be omitted. "
"Some options can be specified via command line flags."
)
profiler_options = None
else:
if profiler_options is None:
profiler_options = jax.profiler.ProfileOptions()
if not profiler_options.session_id:
profiler_options.session_id = datetime.datetime.now().strftime(
"%Y_%m_%d_%H_%M_%S"
)

profile_request = _create_profile_request(
log_dir, profiler_options, max_num_hosts=max_num_hosts
log_dir,
profiler_options,
max_num_hosts=max_num_hosts,
)

_logger.debug("Profile request: %s", profile_request)

_start_pathways_trace_from_profile_request(profile_request)

_original_start_trace(
log_dir=log_dir,
create_perfetto_link=create_perfetto_link,
create_perfetto_trace=create_perfetto_trace,
)
if jax.version.__version_info__ >= (0, 9, 2):
_original_start_trace(
log_dir=log_dir,
create_perfetto_link=create_perfetto_link,
create_perfetto_trace=create_perfetto_trace,
profiler_options=profiler_options,
)
else:
_original_start_trace(
log_dir=log_dir,
create_perfetto_link=create_perfetto_link,
create_perfetto_trace=create_perfetto_trace,
)


def stop_trace() -> None:
Expand Down
79 changes: 70 additions & 9 deletions pathwaysutils/test/profiling_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ def setUp(self):
self.mock_original_stop_trace = self.enter_context(
mock.patch.object(profiling, "_original_stop_trace", autospec=True)
)
self.mock_datetime = self.enter_context(
mock.patch.object(profiling.datetime, "datetime", autospec=True)
)
self.mock_datetime.now.return_value.strftime.return_value = (
"2026_06_04_05_29_33"
)


@parameterized.parameters(8000, 1234)
def test_collect_profile_port(self, port):
Expand Down Expand Up @@ -233,15 +240,26 @@ def test_start_trace_success(self):
"profileRequest": {
"traceLocation": "gs://test_bucket/test_dir",
"maxNumHosts": 1,
"xprofTraceOptions": {
"traceDirectory": "gs://test_bucket/test_dir",
"pwTraceOptions": {
"enablePythonTracer": True,
},
"traceSessionName": "2026_06_04_05_29_33",
},
}
})
)
self.mock_plugin_executable_cls.return_value.call.assert_called_once()
self.mock_original_start_trace.assert_called_once_with(
log_dir="gs://test_bucket/test_dir",
create_perfetto_link=False,
create_perfetto_trace=False,
)
self.mock_original_start_trace.assert_called_once()
call_args = self.mock_original_start_trace.call_args[1]
self.assertEqual(call_args["log_dir"], "gs://test_bucket/test_dir")
self.assertFalse(call_args["create_perfetto_link"])
self.assertFalse(call_args["create_perfetto_trace"])
if jax.version.__version_info__ >= (0, 9, 2):
self.assertEqual(
call_args["profiler_options"].session_id, "2026_06_04_05_29_33"
)
self.assertIsNotNone(profiling._profile_state.executable)

def test_start_trace_with_max_num_hosts(self):
Expand All @@ -253,15 +271,58 @@ def test_start_trace_with_max_num_hosts(self):
"profileRequest": {
"traceLocation": "gs://test_bucket/test_dir",
"maxNumHosts": 10,
"xprofTraceOptions": {
"traceDirectory": "gs://test_bucket/test_dir",
"pwTraceOptions": {
"enablePythonTracer": True,
},
"traceSessionName": "2026_06_04_05_29_33",
},
}
})
)
self.mock_plugin_executable_cls.return_value.call.assert_called_once()
self.mock_original_start_trace.assert_called_once_with(
log_dir="gs://test_bucket/test_dir",
create_perfetto_link=False,
create_perfetto_trace=False,
self.mock_original_start_trace.assert_called_once()
call_args = self.mock_original_start_trace.call_args[1]
self.assertEqual(call_args["log_dir"], "gs://test_bucket/test_dir")
self.assertFalse(call_args["create_perfetto_link"])
self.assertFalse(call_args["create_perfetto_trace"])
if jax.version.__version_info__ >= (0, 9, 2):
self.assertEqual(
call_args["profiler_options"].session_id, "2026_06_04_05_29_33"
)

@absltest.skipIf(
jax.version.__version_info__ < (0, 9, 2),
"ProfileOptions requires JAX 0.9.2 or newer",
)
def test_start_trace_with_session_id_in_options(self):
options = jax.profiler.ProfileOptions()
options.session_id = "options_session"
profiling.start_trace("gs://test_bucket/test_dir", profiler_options=options)

self.mock_plugin_executable_cls.assert_called_once_with(
json.dumps({
"profileRequest": {
"traceLocation": "gs://test_bucket/test_dir",
"maxNumHosts": 1,
"xprofTraceOptions": {
"traceDirectory": "gs://test_bucket/test_dir",
"pwTraceOptions": {
"enablePythonTracer": True,
},
"traceSessionName": "options_session",
},
}
})
)
self.assertEqual(options.session_id, "options_session")
self.mock_original_start_trace.assert_called_once()
call_args = self.mock_original_start_trace.call_args[1]
self.assertEqual(call_args["log_dir"], "gs://test_bucket/test_dir")
self.assertFalse(call_args["create_perfetto_link"])
self.assertFalse(call_args["create_perfetto_trace"])
self.assertEqual(call_args["profiler_options"].session_id, "options_session")

def test_start_trace_no_toy_computation_second_time(self):
profiling.start_trace("gs://test_bucket/test_dir")
Expand Down
Loading