Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job for checking sync status and resyncing if needed #107

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.cardanofoundation.ledgersync.explorerconsumer.configuration;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface BlockEventHandling {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.cardanofoundation.ledgersync.explorerconsumer.configuration;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.cardanofoundation.ledgersync.explorerconsumer.service.BlockSyncingManager;
import org.springframework.stereotype.Component;

@Aspect
@Component
@Slf4j
@RequiredArgsConstructor
public class BlockSyncingAspect {
private final BlockSyncingManager blockSyncingManager;

@Before("@annotation(org.cardanofoundation.ledgersync.explorerconsumer.configuration.BlockEventHandling)")
public void beforeBlockEventHandling(JoinPoint joinPoint) {
blockSyncingManager.setIsEventBeingProcessed(true);
}

@After("@annotation(org.cardanofoundation.ledgersync.explorerconsumer.configuration.BlockEventHandling)")
public void afterBlockEventHandling(JoinPoint joinPoint) {
blockSyncingManager.setLastEventProcessedTime(System.currentTimeMillis());
blockSyncingManager.setIsEventBeingProcessed(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.cardanofoundation.ledgersync.explorerconsumer.aggregate.AggregatedBlock;
import org.cardanofoundation.ledgersync.explorerconsumer.configuration.BlockEventHandling;
import org.cardanofoundation.ledgersync.explorerconsumer.repository.BlockRepository;
import org.cardanofoundation.ledgersync.explorerconsumer.service.*;
import org.cardanofoundation.ledgersync.explorerconsumer.service.impl.block.BlockAggregatorServiceImpl;
Expand Down Expand Up @@ -34,14 +35,14 @@ public class BlockEventListener {

private final BlockRepository blockRepository;
private final MetricCollectorService metricCollectorService;
private final BlockSyncingManager blockSyncingManager;

private final AtomicInteger blockCount = new AtomicInteger(0);

@Value("${blocks.batch-size}")
private Integer batchSize;
@Value("${blocks.commitThreshold}")
private Long commitThreshold;

private final AtomicLong lastMessageReceivedTime = new AtomicLong(System.currentTimeMillis());
private AtomicLong blockHeight;
private long lastLog;

Expand All @@ -54,34 +55,44 @@ private void initBlockHeight() {

@EventListener
@Transactional
@BlockEventHandling
public void handleBlockEvent(BlockEvent blockEvent) {
if (checkIfBlockExists(blockEvent.getMetadata())) return;

if (checkIfBlockExists(blockEvent.getMetadata())) {
return;
}
AggregatedBlock aggregatedBlock = blockAggregatorService.aggregateBlock(blockEvent);
handleAggregateBlock(blockEvent.getMetadata(), aggregatedBlock);
}

@EventListener
@Transactional
@BlockEventHandling
public void handleByronBlockEvent(ByronMainBlockEvent byronMainBlockEvent) {
if (checkIfBlockExists(byronMainBlockEvent.getMetadata())) return;
if (checkIfBlockExists(byronMainBlockEvent.getMetadata())) {
return;
}

AggregatedBlock aggregatedBlock = byronMainAggregatorService.aggregateBlock(byronMainBlockEvent);
handleAggregateBlock(byronMainBlockEvent.getMetadata(), aggregatedBlock);
}

@EventListener
@Transactional
@BlockEventHandling
public void handleByronEbBlock(ByronEbBlockEvent byronEbBlockEvent) {
if (checkIfBlockExists(byronEbBlockEvent.getMetadata())) return;
if (checkIfBlockExists(byronEbBlockEvent.getMetadata())) {
return;
}

AggregatedBlock aggregatedBlock = byronEbbAggregatorService.aggregateBlock(byronEbBlockEvent);
handleAggregateBlock(byronEbBlockEvent.getMetadata(), aggregatedBlock);
}

@EventListener
@Transactional
@BlockEventHandling
public void handleGenesisBlock(GenesisBlockEvent genesisBlockEvent) {
blockSyncingManager.setLastEventReceivedTime(System.currentTimeMillis());
log.info("BlockEventListener.handleGenesisBlock");
String genesisHash = genesisBlockEvent.getBlockHash();
if (genesisHash != null && genesisHash.startsWith("Genesis")) {
Expand All @@ -94,7 +105,10 @@ public void handleGenesisBlock(GenesisBlockEvent genesisBlockEvent) {

@EventListener
@Transactional
@BlockEventHandling
public void handleRollback(RollbackEvent rollbackEvent) {
blockSyncingManager.setLastEventReceivedTime(System.currentTimeMillis());

Long rollbackBlockNo = blockRepository.findBySlotNo(rollbackEvent.getRollbackTo().getSlot())
.map(block -> block.getBlockNo())
.orElse(0L);
Expand All @@ -114,7 +128,7 @@ public void handleRollback(RollbackEvent rollbackEvent) {
private boolean checkIfBlockExists(EventMetadata metadata) {
var optional = blockRepository.findBlockByHash(metadata.getBlockHash());
if (optional.isPresent()) {
log.info("Block already exists. Skipping block no {}, hash {}", metadata.getEpochSlot(),
log.info("Block already exists. Skipping block no {}, hash {}", metadata.getBlock(),
metadata.getBlockHash());
return true;
}
Expand All @@ -124,7 +138,12 @@ private boolean checkIfBlockExists(EventMetadata metadata) {
private void handleAggregateBlock(EventMetadata eventMetadata, AggregatedBlock aggregatedBlock) {
try {
long currentTime = System.currentTimeMillis();
long lastReceivedTimeElapsed = currentTime - lastMessageReceivedTime.getAndSet(currentTime);
long lastReceivedTimeElapsed = currentTime - blockSyncingManager.getLastEventReceivedTime();
blockSyncingManager.setLastEventReceivedTime(currentTime);

if (eventMetadata.isSyncMode()) {
blockSyncingManager.setIsSyncMode(true);
}

if (currentTime - lastLog >= 500) {//reduce log
log.info("Block number {}, slot_no {}, hash {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.cardanofoundation.ledgersync.explorerconsumer.service;

import com.bloxbean.cardano.yaci.core.protocol.chainsync.messages.Point;
import com.bloxbean.cardano.yaci.store.core.StoreProperties;
import com.bloxbean.cardano.yaci.store.core.domain.Cursor;
import com.bloxbean.cardano.yaci.store.core.service.BlockFetchService;
import com.bloxbean.cardano.yaci.store.core.service.CursorService;
import com.bloxbean.cardano.yaci.store.core.service.TipFinderService;
import com.bloxbean.cardano.yaci.store.events.RollbackEvent;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Objects;

@Component
@RequiredArgsConstructor
@ConditionalOnProperty(
value = "blocks.jobs.check-sync.enabled",
matchIfMissing = true,
havingValue = "true")
@Slf4j
public class BlockSyncCheckScheduler {
private final BlockFetchService blockFetchService;
private final CursorService cursorService;
private final TipFinderService tipFinderService;
private final StoreProperties storeProperties;
private final BlockSyncingManager blockSyncingManager;
private final ApplicationEventPublisher publisher;

@Value("${blocks.resyncThreshold.notInSyncMode}")
private Long resyncThresholdNotInSyncMode;
@Value("${blocks.resyncThreshold.inSyncMode}")
private Long resyncThresholdInSyncMode;

/**
* Scheduled method to check and perform a manual resynchronization if needed.
* The scheduler checks the synchronization status and initiates a manual
* resynchronization if the conditions for resync are met. This includes closing
* the current connection to the node and establishing a new connection
*/
@Scheduled(fixedDelayString = "${blocks.jobs.check-sync.fixed-delay}",
initialDelayString = "${blocks.jobs.check-sync.init-delay}")
@SneakyThrows
public void checkAndPerformResyncIfNeeded() {
log.debug("Check sync status");
long lastProcessedTimeElapsed = System.currentTimeMillis() - blockSyncingManager.getLastEventProcessedTime();
long reconnectThreshold = blockSyncingManager.getIsSyncMode() ? resyncThresholdInSyncMode : resyncThresholdNotInSyncMode;

if (lastProcessedTimeElapsed < reconnectThreshold || blockSyncingManager.getIsEventBeingProcessed()) {
return;
}

Cursor cursor = cursorService.getCursor().orElse(null);
if (Objects.isNull(cursor)) {
return;
}

log.info("Shutdown and reconnect manually");

if (blockSyncingManager.getIsSyncMode()) {
blockFetchService.shutdownSync();
} else {
blockFetchService.shutdown();
}

Thread.sleep(4000);
// Double-checking to ensure that no event is being processed before initiating resynchronization
while (true) {
if (!blockSyncingManager.getIsEventBeingProcessed()) {
break;
}
}

Cursor newestCursor = cursorService.getCursor().orElse(null);
Point from = new Point(newestCursor.getSlot(), newestCursor.getBlockHash());

Point to = (storeProperties.getSyncStopSlot() != 0L) ?
new Point(storeProperties.getSyncStopSlot(), storeProperties.getSyncStopBlockhash()) :
tipFinderService.getTip().block().getPoint();

log.info("From >> " + from);
log.info("TO >> " + to);

//Send a rollback event to rollback data at and after this slot.
//This is because, during start up the from block will be processed again (For BlockFetch).
if (from.getSlot() > 0) {
RollbackEvent rollbackEvent = RollbackEvent.builder()
.rollbackTo(new Point(from.getSlot(), from.getHash()))
.build();
log.info("Publishing rollback event to clean data after restart >>> " + rollbackEvent);
publisher.publishEvent(rollbackEvent);
}

if (blockSyncingManager.getIsSyncMode()) {
blockFetchService.startSync(from);
} else {
blockFetchService.startFetch(from, to);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.cardanofoundation.ledgersync.explorerconsumer.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* Manages synchronization state and timing information for block events or rollback events.
* This class is responsible for keeping track of various synchronization-related
* details, such as the last event received time, the last event processed time,
* synchronization mode status, and whether an event is currently being processed.
*/
@Component
@Slf4j
public class BlockSyncingManager {

/** The timestamp of the last received block event. */
private final AtomicLong lastEventReceivedTime = new AtomicLong(System.currentTimeMillis());

/**
* The timestamp of the last processed block event.
* Represents the moment when the processing of a block event is completed.
* It is updated each time a block event is successfully processed by the system.
*/
private final AtomicLong lastEventProcessedTime = new AtomicLong(System.currentTimeMillis());

/** Indicates whether the yaci is crawling with sync mode (use Chain-Sync protocol)*/
private final AtomicBoolean isSyncMode = new AtomicBoolean(false);

/** Indicates whether a block event is currently being processed. */
private final AtomicBoolean isEventBeingProcessed = new AtomicBoolean(false);

public void setIsEventBeingProcessed(boolean isProcessing) {
isEventBeingProcessed.set(isProcessing);
}

public void setIsSyncMode(boolean value) {
isSyncMode.set(value);
}

public void setLastEventReceivedTime(long time) {
lastEventReceivedTime.set(time);
}

public void setLastEventProcessedTime(long time) {
lastEventProcessedTime.set(time);
}

public boolean getIsSyncMode() {
return isSyncMode.get();
}

public boolean getIsEventBeingProcessed() {
return isEventBeingProcessed.get();
}

public long getLastEventReceivedTime() {
return lastEventReceivedTime.get();
}

public long getLastEventProcessedTime() {
return lastEventProcessedTime.get();
}

}
8 changes: 8 additions & 0 deletions application/src/main/resources/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ logging:
blocks:
batch-size: ${BLOCKS_BATCH_SIZE:100}
commitThreshold: ${COMMIT_THRESHOLD:3000}
resyncThreshold:
inSyncMode: ${RESYNC_THRESHOLD_SYNC_MODE:600000}
notInSyncMode: ${RESYNC_THRESHOLD_NOT_SYNC_MODE:150000}
jobs:
check-sync:
enabled: ${CHECK_SYNC_ENABLED:true}
fixed-delay: ${CHECK_SYNC_FIXED_DELAY:60000}
init-delay: ${CHECK_SYNC_INIT_DELAY:30000}

genesis:
byron: ${BYRON_GENESIS_URL:classpath:networks/${NETWORK}/byron-genesis.json}
Expand Down
Loading