@@ -181,8 +181,8 @@ func NewAddIndexIngestPipeline(
181
181
srcOp := NewTableScanTaskSource (ctx , store , tbl , startKey , endKey , backendCtx )
182
182
scanOp := NewTableScanOperator (ctx , sessPool , copCtx , srcChkPool , readerCnt ,
183
183
reorgMeta .GetBatchSize (), rm , backendCtx )
184
- ingestOp := NewIndexIngestOperator (ctx , copCtx , backendCtx , sessPool ,
185
- tbl , indexes , engines , srcChkPool , writerCnt , reorgMeta , rowCntListener )
184
+ ingestOp := NewIndexIngestOperator (ctx , copCtx , sessPool ,
185
+ tbl , indexes , engines , srcChkPool , writerCnt , reorgMeta )
186
186
sinkOp := newIndexWriteResultSink (ctx , backendCtx , tbl , indexes , rowCntListener )
187
187
188
188
operator .Compose [TableScanTask ](srcOp , scanOp )
@@ -667,19 +667,17 @@ func NewWriteExternalStoreOperator(
667
667
writers = append (writers , writer )
668
668
}
669
669
670
- return & indexIngestExternalWorker {
671
- indexIngestBaseWorker : indexIngestBaseWorker {
672
- ctx : ctx ,
673
- tbl : tbl ,
674
- indexes : indexes ,
675
- copCtx : copCtx ,
676
- se : nil ,
677
- sessPool : sessPool ,
678
- writers : writers ,
679
- srcChunkPool : srcChunkPool ,
680
- reorgMeta : reorgMeta ,
681
- totalCount : totalCount ,
682
- },
670
+ return & indexIngestWorker {
671
+ ctx : ctx ,
672
+ tbl : tbl ,
673
+ indexes : indexes ,
674
+ copCtx : copCtx ,
675
+ se : nil ,
676
+ sessPool : sessPool ,
677
+ writers : writers ,
678
+ srcChunkPool : srcChunkPool ,
679
+ reorgMeta : reorgMeta ,
680
+ totalCount : totalCount ,
683
681
}
684
682
})
685
683
return & WriteExternalStoreOperator {
@@ -700,7 +698,6 @@ func (o *WriteExternalStoreOperator) Close() error {
700
698
type IndexWriteResult struct {
701
699
ID int
702
700
Added int
703
- Total int
704
701
}
705
702
706
703
// IndexIngestOperator writes index records to ingest engine.
@@ -712,15 +709,13 @@ type IndexIngestOperator struct {
712
709
func NewIndexIngestOperator (
713
710
ctx * OperatorCtx ,
714
711
copCtx copr.CopContext ,
715
- backendCtx ingest.BackendCtx ,
716
712
sessPool opSessPool ,
717
713
tbl table.PhysicalTable ,
718
714
indexes []table.Index ,
719
715
engines []ingest.Engine ,
720
716
srcChunkPool * sync.Pool ,
721
717
concurrency int ,
722
718
reorgMeta * model.DDLReorgMeta ,
723
- rowCntListener RowCountListener ,
724
719
) * IndexIngestOperator {
725
720
writerCfg := getLocalWriterConfig (len (indexes ), concurrency )
726
721
@@ -742,77 +737,25 @@ func NewIndexIngestOperator(
742
737
writers = append (writers , writer )
743
738
}
744
739
745
- return & indexIngestLocalWorker {
746
- indexIngestBaseWorker : indexIngestBaseWorker {
747
- ctx : ctx ,
748
- tbl : tbl ,
749
- indexes : indexes ,
750
- copCtx : copCtx ,
751
-
752
- se : nil ,
753
- sessPool : sessPool ,
754
- writers : writers ,
755
- srcChunkPool : srcChunkPool ,
756
- reorgMeta : reorgMeta ,
757
- },
758
- backendCtx : backendCtx ,
759
- rowCntListener : rowCntListener ,
740
+ return & indexIngestWorker {
741
+ ctx : ctx ,
742
+ tbl : tbl ,
743
+ indexes : indexes ,
744
+ copCtx : copCtx ,
745
+
746
+ se : nil ,
747
+ sessPool : sessPool ,
748
+ writers : writers ,
749
+ srcChunkPool : srcChunkPool ,
750
+ reorgMeta : reorgMeta ,
760
751
}
761
752
})
762
753
return & IndexIngestOperator {
763
754
AsyncOperator : operator .NewAsyncOperator [IndexRecordChunk , IndexWriteResult ](ctx , pool ),
764
755
}
765
756
}
766
757
767
- type indexIngestExternalWorker struct {
768
- indexIngestBaseWorker
769
- }
770
-
771
- func (w * indexIngestExternalWorker ) HandleTask (ck IndexRecordChunk , send func (IndexWriteResult )) {
772
- defer func () {
773
- if ck .Chunk != nil {
774
- w .srcChunkPool .Put (ck .Chunk )
775
- }
776
- }()
777
- rs , err := w .indexIngestBaseWorker .HandleTask (ck )
778
- if err != nil {
779
- w .ctx .onError (err )
780
- return
781
- }
782
- send (rs )
783
- }
784
-
785
- type indexIngestLocalWorker struct {
786
- indexIngestBaseWorker
787
- backendCtx ingest.BackendCtx
788
- rowCntListener RowCountListener
789
- }
790
-
791
- func (w * indexIngestLocalWorker ) HandleTask (ck IndexRecordChunk , send func (IndexWriteResult )) {
792
- defer func () {
793
- if ck .Chunk != nil {
794
- w .srcChunkPool .Put (ck .Chunk )
795
- }
796
- }()
797
- rs , err := w .indexIngestBaseWorker .HandleTask (ck )
798
- if err != nil {
799
- w .ctx .onError (err )
800
- return
801
- }
802
- if rs .Added == 0 {
803
- return
804
- }
805
- w .rowCntListener .Written (rs .Added )
806
- err = w .backendCtx .IngestIfQuotaExceeded (w .ctx , ck .ID , rs .Added )
807
- if err != nil {
808
- w .ctx .onError (err )
809
- return
810
- }
811
- rs .Total = w .backendCtx .TotalKeyCount ()
812
- send (rs )
813
- }
814
-
815
- type indexIngestBaseWorker struct {
758
+ type indexIngestWorker struct {
816
759
ctx * OperatorCtx
817
760
818
761
tbl table.PhysicalTable
@@ -830,23 +773,28 @@ type indexIngestBaseWorker struct {
830
773
totalCount * atomic.Int64
831
774
}
832
775
833
- func (w * indexIngestBaseWorker ) HandleTask (rs IndexRecordChunk ) (IndexWriteResult , error ) {
776
+ func (w * indexIngestWorker ) HandleTask (ck IndexRecordChunk , send func (IndexWriteResult )) {
777
+ defer func () {
778
+ if ck .Chunk != nil {
779
+ w .srcChunkPool .Put (ck .Chunk )
780
+ }
781
+ }()
834
782
failpoint .Inject ("injectPanicForIndexIngest" , func () {
835
783
panic ("mock panic" )
836
784
})
837
785
838
786
result := IndexWriteResult {
839
- ID : rs .ID ,
787
+ ID : ck .ID ,
840
788
}
841
789
w .initSessCtx ()
842
- count , _ , err := w .WriteChunk (& rs )
790
+ count , _ , err := w .WriteChunk (& ck )
843
791
if err != nil {
844
792
w .ctx .onError (err )
845
- return result , err
793
+ return
846
794
}
847
795
if count == 0 {
848
- logutil .Logger (w .ctx ).Info ("finish a index ingest task" , zap .Int ("id" , rs .ID ))
849
- return result , nil
796
+ logutil .Logger (w .ctx ).Info ("finish a index ingest task" , zap .Int ("id" , ck .ID ))
797
+ return
850
798
}
851
799
if w .totalCount != nil {
852
800
w .totalCount .Add (int64 (count ))
@@ -855,10 +803,10 @@ func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResul
855
803
if ResultCounterForTest != nil {
856
804
ResultCounterForTest .Add (1 )
857
805
}
858
- return result , nil
806
+ send ( result )
859
807
}
860
808
861
- func (w * indexIngestBaseWorker ) initSessCtx () {
809
+ func (w * indexIngestWorker ) initSessCtx () {
862
810
if w .se == nil {
863
811
sessCtx , err := w .sessPool .Get ()
864
812
if err != nil {
@@ -874,7 +822,7 @@ func (w *indexIngestBaseWorker) initSessCtx() {
874
822
}
875
823
}
876
824
877
- func (w * indexIngestBaseWorker ) Close () {
825
+ func (w * indexIngestWorker ) Close () {
878
826
// TODO(lance6716): unify the real write action for engineInfo and external
879
827
// writer.
880
828
for _ , writer := range w .writers {
@@ -894,7 +842,7 @@ func (w *indexIngestBaseWorker) Close() {
894
842
}
895
843
896
844
// WriteChunk will write index records to lightning engine.
897
- func (w * indexIngestBaseWorker ) WriteChunk (rs * IndexRecordChunk ) (count int , nextKey kv.Key , err error ) {
845
+ func (w * indexIngestWorker ) WriteChunk (rs * IndexRecordChunk ) (count int , nextKey kv.Key , err error ) {
898
846
failpoint .Inject ("mockWriteLocalError" , func (_ failpoint.Value ) {
899
847
failpoint .Return (0 , nil , errors .New ("mock write local error" ))
900
848
})
@@ -955,20 +903,28 @@ func (s *indexWriteResultSink) collectResult() error {
955
903
select {
956
904
case <- s .ctx .Done ():
957
905
return s .ctx .Err ()
958
- case _ , ok := <- s .source .Channel ():
906
+ case rs , ok := <- s .source .Channel ():
959
907
if ! ok {
960
908
err := s .flush ()
961
909
if err != nil {
962
910
s .ctx .onError (err )
963
911
}
964
- if s .backendCtx != nil {
912
+ if s .backendCtx != nil { // for local sort only
965
913
total := s .backendCtx .TotalKeyCount ()
966
914
if total > 0 {
967
915
s .rowCntListener .SetTotal (total )
968
916
}
969
917
}
970
918
return err
971
919
}
920
+ s .rowCntListener .Written (rs .Added )
921
+ if s .backendCtx != nil { // for local sort only
922
+ err := s .backendCtx .IngestIfQuotaExceeded (s .ctx , rs .ID , rs .Added )
923
+ if err != nil {
924
+ s .ctx .onError (err )
925
+ return err
926
+ }
927
+ }
972
928
}
973
929
}
974
930
}
0 commit comments