@@ -574,17 +574,20 @@ namespace
574
574
return true ;
575
575
}
576
576
577
+ enum ActionType { REPLICATE, REPLAY, FAST_FORWARD };
578
+
577
579
void replicate (Target* target,
578
580
TransactionList& transactions,
579
581
FB_UINT64 sequence, ULONG offset,
580
582
ULONG length, const UCHAR* data,
581
- bool rewind )
583
+ ActionType action )
582
584
{
583
585
const Block* const header = (Block*) data;
584
586
585
587
const auto traNumber = header->traNumber ;
586
588
587
- if (!rewind || !traNumber || transactions.exist (traNumber))
589
+ if (action == REPLICATE ||
590
+ (action == REPLAY && (!traNumber || transactions.exist (traNumber))))
588
591
{
589
592
target->replicate (sequence, offset, length, data);
590
593
}
@@ -597,7 +600,7 @@ namespace
597
600
if (transactions.find (traNumber, pos))
598
601
transactions.remove (pos);
599
602
}
600
- else if (!rewind )
603
+ else if (action != REPLAY )
601
604
{
602
605
transactions.clear ();
603
606
}
@@ -606,7 +609,7 @@ namespace
606
609
{
607
610
fb_assert (traNumber);
608
611
609
- if (!rewind && !transactions.exist (traNumber))
612
+ if (action != REPLAY && !transactions.exist (traNumber))
610
613
transactions.add (ActiveTransaction (traNumber, sequence));
611
614
}
612
615
}
@@ -737,6 +740,7 @@ namespace
737
740
const FB_UINT64 max_sequence = queue.back ()->header .hdr_sequence ;
738
741
FB_UINT64 next_sequence = 0 ;
739
742
const bool restart = target->isShutdown ();
743
+ auto action = REPLICATE;
740
744
741
745
for (auto segment : queue)
742
746
{
@@ -754,21 +758,48 @@ namespace
754
758
const FB_UINT64 db_sequence = target->initReplica ();
755
759
const FB_UINT64 last_db_sequence = control.getDbSequence ();
756
760
757
- if (sequence <= db_sequence)
758
- {
759
- target->verbose (" Deleting segment %" UQUADFORMAT " due to fast forward" , sequence);
760
- segment->remove ();
761
- continue ;
762
- }
763
-
764
761
if (db_sequence != last_db_sequence)
765
762
{
766
- target->verbose (" Resetting replication to continue from segment %" UQUADFORMAT, db_sequence + 1 );
767
- control.saveDbSequence (db_sequence);
768
- transactions.clear ();
769
- control.saveComplete (db_sequence, transactions);
770
- last_sequence = db_sequence;
771
- last_offset = 0 ;
763
+ if (sequence == db_sequence + 1 )
764
+ {
765
+ if (const auto oldest = findOldest (transactions))
766
+ {
767
+ const TraNumber oldest_trans = oldest->tra_id ;
768
+ const FB_UINT64 oldest_sequence = oldest ? oldest->sequence : 0 ;
769
+ target->verbose (" Resetting replication to continue from segment %" UQUADFORMAT
770
+ " (new OAT: %" UQUADFORMAT " in segment %" UQUADFORMAT " )" ,
771
+ db_sequence + 1 , oldest_trans, oldest_sequence);
772
+ }
773
+ else
774
+ {
775
+ target->verbose (" Resetting replication to continue from segment %" UQUADFORMAT,
776
+ db_sequence + 1 );
777
+ }
778
+
779
+ control.saveDbSequence (db_sequence);
780
+ return PROCESS_SHUTDOWN; // this enforces restart from OAT
781
+ }
782
+
783
+ if (action != FAST_FORWARD)
784
+ {
785
+ if (segment != queue.front ())
786
+ {
787
+ fb_assert (false );
788
+ return PROCESS_SHUTDOWN;
789
+ }
790
+
791
+ if (db_sequence > max_sequence)
792
+ {
793
+ target->verbose (" Database sequence has been changed to %" UQUADFORMAT
794
+ " , waiting for appropriate segment" , db_sequence);
795
+ return PROCESS_SUSPEND;
796
+ }
797
+
798
+ target->verbose (" Database sequence has been changed to %" UQUADFORMAT
799
+ " , preparing for replication reset" , db_sequence);
800
+
801
+ action = FAST_FORWARD;
802
+ }
772
803
}
773
804
774
805
// If no new segments appeared since our last attempt,
@@ -843,17 +874,18 @@ namespace
843
874
844
875
if (blockLength)
845
876
{
846
- const bool rewind = (sequence < last_sequence ||
877
+ const bool replay = (sequence < last_sequence ||
847
878
(sequence == last_sequence && (!last_offset || totalLength < last_offset)));
879
+ if (action != FAST_FORWARD)
880
+ action = replay ? REPLAY : REPLICATE;
848
881
849
882
UCHAR* const data = buffer.getBuffer (length);
850
883
memcpy (data, &header, sizeof (Block));
851
884
852
885
if (read (file, data + sizeof (Block), blockLength) != blockLength)
853
886
raiseError (" Journal file %s read failed (error %d)" , segment->filename .c_str (), ERRNO);
854
887
855
- replicate (target, transactions, sequence, totalLength,
856
- length, data, rewind);
888
+ replicate (target, transactions, sequence, totalLength, length, data, action);
857
889
}
858
890
859
891
totalLength += length;
@@ -872,7 +904,10 @@ namespace
872
904
oldest_sequence = oldest ? oldest->sequence : 0 ;
873
905
next_sequence = sequence + 1 ;
874
906
875
- string extra;
907
+ string actionName, extra;
908
+ actionName = (action == FAST_FORWARD) ? " scanned" :
909
+ (action == REPLAY) ? " replayed" : " replicated" ;
910
+
876
911
if (oldest)
877
912
{
878
913
const TraNumber oldest_trans = oldest->tra_id ;
@@ -884,8 +919,8 @@ namespace
884
919
extra = " deleting" ;
885
920
}
886
921
887
- target->verbose (" Segment %" UQUADFORMAT " (%u bytes) is replicated in %s, %s" ,
888
- sequence, totalLength, interval.c_str (), extra.c_str ());
922
+ target->verbose (" Segment %" UQUADFORMAT " (%u bytes) is %s in %s, %s" ,
923
+ sequence, totalLength, actionName. c_str (), interval.c_str (), extra.c_str ());
889
924
890
925
if (!oldest_sequence)
891
926
segment->remove ();
@@ -907,8 +942,8 @@ namespace
907
942
break ;
908
943
909
944
target->verbose (" Deleting segment %" UQUADFORMAT " as no longer needed" , sequence);
910
-
911
945
segment->remove ();
946
+
912
947
} while (pos < queue.getCount ());
913
948
}
914
949
}
0 commit comments