From f2646498fb4484e34bb167f4dd91fbe178ddad4f Mon Sep 17 00:00:00 2001 From: Sotatek-HuyLe3a Date: Fri, 22 Dec 2023 14:26:33 +0700 Subject: [PATCH 1/2] feat: #105 add job for checking sync status and resyncing if needed --- .../configuration/BlockEventHandling.java | 12 ++ .../configuration/BlockSyncingAspect.java | 29 +++++ .../listeners/BlockEventListener.java | 35 ++++-- .../service/BlockSyncCheckScheduler.java | 106 ++++++++++++++++++ .../service/BlockSyncingManager.java | 67 +++++++++++ 5 files changed, 241 insertions(+), 8 deletions(-) create mode 100644 application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/configuration/BlockEventHandling.java create mode 100644 application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/configuration/BlockSyncingAspect.java create mode 100644 application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/BlockSyncCheckScheduler.java create mode 100644 application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/BlockSyncingManager.java diff --git a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/configuration/BlockEventHandling.java b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/configuration/BlockEventHandling.java new file mode 100644 index 00000000..36b9817a --- /dev/null +++ b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/configuration/BlockEventHandling.java @@ -0,0 +1,12 @@ +package org.cardanofoundation.ledgersync.explorerconsumer.configuration; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface BlockEventHandling { + +} diff --git a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/configuration/BlockSyncingAspect.java b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/configuration/BlockSyncingAspect.java new file mode 100644 index 00000000..143b8b68 --- /dev/null +++ b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/configuration/BlockSyncingAspect.java @@ -0,0 +1,29 @@ +package org.cardanofoundation.ledgersync.explorerconsumer.configuration; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.JoinPoint; +import org.aspectj.lang.annotation.After; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Before; +import org.cardanofoundation.ledgersync.explorerconsumer.service.BlockSyncingManager; +import org.springframework.stereotype.Component; + +@Aspect +@Component +@Slf4j +@RequiredArgsConstructor +public class BlockSyncingAspect { + private final BlockSyncingManager blockSyncingManager; + + @Before("@annotation(org.cardanofoundation.ledgersync.explorerconsumer.configuration.BlockEventHandling)") + public void beforeBlockEventHandling(JoinPoint joinPoint) { + blockSyncingManager.setIsEventBeingProcessed(true); + } + + @After("@annotation(org.cardanofoundation.ledgersync.explorerconsumer.configuration.BlockEventHandling)") + public void afterBlockEventHandling(JoinPoint joinPoint) { + blockSyncingManager.setLastEventProcessedTime(System.currentTimeMillis()); + blockSyncingManager.setIsEventBeingProcessed(false); + } +} diff --git a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/BlockEventListener.java b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/BlockEventListener.java index 9ad4a3b3..189e22dc 100644 --- a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/BlockEventListener.java +++ b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/listeners/BlockEventListener.java @@ -5,6 +5,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.cardanofoundation.ledgersync.explorerconsumer.aggregate.AggregatedBlock; +import org.cardanofoundation.ledgersync.explorerconsumer.configuration.BlockEventHandling; import org.cardanofoundation.ledgersync.explorerconsumer.repository.BlockRepository; import org.cardanofoundation.ledgersync.explorerconsumer.service.*; import org.cardanofoundation.ledgersync.explorerconsumer.service.impl.block.BlockAggregatorServiceImpl; @@ -34,14 +35,14 @@ public class BlockEventListener { private final BlockRepository blockRepository; private final MetricCollectorService metricCollectorService; + private final BlockSyncingManager blockSyncingManager; + private final AtomicInteger blockCount = new AtomicInteger(0); @Value("${blocks.batch-size}") private Integer batchSize; @Value("${blocks.commitThreshold}") private Long commitThreshold; - - private final AtomicLong lastMessageReceivedTime = new AtomicLong(System.currentTimeMillis()); private AtomicLong blockHeight; private long lastLog; @@ -54,17 +55,22 @@ private void initBlockHeight() { @EventListener @Transactional + @BlockEventHandling public void handleBlockEvent(BlockEvent blockEvent) { - if (checkIfBlockExists(blockEvent.getMetadata())) return; - + if (checkIfBlockExists(blockEvent.getMetadata())) { + return; + } AggregatedBlock aggregatedBlock = blockAggregatorService.aggregateBlock(blockEvent); handleAggregateBlock(blockEvent.getMetadata(), aggregatedBlock); } @EventListener @Transactional + @BlockEventHandling public void handleByronBlockEvent(ByronMainBlockEvent byronMainBlockEvent) { - if (checkIfBlockExists(byronMainBlockEvent.getMetadata())) return; + if (checkIfBlockExists(byronMainBlockEvent.getMetadata())) { + return; + } AggregatedBlock aggregatedBlock = byronMainAggregatorService.aggregateBlock(byronMainBlockEvent); handleAggregateBlock(byronMainBlockEvent.getMetadata(), aggregatedBlock); @@ -72,8 +78,11 @@ public void handleByronBlockEvent(ByronMainBlockEvent byronMainBlockEvent) { @EventListener @Transactional + @BlockEventHandling public void handleByronEbBlock(ByronEbBlockEvent byronEbBlockEvent) { - if (checkIfBlockExists(byronEbBlockEvent.getMetadata())) return; + if (checkIfBlockExists(byronEbBlockEvent.getMetadata())) { + return; + } AggregatedBlock aggregatedBlock = byronEbbAggregatorService.aggregateBlock(byronEbBlockEvent); handleAggregateBlock(byronEbBlockEvent.getMetadata(), aggregatedBlock); @@ -81,7 +90,9 @@ public void handleByronEbBlock(ByronEbBlockEvent byronEbBlockEvent) { @EventListener @Transactional + @BlockEventHandling public void handleGenesisBlock(GenesisBlockEvent genesisBlockEvent) { + blockSyncingManager.setLastEventReceivedTime(System.currentTimeMillis()); log.info("BlockEventListener.handleGenesisBlock"); String genesisHash = genesisBlockEvent.getBlockHash(); if (genesisHash != null && genesisHash.startsWith("Genesis")) { @@ -94,7 +105,10 @@ public void handleGenesisBlock(GenesisBlockEvent genesisBlockEvent) { @EventListener @Transactional + @BlockEventHandling public void handleRollback(RollbackEvent rollbackEvent) { + blockSyncingManager.setLastEventReceivedTime(System.currentTimeMillis()); + Long rollbackBlockNo = blockRepository.findBySlotNo(rollbackEvent.getRollbackTo().getSlot()) .map(block -> block.getBlockNo()) .orElse(0L); @@ -114,7 +128,7 @@ public void handleRollback(RollbackEvent rollbackEvent) { private boolean checkIfBlockExists(EventMetadata metadata) { var optional = blockRepository.findBlockByHash(metadata.getBlockHash()); if (optional.isPresent()) { - log.info("Block already exists. Skipping block no {}, hash {}", metadata.getEpochSlot(), + log.info("Block already exists. Skipping block no {}, hash {}", metadata.getBlock(), metadata.getBlockHash()); return true; } @@ -124,7 +138,12 @@ private boolean checkIfBlockExists(EventMetadata metadata) { private void handleAggregateBlock(EventMetadata eventMetadata, AggregatedBlock aggregatedBlock) { try { long currentTime = System.currentTimeMillis(); - long lastReceivedTimeElapsed = currentTime - lastMessageReceivedTime.getAndSet(currentTime); + long lastReceivedTimeElapsed = currentTime - blockSyncingManager.getLastEventReceivedTime(); + blockSyncingManager.setLastEventReceivedTime(currentTime); + + if (eventMetadata.isSyncMode()) { + blockSyncingManager.setIsSyncMode(true); + } if (currentTime - lastLog >= 500) {//reduce log log.info("Block number {}, slot_no {}, hash {}", diff --git a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/BlockSyncCheckScheduler.java b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/BlockSyncCheckScheduler.java new file mode 100644 index 00000000..989f9631 --- /dev/null +++ b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/BlockSyncCheckScheduler.java @@ -0,0 +1,106 @@ +package org.cardanofoundation.ledgersync.explorerconsumer.service; + +import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.Point; +import com.bloxbean.cardano.yaci.store.core.StoreProperties; +import com.bloxbean.cardano.yaci.store.core.domain.Cursor; +import com.bloxbean.cardano.yaci.store.core.service.BlockFetchService; +import com.bloxbean.cardano.yaci.store.core.service.CursorService; +import com.bloxbean.cardano.yaci.store.core.service.TipFinderService; +import com.bloxbean.cardano.yaci.store.events.RollbackEvent; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +@Component +@RequiredArgsConstructor +@ConditionalOnProperty( + value = "blocks.jobs.check-sync.enabled", + matchIfMissing = true, + havingValue = "true") +@Slf4j +public class BlockSyncCheckScheduler { + private final BlockFetchService blockFetchService; + private final CursorService cursorService; + private final TipFinderService tipFinderService; + private final StoreProperties storeProperties; + private final BlockSyncingManager blockSyncingManager; + private final ApplicationEventPublisher publisher; + + @Value("${blocks.resyncThreshold.notInSyncMode}") + private Long resyncThresholdNotInSyncMode; + @Value("${blocks.resyncThreshold.inSyncMode}") + private Long resyncThresholdInSyncMode; + + /** + * Scheduled method to check and perform a manual resynchronization if needed. + * The scheduler checks the synchronization status and initiates a manual + * resynchronization if the conditions for resync are met. This includes closing + * the current connection to the node and establishing a new connection + */ + @Scheduled(fixedDelayString = "${blocks.jobs.check-sync.fixed-delay}", + initialDelayString = "${blocks.jobs.check-sync.init-delay}") + @SneakyThrows + public void checkAndPerformResyncIfNeeded() { + log.debug("Check sync status"); + long lastProcessedTimeElapsed = System.currentTimeMillis() - blockSyncingManager.getLastEventProcessedTime(); + long reconnectThreshold = blockSyncingManager.getIsSyncMode() ? resyncThresholdInSyncMode : resyncThresholdNotInSyncMode; + + if (lastProcessedTimeElapsed < reconnectThreshold || blockSyncingManager.getIsEventBeingProcessed()) { + return; + } + + Cursor cursor = cursorService.getCursor().orElse(null); + if (Objects.isNull(cursor)) { + return; + } + + log.info("Shutdown and reconnect manually"); + + if (blockSyncingManager.getIsSyncMode()) { + blockFetchService.shutdownSync(); + } else { + blockFetchService.shutdown(); + } + + Thread.sleep(4000); + // Double-checking to ensure that no event is being processed before initiating resynchronization + while (true) { + if (!blockSyncingManager.getIsEventBeingProcessed()) { + break; + } + } + + Cursor newestCursor = cursorService.getCursor().orElse(null); + Point from = new Point(newestCursor.getSlot(), newestCursor.getBlockHash()); + + Point to = (storeProperties.getSyncStopSlot() != 0L) ? + new Point(storeProperties.getSyncStopSlot(), storeProperties.getSyncStopBlockhash()) : + tipFinderService.getTip().block().getPoint(); + + log.info("From >> " + from); + log.info("TO >> " + to); + + //Send a rollback event to rollback data at and after this slot. + //This is because, during start up the from block will be processed again (For BlockFetch). + if (from.getSlot() > 0) { + RollbackEvent rollbackEvent = RollbackEvent.builder() + .rollbackTo(new Point(from.getSlot(), from.getHash())) + .build(); + log.info("Publishing rollback event to clean data after restart >>> " + rollbackEvent); + publisher.publishEvent(rollbackEvent); + } + + if (blockSyncingManager.getIsSyncMode()) { + blockFetchService.startSync(from); + } else { + blockFetchService.startFetch(from, to); + } + } +} diff --git a/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/BlockSyncingManager.java b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/BlockSyncingManager.java new file mode 100644 index 00000000..77fa65d2 --- /dev/null +++ b/application/src/main/java/org/cardanofoundation/ledgersync/explorerconsumer/service/BlockSyncingManager.java @@ -0,0 +1,67 @@ +package org.cardanofoundation.ledgersync.explorerconsumer.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Manages synchronization state and timing information for block events or rollback events. + * This class is responsible for keeping track of various synchronization-related + * details, such as the last event received time, the last event processed time, + * synchronization mode status, and whether an event is currently being processed. + */ +@Component +@Slf4j +public class BlockSyncingManager { + + /** The timestamp of the last received block event. */ + private final AtomicLong lastEventReceivedTime = new AtomicLong(System.currentTimeMillis()); + + /** + * The timestamp of the last processed block event. + * Represents the moment when the processing of a block event is completed. + * It is updated each time a block event is successfully processed by the system. + */ + private final AtomicLong lastEventProcessedTime = new AtomicLong(System.currentTimeMillis()); + + /** Indicates whether the yaci is crawling with sync mode (use Chain-Sync protocol)*/ + private final AtomicBoolean isSyncMode = new AtomicBoolean(false); + + /** Indicates whether a block event is currently being processed. */ + private final AtomicBoolean isEventBeingProcessed = new AtomicBoolean(false); + + public void setIsEventBeingProcessed(boolean isProcessing) { + isEventBeingProcessed.set(isProcessing); + } + + public void setIsSyncMode(boolean value) { + isSyncMode.set(value); + } + + public void setLastEventReceivedTime(long time) { + lastEventReceivedTime.set(time); + } + + public void setLastEventProcessedTime(long time) { + lastEventProcessedTime.set(time); + } + + public boolean getIsSyncMode() { + return isSyncMode.get(); + } + + public boolean getIsEventBeingProcessed() { + return isEventBeingProcessed.get(); + } + + public long getLastEventReceivedTime() { + return lastEventReceivedTime.get(); + } + + public long getLastEventProcessedTime() { + return lastEventProcessedTime.get(); + } + +} From c91be75f4167fee89738eba2568673de38a68138 Mon Sep 17 00:00:00 2001 From: Sotatek-HuyLe3a Date: Thu, 28 Dec 2023 16:43:36 +0700 Subject: [PATCH 2/2] fix: update application.yaml --- application/src/main/resources/config/application.yaml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/application/src/main/resources/config/application.yaml b/application/src/main/resources/config/application.yaml index b8fbf702..3eea9fe4 100755 --- a/application/src/main/resources/config/application.yaml +++ b/application/src/main/resources/config/application.yaml @@ -68,6 +68,14 @@ logging: blocks: batch-size: ${BLOCKS_BATCH_SIZE:100} commitThreshold: ${COMMIT_THRESHOLD:3000} + resyncThreshold: + inSyncMode: ${RESYNC_THRESHOLD_SYNC_MODE:600000} + notInSyncMode: ${RESYNC_THRESHOLD_NOT_SYNC_MODE:150000} + jobs: + check-sync: + enabled: ${CHECK_SYNC_ENABLED:true} + fixed-delay: ${CHECK_SYNC_FIXED_DELAY:60000} + init-delay: ${CHECK_SYNC_INIT_DELAY:30000} genesis: byron: ${BYRON_GENESIS_URL:classpath:networks/${NETWORK}/byron-genesis.json}