diff --git a/aws/lambda/clickhouse-replicator-dynamo/lambda_function.py b/aws/lambda/clickhouse-replicator-dynamo/lambda_function.py
index 2c89cbd0b4..d15be2456c 100644
--- a/aws/lambda/clickhouse-replicator-dynamo/lambda_function.py
+++ b/aws/lambda/clickhouse-replicator-dynamo/lambda_function.py
@@ -27,6 +27,8 @@
     "torchci-pull-request-review": "default.pull_request_review",
     "torchci-pull-request-review-comment": "default.pull_request_review_comment",
     "torchci-metrics-ci-wait-time": "misc.metrics_ci_wait_time",
+    "torchci-dynamo-perf-stats": "benchmark.inductor_torch_dynamo_perf_stats",
+    "torchci-oss-ci-benchmark": "benchmark.oss_ci_benchmark_v2",
 }
 
 
diff --git a/aws/lambda/clickhouse-replicator-s3/lambda_function.py b/aws/lambda/clickhouse-replicator-s3/lambda_function.py
index 79228274a8..bb7d76172d 100644
--- a/aws/lambda/clickhouse-replicator-s3/lambda_function.py
+++ b/aws/lambda/clickhouse-replicator-s3/lambda_function.py
@@ -351,10 +351,111 @@ def get_insert_query(compression):
         get_clickhouse_client().query(get_insert_query("gzip"))
     except Exception as e:
         get_clickhouse_client().query(
-            f"insert into errors.gen_errors ('{table}', '{bucket}', '{key}', '{json.dumps(str(e))}')"
+            f"insert into errors.gen_errors VALUES ('{table}', '{bucket}', '{key}', '{json.dumps(str(e))}')"
         )
 
 
+def general_adapter(table, bucket, key, schema, compression, format) -> None:
+    url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"
+
+    def get_insert_query(compression):
+        return f"""
+        insert into {table}
+        select *, ('{bucket}', '{key}') as _meta
+        from s3('{url}', '{format}', '{schema}', '{compression}',
+            extra_credentials(
+                role_arn = 'arn:aws:iam::308535385114:role/clickhouse_role'
+            )
+        )
+        """
+
+    try:
+        get_clickhouse_client().query(get_insert_query(compression))
+    except Exception as e:
+        get_clickhouse_client().query(
+            f"insert into errors.gen_errors values ('{table}', '{bucket}', '{key}', '{json.dumps(str(e))}')"
+        )
+
+
+def external_aggregated_test_metrics_adapter(table, bucket, key) -> None:
+    schema = """
+    `avg_duration_in_second` Int64,
+    `avg_skipped` Int64,
+    `avg_tests` Int64,
+    `base_name` String,
+    `date` DateTime64(3),
+    `job_name` String,
+    `max_errors` Int64,
+    `max_failures` Int64,
+    `occurences` Int64,
+    `oncalls` Array(String),
+    `sum_duration_in_second` Int64,
+    `sum_skipped` Int64,
+    `sum_tests` Int64,
+    `test_class` String,
+    `test_config` String,
+    `test_file` String,
+    `workflow_id` Int64,
+    `workflow_name` String,
+    `workflow_run_attempt` Int64
+    """
+    general_adapter(table, bucket, key, schema, "gzip", "JSONEachRow")
+
+
+def torchao_perf_stats_adapter(table, bucket, key) -> None:
+    schema = """
+    `CachingAutotuner.benchmark_all_configs` String,
+    `GraphLowering.compile_to_module` String,
+    `GraphLowering.run` String,
+    `OutputGraph.call_user_compiler` String,
+    `Scheduler.__init__` String,
+    `Scheduler.codegen` String,
+    `WrapperCodeGen.generate` String,
+    `_compile..compile_inner` String,
+    `_compile.compile_inner` String,
+    `abs_latency` String,
+    `accuracy` String,
+    `autograd_captures` String,
+    `autograd_compiles` String,
+    `batch_size` String,
+    `calls_captured` String,
+    `compilation_latency` String,
+    `compile_fx..bw_compiler` String,
+    `compile_fx..fw_compiler_base` String,
+    `compile_fx_inner` String,
+    `compression_ratio` String,
+    `create_aot_dispatcher_function` String,
+    `cudagraph_skips` String,
+    `dev` String,
+    `dynamo_peak_mem` String,
+    `eager_peak_mem` String,
+    `filename` String,
+    `graph_breaks` String,
+    `head_branch` String,
+    `head_repo` String,
+    `head_sha` String,
+    `job_id` String,
+    `name` String,
+    `run_attempt` String,
+    `runner` String,
+    `speedup` String,
+    `test_name` String,
+    `unique_graph_breaks` String,
+    `unique_graphs` String,
+    `workflow_id` String
+    """
+    general_adapter(table, bucket, key, schema, "none", "CSV")
+
+
+def torchbench_userbenchmark_adapter(table, bucket, key):
+    schema = """
+    `environ` String,
+    `metrics` String,
+    `name` String
+    """
+    general_adapter(table, bucket, key, schema, "none", "JSONEachRow")
+
+
 SUPPORTED_PATHS = {
     "merges": "default.merges",
     "queue_times_historical": "default.queue_times_historical",
@@ -364,6 +465,9 @@ def get_insert_query(compression):
     "failed_test_runs": "default.failed_test_runs",
     "rerun_disabled_tests": "default.rerun_disabled_tests",
     "external_contribution_counts": "misc.external_contribution_stats",
+    "test_data_aggregates": "misc.aggregated_test_metrics",
+    "torchbench-csv/torchao": "benchmark.inductor_torchao_perf_stats",
+    "torchbench-userbenchmark": "benchmark.torchbench_userbenchmark",
 }
 
 OBJECT_CONVERTER = {
@@ -375,6 +479,9 @@ def get_insert_query(compression):
     "default.rerun_disabled_tests": rerun_disabled_tests_adapter,
     "default.queue_times_historical": queue_times_historical_adapter,
     "misc.external_contribution_stats": external_contribution_stats_adapter,
+    "misc.aggregated_test_metrics": external_aggregated_test_metrics_adapter,
+    "benchmark.inductor_torchao_perf_stats": torchao_perf_stats_adapter,
+    "benchmark.torchbench_userbenchmark": torchbench_userbenchmark_adapter,
 }
 
 