Commit
[ch] Set up ingestion for benchmarks tables and aggregated_test_metrics (#5720)

S3 side tested by running locally and checking that the data got inserted.
Dynamo side only checked by running `python aws/lambda/clickhouse-replicator-dynamo/test_lambda_function.py` to make sure other ingestions didn't break.

Also a small fix for the external_contribution_stats error handler: its error-logging INSERT was missing the VALUES keyword.
clee2000 authored Sep 27, 2024
1 parent 84f2e16 commit 2f82232
Showing 2 changed files with 110 additions and 1 deletion.
2 changes: 2 additions & 0 deletions aws/lambda/clickhouse-replicator-dynamo/lambda_function.py
@@ -27,6 +27,8 @@
"torchci-pull-request-review": "default.pull_request_review",
"torchci-pull-request-review-comment": "default.pull_request_review_comment",
"torchci-metrics-ci-wait-time": "misc.metrics_ci_wait_time",
"torchci-dynamo-perf-stats": "benchmark.inductor_torch_dynamo_perf_stats",
"torchci-oss-ci-benchmark": "benchmark.oss_ci_benchmark_v2",
}


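For context, the replicator presumably resolves each DynamoDB stream record's destination through this mapping. The handler is not part of this diff, so the sketch below (including the `eventSourceARN` parsing and the `resolve_table` name) is an illustrative assumption, not the lambda's actual code:

```python
# Hypothetical sketch: map a DynamoDB stream record to its ClickHouse table
# using the mapping above. Stream ARNs follow the standard format
# arn:aws:dynamodb:REGION:ACCOUNT:table/NAME/stream/TIMESTAMP.
def resolve_table(record: dict, table_map: dict) -> str | None:
    source_table = record["eventSourceARN"].split(":table/")[1].split("/")[0]
    return table_map.get(source_table)  # e.g. "benchmark.oss_ci_benchmark_v2"
```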
109 changes: 108 additions & 1 deletion aws/lambda/clickhouse-replicator-s3/lambda_function.py
@@ -351,10 +351,111 @@ def get_insert_query(compression):
        get_clickhouse_client().query(get_insert_query("gzip"))
    except Exception as e:
        get_clickhouse_client().query(
-            f"insert into errors.gen_errors ('{table}', '{bucket}', '{key}', '{json.dumps(str(e))}')"
+            f"insert into errors.gen_errors VALUES ('{table}', '{bucket}', '{key}', '{json.dumps(str(e))}')"
        )


def general_adapter(table, bucket, key, schema, compression, format) -> None:
    url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"

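    # Build an INSERT ... SELECT that has ClickHouse read the object directly
    # from S3 via the s3() table function, assuming an IAM role for access and
    # tagging each row with its source (bucket, key) in the _meta column.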
    def get_insert_query(compression):
        return f"""
        insert into {table}
        select *, ('{bucket}', '{key}') as _meta
        from s3('{url}', '{format}', '{schema}', '{compression}',
            extra_credentials(
                role_arn = 'arn:aws:iam::308535385114:role/clickhouse_role'
            )
        )
        """

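    # If the insert fails, record the error alongside the source object in
    # errors.gen_errors so the failed ingestion can be investigated.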
    try:
        get_clickhouse_client().query(get_insert_query(compression))
    except Exception as e:
        get_clickhouse_client().query(
            f"insert into errors.gen_errors values ('{table}', '{bucket}', '{key}', '{json.dumps(str(e))}')"
        )


def external_aggregated_test_metrics_adapter(table, bucket, key) -> None:
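    # Aggregated test metrics arrive as gzip-compressed JSONEachRow files.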
    schema = """
    `avg_duration_in_second` Int64,
    `avg_skipped` Int64,
    `avg_tests` Int64,
    `base_name` String,
    `date` DateTime64(3),
    `job_name` String,
    `max_errors` Int64,
    `max_failures` Int64,
    `occurences` Int64,
    `oncalls` Array(String),
    `sum_duration_in_second` Int64,
    `sum_skipped` Int64,
    `sum_tests` Int64,
    `test_class` String,
    `test_config` String,
    `test_file` String,
    `workflow_id` Int64,
    `workflow_name` String,
    `workflow_run_attempt` Int64
    """
    general_adapter(table, bucket, key, schema, "gzip", "JSONEachRow")


def torchao_perf_stats_adapter(table, bucket, key) -> None:
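    # The torchao perf stats files are uncompressed CSV; every column is
    # ingested as String.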
    schema = """
    `CachingAutotuner.benchmark_all_configs` String,
    `GraphLowering.compile_to_module` String,
    `GraphLowering.run` String,
    `OutputGraph.call_user_compiler` String,
    `Scheduler.__init__` String,
    `Scheduler.codegen` String,
    `WrapperCodeGen.generate` String,
    `_compile.<locals>.compile_inner` String,
    `_compile.compile_inner` String,
    `abs_latency` String,
    `accuracy` String,
    `autograd_captures` String,
    `autograd_compiles` String,
    `batch_size` String,
    `calls_captured` String,
    `compilation_latency` String,
    `compile_fx.<locals>.bw_compiler` String,
    `compile_fx.<locals>.fw_compiler_base` String,
    `compile_fx_inner` String,
    `compression_ratio` String,
    `create_aot_dispatcher_function` String,
    `cudagraph_skips` String,
    `dev` String,
    `dynamo_peak_mem` String,
    `eager_peak_mem` String,
    `filename` String,
    `graph_breaks` String,
    `head_branch` String,
    `head_repo` String,
    `head_sha` String,
    `job_id` String,
    `name` String,
    `run_attempt` String,
    `runner` String,
    `speedup` String,
    `test_name` String,
    `unique_graph_breaks` String,
    `unique_graphs` String,
    `workflow_id` String
    """
    general_adapter(table, bucket, key, schema, "none", "CSV")


def torchbench_userbenchmark_adapter(table, bucket, key):
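    # Userbenchmark results are uncompressed JSONEachRow with a minimal schema.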
    schema = """
    `environ` String,
    `metrics` String,
    `name` String
    """
    general_adapter(table, bucket, key, schema, "none", "JSONEachRow")


SUPPORTED_PATHS = {
    "merges": "default.merges",
    "queue_times_historical": "default.queue_times_historical",
@@ -364,6 +465,9 @@ def get_insert_query(compression):
"failed_test_runs": "default.failed_test_runs",
"rerun_disabled_tests": "default.rerun_disabled_tests",
"external_contribution_counts": "misc.external_contribution_stats",
"test_data_aggregates": "misc.aggregated_test_metrics",
"torchbench-csv/torchao": "benchmark.inductor_torchao_perf_stats",
"torchbench-userbenchmark": "benchmark.torchbench_userbenchmark",
}

OBJECT_CONVERTER = {
@@ -375,6 +479,9 @@ def get_insert_query(compression):
"default.rerun_disabled_tests": rerun_disabled_tests_adapter,
"default.queue_times_historical": queue_times_historical_adapter,
"misc.external_contribution_stats": external_contribution_stats_adapter,
"misc.aggregated_test_metrics": external_aggregated_test_metrics_adapter,
"benchmark.inductor_torchao_perf_stats": torchao_perf_stats_adapter,
"benchmark.torchbench_userbenchmark": torchbench_userbenchmark_adapter,
}


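For context, these two dicts presumably work as a pair: the object key's leading path segment selects the destination table via SUPPORTED_PATHS, and the table selects its adapter via OBJECT_CONVERTER. The handler itself is unchanged by this commit, so the prefix matching below (and the `handle_object` name) is a sketch under that assumption:

```python
# Hypothetical routing sketch for the S3 replicator: look up the ClickHouse
# table from the key prefix, then hand off to the matching adapter.
def handle_object(bucket: str, key: str) -> None:
    for prefix, table in SUPPORTED_PATHS.items():
        if key.startswith(prefix):
            OBJECT_CONVERTER[table](table, bucket, key)
            return
    # Objects outside the supported prefixes are ignored.
```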
