diff --git a/providers/src/airflow/providers/google/cloud/links/bigquery.py b/providers/src/airflow/providers/google/cloud/links/bigquery.py
index 3998a1c1f28be..8b3e95a29dea3 100644
--- a/providers/src/airflow/providers/google/cloud/links/bigquery.py
+++ b/providers/src/airflow/providers/google/cloud/links/bigquery.py
@@ -35,6 +35,9 @@
     BIGQUERY_BASE_LINK
     + "?referrer=search&project={project_id}&d={dataset_id}&p={project_id}&page=table&t={table_id}"
 )
+BIGQUERY_JOB_DETAIL_LINK = (
+    BIGQUERY_BASE_LINK + "?project={project_id}&ws=!1m5!1m4!1m3!1s{project_id}!2s{job_id}!3s{location}"
+)
 
 
 class BigQueryDatasetLink(BaseGoogleLink):
@@ -78,3 +81,25 @@ def persist(
             key=BigQueryTableLink.key,
             value={"dataset_id": dataset_id, "project_id": project_id, "table_id": table_id},
         )
+
+
+class BigQueryJobDetailLink(BaseGoogleLink):
+    """Helper class for constructing BigQuery Job Detail Link."""
+
+    name = "BigQuery Job Detail"
+    key = "bigquery_job_detail"
+    format_str = BIGQUERY_JOB_DETAIL_LINK
+
+    @staticmethod
+    def persist(
+        context: Context,
+        task_instance: BaseOperator,
+        project_id: str,
+        location: str,
+        job_id: str,
+    ):
+        task_instance.xcom_push(
+            context,
+            key=BigQueryJobDetailLink.key,
+            value={"project_id": project_id, "location": location, "job_id": job_id},
+        )
diff --git a/providers/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/src/airflow/providers/google/cloud/operators/bigquery.py
index 044e2bf6ee808..43cf1a8f771ad 100644
--- a/providers/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -43,7 +43,11 @@
 )
 from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
 from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url
-from airflow.providers.google.cloud.links.bigquery import BigQueryDatasetLink, BigQueryTableLink
+from airflow.providers.google.cloud.links.bigquery import (
+    BigQueryDatasetLink,
+    BigQueryJobDetailLink,
+    BigQueryTableLink,
+)
 from airflow.providers.google.cloud.openlineage.mixins import _BigQueryOpenLineageMixin
 from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
 from airflow.providers.google.cloud.triggers.bigquery import (
@@ -2554,7 +2558,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
     )
     template_fields_renderers = {"configuration": "json", "configuration.query.query": "sql"}
     ui_color = BigQueryUIColors.QUERY.value
-    operator_extra_links = (BigQueryTableLink(),)
+    operator_extra_links = (BigQueryTableLink(), BigQueryJobDetailLink())
 
     def __init__(
         self,
@@ -2726,6 +2730,15 @@ def execute(self, context: Any):
             )
             context["ti"].xcom_push(key="job_id_path", value=job_id_path)
 
+        persist_kwargs = {
+            "context": context,
+            "task_instance": self,
+            "project_id": self.project_id,
+            "location": self.location,
+            "job_id": self.job_id,
+        }
+        BigQueryJobDetailLink.persist(**persist_kwargs)
+
         # Wait for the job to complete
         if not self.deferrable:
             job.result(timeout=self.result_timeout, retry=self.result_retry)
diff --git a/providers/src/airflow/providers/google/provider.yaml b/providers/src/airflow/providers/google/provider.yaml
index c333adb134fcf..b253967472d53 100644
--- a/providers/src/airflow/providers/google/provider.yaml
+++ b/providers/src/airflow/providers/google/provider.yaml
@@ -1200,6 +1200,7 @@ extra-links:
   - airflow.providers.google.cloud.links.dataplex.DataplexLakeLink
   - airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink
   - airflow.providers.google.cloud.links.bigquery.BigQueryTableLink
+  - airflow.providers.google.cloud.links.bigquery.BigQueryJobDetailLink
   - airflow.providers.google.cloud.links.bigquery_dts.BigQueryDataTransferConfigLink
   - airflow.providers.google.cloud.links.compute.ComputeInstanceDetailsLink
   - airflow.providers.google.cloud.links.compute.ComputeInstanceTemplateDetailsLink
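
For context, a minimal sketch of what the new BIGQUERY_JOB_DETAIL_LINK format string expands to. This is illustrative only and not part of the patch: it assumes BIGQUERY_BASE_LINK is the console-relative "/bigquery" path already defined near the top of links/bigquery.py, and the project, location, and job values are hypothetical; BaseGoogleLink prepends the console host when the link is rendered in the UI.

# Illustrative expansion of the new format string (not part of the patch).
# Assumes BIGQUERY_BASE_LINK = "/bigquery" as in the existing links module;
# sample project/location/job values below are hypothetical.
BIGQUERY_BASE_LINK = "/bigquery"
BIGQUERY_JOB_DETAIL_LINK = (
    BIGQUERY_BASE_LINK + "?project={project_id}&ws=!1m5!1m4!1m3!1s{project_id}!2s{job_id}!3s{location}"
)

print(
    BIGQUERY_JOB_DETAIL_LINK.format(
        project_id="my-project", location="US", job_id="airflow_run_abc123"
    )
)
# -> /bigquery?project=my-project&ws=!1m5!1m4!1m3!1smy-project!2sairflow_run_abc123!3sUS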