Skip to content

Commit 3f9d14c

Browse files
committed
no-op
PiperOrigin-RevId: 660551866
1 parent 236ac38 commit 3f9d14c

File tree

3 files changed

+44
-9
lines changed

3 files changed

+44
-9
lines changed

tfx/orchestration/experimental/core/async_pipeline_task_gen.py

+2
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,8 @@ def _generate_tasks_for_node(
490490
execution_type=node.node_info.type,
491491
contexts=resolved_info.contexts,
492492
input_and_params=unprocessed_inputs,
493+
pipeline=self._pipeline,
494+
node_id=node.node_info.id,
493495
)
494496

495497
for execution in executions:

tfx/orchestration/experimental/core/sync_pipeline_task_gen.py

+2
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,8 @@ def _generate_tasks_from_resolved_inputs(
564564
execution_type=node.node_info.type,
565565
contexts=resolved_info.contexts,
566566
input_and_params=resolved_info.input_and_params,
567+
pipeline=self._pipeline,
568+
node_id=node.node_info.id,
567569
)
568570

569571
result.extend(

tfx/orchestration/experimental/core/task_gen_utils.py

+40-9
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from tfx.orchestration import metadata
3131
from tfx.orchestration import node_proto_view
3232
from tfx.orchestration.experimental.core import constants
33+
from tfx.orchestration.experimental.core import env
3334
from tfx.orchestration.experimental.core import mlmd_state
3435
from tfx.orchestration.experimental.core import task as task_lib
3536
from tfx.orchestration import mlmd_connection_manager as mlmd_cm
@@ -548,21 +549,35 @@ def register_executions_from_existing_executions(
548549
contexts = metadata_handle.store.get_contexts_by_execution(
549550
existing_executions[0].id
550551
)
551-
return execution_lib.put_executions(
552+
executions = execution_lib.put_executions(
552553
metadata_handle,
553554
new_executions,
554555
contexts,
555556
input_artifacts_maps=input_artifacts,
556557
)
557558

559+
pipeline_asset = metadata_handle.store.pipeline_asset
560+
if pipeline_asset:
561+
env.get_env().create_pipeline_run_node_executions(
562+
pipeline_asset.owner,
563+
pipeline_asset.name,
564+
pipeline,
565+
node.node_info.id,
566+
executions,
567+
)
568+
569+
return executions
570+
558571

559572
def register_executions(
560573
metadata_handle: metadata.Metadata,
561574
execution_type: metadata_store_pb2.ExecutionType,
562575
contexts: Sequence[metadata_store_pb2.Context],
563576
input_and_params: Sequence[InputAndParam],
577+
pipeline: Optional[pipeline_pb2.Pipeline] = None,
578+
node_id: Optional[str] = None,
564579
) -> Sequence[metadata_store_pb2.Execution]:
565-
"""Registers multiple executions in MLMD.
580+
"""Registers multiple executions in MLMD and Tflex backends.
566581
567582
Along with the execution:
568583
- the input artifacts will be linked to the executions.
@@ -575,6 +590,8 @@ def register_executions(
575590
input_and_params: A list of InputAndParams, which includes input_dicts
576591
(dictionaries of artifacts. One execution will be registered for each of
577592
the input_dict) and corresponding exec_properties.
593+
pipeline: Optional. The pipeline proto.
594+
node_id: Optional. The node id of the executions to be registered.
578595
579596
Returns:
580597
A list of MLMD executions that are registered in MLMD, with id populated.
@@ -603,21 +620,35 @@ def register_executions(
603620
executions.append(execution)
604621

605622
if len(executions) == 1:
606-
return [
623+
new_executions = [
607624
execution_lib.put_execution(
608625
metadata_handle,
609626
executions[0],
610627
contexts,
611628
input_artifacts=input_and_params[0].input_artifacts,
612629
)
613630
]
631+
else:
632+
new_executions = execution_lib.put_executions(
633+
metadata_handle,
634+
executions,
635+
contexts,
636+
[
637+
input_and_param.input_artifacts
638+
for input_and_param in input_and_params
639+
],
640+
)
614641

615-
return execution_lib.put_executions(
616-
metadata_handle,
617-
executions,
618-
contexts,
619-
[input_and_param.input_artifacts for input_and_param in input_and_params],
620-
)
642+
pipeline_asset = metadata_handle.store.pipeline_asset
643+
if pipeline_asset and pipeline and node_id:
644+
env.get_env().create_pipeline_run_node_executions(
645+
pipeline_asset.owner,
646+
pipeline_asset.name,
647+
pipeline,
648+
node_id,
649+
new_executions,
650+
)
651+
return new_executions
621652

622653

623654
def update_external_artifact_type(

0 commit comments

Comments
 (0)