Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/cloudai/_core/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ def report_order(k: str) -> int:
"per_test": 0, # first
"status": 2,
"dse": 3,
"tarball": 4, # last
"summary": 4,
"tarball": 5, # last
}.get(k, 1)

return sorted(self.scenario_reports.items(), key=lambda kv: report_order(kv[0]))
Expand Down
3 changes: 2 additions & 1 deletion src/cloudai/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
from .configurator.grid_search import GridSearchAgent
from .models.workload import CmdArgs, NsysConfiguration, PredictorConfig, TestDefinition
from .parser import Parser
from .reporter import PerTestReporter, StatusReporter, TarballReporter
from .reporter import PerTestReporter, StatusReporter, SummaryReporter, TarballReporter
from .test_parser import TestParser
from .test_scenario_parser import TestScenarioParser

Expand Down Expand Up @@ -96,6 +96,7 @@
"RewardOverrides",
"Runner",
"StatusReporter",
"SummaryReporter",
"System",
"SystemConfigParsingError",
"TarballReporter",
Expand Down
3 changes: 2 additions & 1 deletion src/cloudai/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def register_all():
)
from cloudai.core import Registry
from cloudai.models.scenario import ReportConfig
from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, TarballReporter
from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, SummaryReporter, TarballReporter

# Import systems
from cloudai.systems.kubernetes import KubernetesInstaller, KubernetesRunner, KubernetesSystem
Expand Down Expand Up @@ -328,6 +328,7 @@ def register_all():
)
Registry().add_scenario_report("status", StatusReporter, ReportConfig(enable=True))
Registry().add_scenario_report("dse", DSEReporter, ReportConfig(enable=True))
Registry().add_scenario_report("summary", SummaryReporter, ReportConfig(enable=True))
Registry().add_scenario_report("tarball", TarballReporter, ReportConfig(enable=True))
Registry().add_scenario_report(
"nixl_bench_summary",
Expand Down
134 changes: 132 additions & 2 deletions src/cloudai/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
# limitations under the License.

import contextlib
import json
import logging
import tarfile
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from typing import Any, Optional

import jinja2
import toml
Expand All @@ -31,7 +32,7 @@
from cloudai.report_generator.util import load_system_metadata
from cloudai.util.lazy_imports import lazy

from .core import CommandGenStrategy, Reporter, TestRun, case_name
from .core import METRIC_ERROR, CommandGenStrategy, Reporter, TestRun, case_name
from .models.scenario import TestRunDetails


Expand Down Expand Up @@ -207,6 +208,135 @@ def report_best_dse_config(self):
toml.dump(trd.test_definition.model_dump(), f)


class SummaryReporter(Reporter):
"""Generate a machine-readable scenario summary for automation."""

SUMMARY_FILE_NAME = "cloudai-summary.json"

def generate(self) -> None:
self.load_test_runs()
report_path = self.results_root / self.SUMMARY_FILE_NAME
with report_path.open("w") as f:
json.dump(self.build_summary(), f, indent=2)
f.write("\n")

logging.info("Generated scenario summary at %s", report_path)

def build_summary(self) -> dict[str, Any]:
test_runs = self._test_runs_summary()
return {
"scenario": self.test_scenario.name,
"status": self._scenario_status(test_runs),
"result_dir": self._relative_path(self.results_root),
"reports": self._scenario_artifacts(),
"test_runs": test_runs,
}

def _scenario_status(self, test_runs: list[dict[str, Any]]) -> str:
if not test_runs:
return "unknown"
if all(tr["status"] == "completed" for tr in test_runs):
return "completed"
return "failed"

def _test_runs_summary(self) -> list[dict[str, Any]]:
loaded_by_name: dict[str, list[TestRun]] = {}
for tr in self.trs:
loaded_by_name.setdefault(tr.name, []).append(tr)

summary: list[dict[str, Any]] = []
for test_run in self.test_scenario.test_runs:
loaded_runs = loaded_by_name.get(test_run.name, [])
if test_run.is_dse_job:
summary.append(self._sweep_test_run_summary(test_run, loaded_runs))
else:
summary.extend(self._test_run_summary(tr) for tr in loaded_runs)

return summary

def _sweep_test_run_summary(self, tr: TestRun, sweeps: list[TestRun]) -> dict[str, Any]:
sweep_summaries = [self._test_run_summary(sweep) for sweep in sweeps]
summary = {
"name": tr.name,
"status": self._scenario_status(sweep_summaries),
"output_path": self._relative_path(self.results_root / tr.name),
"artifacts": self._artifacts_excluding(
self.results_root / tr.name, [sweep.output_path for sweep in sweeps]
),
"metrics": {},
"sweeps": sweep_summaries,
}
return summary

def _test_run_summary(self, tr: TestRun) -> dict[str, Any]:
status = tr.test.was_run_successful(tr)
summary = {
"name": case_name(tr),
"status": "completed" if status.is_successful else "failed",
"output_path": self._relative_path(tr.output_path),
"artifacts": self._artifacts(tr.output_path),
"metrics": self._metrics(tr),
}
if status.error_message:
summary["error_message"] = status.error_message
return summary

def _metrics(self, tr: TestRun) -> dict[str, float]:
metrics = {}
for metric in tr.test.agent_metrics:
value = tr.get_metric_value(self.system, metric)
if value is METRIC_ERROR:
continue
metrics[metric] = float(value)

return metrics

def _scenario_artifacts(self) -> list[dict[str, str]]:
if not self.results_root.is_dir():
return []

return [
self._artifact(path)
for path in sorted(self.results_root.iterdir())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this won't work gracefully when we run sweeping. in the sweeping case, the folders structure will be

/scenario foler/
               /case 1/
                      /sweep 1/
                              /some-report.html
                      /sweep 2/
                              /some-report.html

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sweeping is actually quite important in our work. I believe this json summary report should bring insights if those were run

if path.is_file() and path.name != self.SUMMARY_FILE_NAME
]

def _artifacts(self, root: Path) -> list[dict[str, str]]:
if not root.is_dir():
return []

return [self._artifact(path) for path in sorted(root.rglob("*")) if path.is_file()]

def _artifacts_excluding(self, root: Path, excluded_roots: list[Path]) -> list[dict[str, str]]:
if not root.is_dir():
return []

return [
self._artifact(path)
for path in sorted(root.rglob("*"))
if path.is_file() and not any(self._is_relative_to(path, excluded_root) for excluded_root in excluded_roots)
]

def _is_relative_to(self, path: Path, root: Path) -> bool:
try:
path.relative_to(root)
except ValueError:
return False
return True

def _artifact(self, path: Path) -> dict[str, str]:
return {
"path": self._relative_path(path),
"format": path.suffix.removeprefix(".") or "unknown",
}

def _relative_path(self, path: Path) -> str:
try:
return str(path.relative_to(self.results_root))
except ValueError:
return str(path)


class TarballReporter(Reporter):
"""Creates tarballs of results for failed test runs."""

Expand Down
5 changes: 4 additions & 1 deletion tests/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


from cloudai.core import Registry
from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, TarballReporter
from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, SummaryReporter, TarballReporter
from cloudai.systems.kubernetes import KubernetesInstaller, KubernetesSystem
from cloudai.systems.lsf import LSFInstaller, LSFSystem
from cloudai.systems.runai import RunAISystem
Expand Down Expand Up @@ -278,6 +278,7 @@ def test_scenario_reports():
"moe_benchmark_throughput",
"status",
"dse",
"summary",
"tarball",
"nixl_bench_summary",
"nixl_ep_comparison",
Expand All @@ -291,6 +292,7 @@ def test_scenario_reports():
MoEBenchmarkThroughputReporter,
StatusReporter,
DSEReporter,
SummaryReporter,
TarballReporter,
NIXLBenchComparisonReport,
NixlEPComparisonReport,
Expand All @@ -308,6 +310,7 @@ def test_report_configs():
"moe_benchmark_throughput",
"status",
"dse",
"summary",
"tarball",
"nixl_bench_summary",
"nixl_ep_comparison",
Expand Down
114 changes: 111 additions & 3 deletions tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import copy
import csv
import json
import tarfile
from dataclasses import asdict
from pathlib import Path
Expand All @@ -29,7 +30,7 @@
from cloudai.core import CommandGenStrategy, Registry, Reporter, System
from cloudai.models.scenario import ReportConfig, TestRunDetails
from cloudai.report_generator.dse_report import build_dse_summaries
from cloudai.reporter import DSEReporter, PerTestReporter, ReportItem, StatusReporter, TarballReporter
from cloudai.reporter import DSEReporter, PerTestReporter, ReportItem, StatusReporter, SummaryReporter, TarballReporter
from cloudai.systems.slurm.slurm_metadata import (
MetadataCUDA,
MetadataMPI,
Expand Down Expand Up @@ -339,11 +340,118 @@ def test_metadata_for_single_sbatch(self, slurm_system: SlurmSystem, slurm_metad
def test_report_order() -> None:
reports = Registry().ordered_scenario_reports()
assert reports[0][0] == "per_test"
assert reports[-3][0] == "status"
assert reports[-2][0] == "dse"
assert reports[-4][0] == "status"
assert reports[-3][0] == "dse"
assert reports[-2][0] == "summary"
assert reports[-1][0] == "tarball"


def test_summary_reporter_writes_machine_readable_summary(
slurm_system: SlurmSystem,
benchmark_tr: TestRun,
) -> None:
report_path = slurm_system.output_path / "test_scenario.html"
report_path.write_text("<html></html>")

for iteration in range(benchmark_tr.iterations):
output_path = slurm_system.output_path / benchmark_tr.name / str(iteration)
(output_path / "stdout.txt").write_text("# Out of bounds values# Avg bus bandwidth")
(slurm_system.output_path / benchmark_tr.name / "0" / "cloudai_nccl_test_csv_report.csv").write_text(
"size,bw\n1,2\n"
)

sweep_tr = TestRun(
name="sweep",
test=NCCLTestDefinition(
name="nccl",
description="NCCL sweep",
test_template_name="NcclTest",
cmd_args=NCCLCmdArgs(docker_image_url="fake://url/nccl"),
extra_env_vars={"VAR1": ["value1", "value2"]},
agent_steps=2,
),
num_nodes=1,
nodes=["node1"],
)
sweep_iteration = slurm_system.output_path / sweep_tr.name / "0"
sweep_iteration.mkdir(parents=True)
(sweep_iteration / "trajectory.csv").write_text("step,action,reward,observation\n")
for step in range(sweep_tr.test.agent_steps):
step_path = sweep_iteration / str(step)
step_path.mkdir()
(step_path / "stdout.txt").write_text("# Out of bounds values# Avg bus bandwidth")
(step_path / "some-report.html").write_text("<html></html>")

scenario = TestScenario(name="test_scenario", test_runs=[benchmark_tr, sweep_tr])
reporter = SummaryReporter(slurm_system, scenario, slurm_system.output_path, ReportConfig())
reporter.generate()

summary_path = slurm_system.output_path / SummaryReporter.SUMMARY_FILE_NAME
summary = json.loads(summary_path.read_text())

assert summary == {
"scenario": "test_scenario",
"status": "completed",
"result_dir": ".",
"reports": [{"path": "test_scenario.html", "format": "html"}],
"test_runs": [
{
"name": "benchmark",
"status": "completed",
"output_path": "benchmark/0",
"artifacts": [
{"path": "benchmark/0/cloudai_nccl_test_csv_report.csv", "format": "csv"},
{"path": "benchmark/0/stdout.txt", "format": "txt"},
],
"metrics": {},
},
{
"name": "benchmark iter=1",
"status": "completed",
"output_path": "benchmark/1",
"artifacts": [{"path": "benchmark/1/stdout.txt", "format": "txt"}],
"metrics": {},
},
{
"name": "benchmark iter=2",
"status": "completed",
"output_path": "benchmark/2",
"artifacts": [{"path": "benchmark/2/stdout.txt", "format": "txt"}],
"metrics": {},
},
{
"name": "sweep",
"status": "completed",
"output_path": "sweep",
"artifacts": [{"path": "sweep/0/trajectory.csv", "format": "csv"}],
"metrics": {},
"sweeps": [
{
"name": "sweep",
"status": "completed",
"output_path": "sweep/0/0",
"artifacts": [
{"path": "sweep/0/0/some-report.html", "format": "html"},
{"path": "sweep/0/0/stdout.txt", "format": "txt"},
],
"metrics": {},
},
{
"name": "sweep step=1",
"status": "completed",
"output_path": "sweep/0/1",
"artifacts": [
{"path": "sweep/0/1/some-report.html", "format": "html"},
{"path": "sweep/0/1/stdout.txt", "format": "txt"},
],
"metrics": {},
},
],
},
],
}


def _write_slurm_job(step_dir: Path, elapsed_time_sec: int) -> None:
metadata = SlurmJobMetadata(
job_id=12345,
Expand Down
Loading