diff --git a/paasta_tools/monitoring_tools.py b/paasta_tools/monitoring_tools.py index 0e3d447032..24e2a51d93 100644 --- a/paasta_tools/monitoring_tools.py +++ b/paasta_tools/monitoring_tools.py @@ -366,6 +366,29 @@ def send_replication_event( ) +def emit_spark_tron_monitoring_metrics( + job_name: str, + action_name: str, + paasta_cluster: str, + paasta_service: str, + paasta_instance: str, + team: str, + page: bool, +) -> None: + meteorite_dims = { + "job_name": job_name, + "action_name": action_name, + "paasta_service": paasta_service, + "paasta_cluster": paasta_cluster, + "paasta_instance": paasta_instance, + "team": team, + "page": page, + } + yelp_meteorite.create_counter( + "spark.tron.monitoring_config", meteorite_dims + ).count() + + def emit_replication_metrics( replication_infos: Mapping[str, Mapping[str, Mapping[str, int]]], instance_config: LongRunningServiceConfig, diff --git a/paasta_tools/tron_tools.py b/paasta_tools/tron_tools.py index 13163a345a..3b1986b786 100644 --- a/paasta_tools/tron_tools.py +++ b/paasta_tools/tron_tools.py @@ -1258,9 +1258,33 @@ def create_complete_config( ) for job_config in job_configs } + if not dry_run: + emit_spark_monitoring_metrics(preproccessed_config) return yaml.dump(preproccessed_config, Dumper=Dumper, default_flow_style=False) +def emit_spark_monitoring_metrics(preproccessed_config: Dict[str, Any]) -> None: + """Emit the monitoring configs for Spark actions in the preprocessed config.""" + for job_name, job_config in preproccessed_config["jobs"].items(): + for action_name, action_config in job_config["actions"].items(): + if _is_spark_action(action_config): + monitoring_tools.emit_spark_tron_monitoring_metrics( + job_name=job_name, + action_name=action_name, + paasta_cluster=action_config.get("env", {}).get("PAASTA_CLUSTER"), + paasta_service=action_config.get("env", {}).get("PAASTA_SERVICE"), + paasta_instance=action_config.get("env", {}).get("PAASTA_INSTANCE"), + team=action_config.get("monitoring", {}).get("team"), + page=action_config.get("monitoring", {}).get("page"), + ) + + +def _is_spark_action(action_config: Dict[str, Any]) -> bool: + return action_config.get( + "executor" + ) == "spark" or "paasta spark-run" in action_config.get("command", "") + + def validate_complete_config( service: str, cluster: str, soa_dir: str = DEFAULT_SOA_DIR ) -> List[str]: