@@ -802,10 +802,7 @@ def build_data_response(
802
802
self , initial_result_record : InitialResultRecord , data : dict [str , Any ] | None
803
803
) -> ExecutionResult | ExperimentalIncrementalExecutionResults :
804
804
"""Build response for the given data."""
805
- for child in initial_result_record .children :
806
- if child .filtered :
807
- continue
808
- self ._publish (child )
805
+ pending_sources = self ._publish (initial_result_record .children )
809
806
810
807
errors = initial_result_record .errors or None
811
808
if errors :
@@ -816,14 +813,7 @@ def build_data_response(
816
813
error .message ,
817
814
)
818
815
)
819
- pending = self ._pending
820
- if pending :
821
- pending_sources : RefSet [DeferredFragmentRecord | StreamRecord ] = RefSet (
822
- subsequent_result_record .stream_record
823
- if isinstance (subsequent_result_record , StreamItemsRecord )
824
- else subsequent_result_record
825
- for subsequent_result_record in pending
826
- )
816
+ if pending_sources :
827
817
return ExperimentalIncrementalExecutionResults (
828
818
initial_result = InitialIncrementalExecutionResult (
829
819
data ,
@@ -994,17 +984,7 @@ def _process_pending(
994
984
completed_results : list [CompletedResult ] = []
995
985
to_result = self ._completed_record_to_result
996
986
for subsequent_result_record in completed_records :
997
- for child in subsequent_result_record .children :
998
- if child .filtered :
999
- continue
1000
- pending_source : DeferredFragmentRecord | StreamRecord = (
1001
- child .stream_record
1002
- if isinstance (child , StreamItemsRecord )
1003
- else child
1004
- )
1005
- if not pending_source .pending_sent :
1006
- new_pending_sources .add (pending_source )
1007
- self ._publish (child )
987
+ self ._publish (subsequent_result_record .children , new_pending_sources )
1008
988
incremental_result : IncrementalResult
1009
989
if isinstance (subsequent_result_record , StreamItemsRecord ):
1010
990
if subsequent_result_record .is_final_record :
@@ -1060,7 +1040,7 @@ def _get_incremental_defer_result(
1060
1040
max_length : int | None = None
1061
1041
id_with_longest_path : str | None = None
1062
1042
for fragment_record in fragment_records :
1063
- if fragment_record .id is None :
1043
+ if fragment_record .id is None : # pragma: no cover
1064
1044
continue
1065
1045
length = len (fragment_record .path )
1066
1046
if max_length is None or length > max_length :
@@ -1090,20 +1070,45 @@ def _completed_record_to_result(
1090
1070
completed_record .errors or None ,
1091
1071
)
1092
1072
1093
- def _publish (self , subsequent_result_record : SubsequentResultRecord ) -> None :
1094
- """Publish the given incremental data record."""
1095
- if isinstance (subsequent_result_record , StreamItemsRecord ):
1096
- if subsequent_result_record .is_completed :
1097
- self ._push (subsequent_result_record )
1098
- else :
1073
+ def _publish (
1074
+ self ,
1075
+ subsequent_result_records : dict [SubsequentResultRecord , None ],
1076
+ pending_sources : RefSet [DeferredFragmentRecord | StreamRecord ] | None = None ,
1077
+ ) -> RefSet [DeferredFragmentRecord | StreamRecord ]:
1078
+ """Publish the given set of incremental data record."""
1079
+ if pending_sources is None :
1080
+ pending_sources = RefSet ()
1081
+ empty_records : list [SubsequentResultRecord ] = []
1082
+
1083
+ for subsequent_result_record in subsequent_result_records :
1084
+ if subsequent_result_record .filtered :
1085
+ continue
1086
+ if isinstance (subsequent_result_record , StreamItemsRecord ):
1087
+ if subsequent_result_record .is_completed :
1088
+ self ._push (subsequent_result_record )
1089
+ else :
1090
+ self ._introduce (subsequent_result_record )
1091
+
1092
+ stream = subsequent_result_record .stream_record
1093
+ if not stream .pending_sent :
1094
+ pending_sources .add (stream )
1095
+ continue
1096
+
1097
+ if subsequent_result_record ._pending : # noqa: SLF001
1099
1098
self ._introduce (subsequent_result_record )
1100
- elif subsequent_result_record ._pending : # noqa: SLF001
1101
- self ._introduce (subsequent_result_record )
1102
- elif (
1103
- subsequent_result_record .deferred_grouped_field_set_records
1104
- or subsequent_result_record .children
1105
- ):
1106
- self ._push (subsequent_result_record )
1099
+ elif not subsequent_result_record .deferred_grouped_field_set_records :
1100
+ empty_records .append (subsequent_result_record )
1101
+ continue
1102
+ else :
1103
+ self ._push (subsequent_result_record )
1104
+
1105
+ if not subsequent_result_record .pending_sent : # pragma: no cover else
1106
+ pending_sources .add (subsequent_result_record )
1107
+
1108
+ for empty_record in empty_records :
1109
+ self ._publish (empty_record .children , pending_sources )
1110
+
1111
+ return pending_sources
1107
1112
1108
1113
@staticmethod
1109
1114
def _get_children (
0 commit comments