Skip to content

Commit

Permalink
Add tron spark monitoring config metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
chi-yelp committed Sep 18, 2024
1 parent 710723c commit f947381
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
23 changes: 23 additions & 0 deletions paasta_tools/monitoring_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,29 @@ def send_replication_event(
)


def emit_spark_tron_monitoring_metrics(
    job_name: str,
    action_name: str,
    paasta_cluster: str,
    paasta_service: str,
    paasta_instance: str,
    team: str,
    page: bool,
) -> None:
    """Tick the ``spark.tron.monitoring_config`` counter for one Tron action.

    All seven arguments are passed through unchanged as the counter's
    dimensions, keyed by their parameter names.

    :param job_name: name of the Tron job the action belongs to
    :param action_name: name of the action within the job
    :param paasta_cluster: value reported under the ``paasta_cluster`` dimension
    :param paasta_service: value reported under the ``paasta_service`` dimension
    :param paasta_instance: value reported under the ``paasta_instance`` dimension
    :param team: value reported under the ``team`` dimension
    :param page: value reported under the ``page`` dimension
    """
    dimensions = dict(
        job_name=job_name,
        action_name=action_name,
        paasta_service=paasta_service,
        paasta_cluster=paasta_cluster,
        paasta_instance=paasta_instance,
        team=team,
        page=page,
    )
    counter = yelp_meteorite.create_counter(
        "spark.tron.monitoring_config", dimensions
    )
    counter.count()


def emit_replication_metrics(
replication_infos: Mapping[str, Mapping[str, Mapping[str, int]]],
instance_config: LongRunningServiceConfig,
Expand Down
22 changes: 22 additions & 0 deletions paasta_tools/tron_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,28 @@ def create_complete_config(
return yaml.dump(preproccessed_config, Dumper=Dumper, default_flow_style=False)


def emit_spark_monitoring_metrics(preproccessed_config: Dict[str, Any]) -> None:
    """Report the monitoring settings of every Spark action in the config.

    Walks ``preproccessed_config["jobs"]`` and each job's ``"actions"``
    mapping; for every action that ``_is_spark_action`` accepts, forwards the
    job/action names, the ``PAASTA_CLUSTER`` / ``PAASTA_SERVICE`` /
    ``PAASTA_INSTANCE`` entries of its ``env`` mapping, and the ``team`` /
    ``page`` entries of its ``monitoring`` mapping to
    ``monitoring_tools.emit_spark_tron_monitoring_metrics``.

    Missing ``env`` / ``monitoring`` sections, or missing keys inside them,
    are forwarded as ``None``.

    :param preproccessed_config: mapping with a ``"jobs"`` key; each job must
        carry an ``"actions"`` mapping (a ``KeyError`` is raised otherwise)
    """
    for job_name, job_config in preproccessed_config["jobs"].items():
        for action_name, action_config in job_config["actions"].items():
            # Only Spark actions are reported; everything else is ignored.
            if not _is_spark_action(action_config):
                continue

            env = action_config.get("env", {})
            monitoring = action_config.get("monitoring", {})
            monitoring_tools.emit_spark_tron_monitoring_metrics(
                job_name=job_name,
                action_name=action_name,
                paasta_cluster=env.get("PAASTA_CLUSTER"),
                paasta_service=env.get("PAASTA_SERVICE"),
                paasta_instance=env.get("PAASTA_INSTANCE"),
                team=monitoring.get("team"),
                page=monitoring.get("page"),
            )


def _is_spark_action(action_config: Dict[str, Any]) -> bool:
return action_config.get(
"executor"
) == "spark" or "paasta spark-run" in action_config.get("command", "")


def validate_complete_config(
service: str, cluster: str, soa_dir: str = DEFAULT_SOA_DIR
) -> List[str]:
Expand Down

0 comments on commit f947381

Please sign in to comment.