Skip to content

Commit

Permalink
Merge branch 'main' into CCFIX
Browse files Browse the repository at this point in the history
  • Loading branch information
ZacAttack committed Oct 7, 2024
2 parents 7d913fc + 5889f63 commit aa57991
Show file tree
Hide file tree
Showing 56 changed files with 1,845 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -446,7 +447,8 @@ private synchronized void bootstrap() {
storageMetadataService,
ingestionService,
storageService,
blobTransferManager)
blobTransferManager,
this::getVeniceCurrentVersionNumber)
: new DefaultIngestionBackend(
storageMetadataService,
ingestionService,
Expand Down Expand Up @@ -656,6 +658,11 @@ Version getVeniceCurrentVersion(String storeName) {
}
}

/**
 * Resolves the number of the current version of the given store.
 *
 * @param storeName name of the store to look up
 * @return the number of the version returned by {@link #getVeniceCurrentVersion(String)}, or {@code -1}
 *         when that lookup yields {@code null}
 */
int getVeniceCurrentVersionNumber(String storeName) {
  final Version resolvedVersion = getVeniceCurrentVersion(storeName);
  if (resolvedVersion == null) {
    // No current version is known for this store; -1 is the "absent" sentinel.
    return -1;
  }
  return resolvedVersion.getNumber();
}

private Version getVeniceLatestNonFaultyVersion(Store store, Set<Integer> faultyVersions) {
Version latestNonFaultyVersion = null;
for (Version version: store.getVersions()) {
Expand Down Expand Up @@ -818,4 +825,30 @@ static ExecutionStatus getDaVinciErrorStatus(Exception e, boolean useDaVinciSpec
}
return status;
}

/**
 * Reports whether the ingestion backend still has a current version that is bootstrapping.
 *
 * @return the answer given by the underlying ingestion backend
 */
public boolean hasCurrentVersionBootstrapping() {
  final boolean stillBootstrapping = ingestionBackend.hasCurrentVersionBootstrapping();
  return stillBootstrapping;
}

/**
 * Wraps a {@link CompletableFuture} that completes once the given backend reports that no current
 * version is bootstrapping anymore.
 *
 * <p>The backend is polled every 3 seconds (starting immediately) on a dedicated single-thread
 * scheduled executor, which is shut down as soon as the future completes, normally or exceptionally.
 */
static class BootstrappingAwareCompletableFuture {
  private final ScheduledExecutorService scheduledExecutor =
      Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("DaVinci_Bootstrapping_Check_Executor"));
  public final CompletableFuture<Void> bootstrappingFuture = new CompletableFuture<>();

  /**
   * Starts polling the bootstrapping status of {@code backend}.
   *
   * @param backend the backend whose {@code hasCurrentVersionBootstrapping()} is polled
   */
  public BootstrappingAwareCompletableFuture(DaVinciBackend backend) {
    scheduledExecutor.scheduleAtFixedRate(() -> {
      if (bootstrappingFuture.isDone()) {
        return;
      }
      try {
        if (!backend.hasCurrentVersionBootstrapping()) {
          bootstrappingFuture.complete(null);
        }
      } catch (Throwable t) {
        /*
         * An exception escaping a scheduleAtFixedRate task silently suppresses every subsequent
         * execution, which would leave the future pending forever. Surface the failure instead so
         * that waiters are released and the executor gets shut down by the callback below.
         */
        bootstrappingFuture.completeExceptionally(t);
      }
    }, 0, 3, TimeUnit.SECONDS);
    // Release the polling thread once the outcome is known.
    bootstrappingFuture.whenComplete((ignored1, ignored2) -> scheduledExecutor.shutdown());
  }

  /**
   * @return the future that completes when bootstrapping is over, or completes exceptionally if the
   *         status check itself fails
   */
  public CompletableFuture<Void> getBootstrappingFuture() {
    return bootstrappingFuture;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,30 @@ synchronized void tryStartHeartbeat() {
if (isReportingPushStatus() && heartbeat == null) {
heartbeat = backend.getExecutor().scheduleAtFixedRate(() -> {
try {
backend.getPushStatusStoreWriter().writeHeartbeat(version.getStoreName());
sendOutHeartbeat(backend, version);
} catch (Throwable t) {
LOGGER.error("Unable to send heartbeat for {}", this);
}
}, 0, heartbeatInterval, TimeUnit.SECONDS);
}
}

/**
 * Writes a push-status heartbeat for the store of the given version.
 *
 * <p>While the backend reports that a current version is still bootstrapping, the dedicated
 * "bootstrapping instance" heartbeat is written instead of the regular one, so that the report from
 * this instance does not delay new pushes.
 *
 * @param backend the backend used to check the bootstrapping status and to obtain the push status store writer
 * @param version the version whose store name the heartbeat is written for
 */
protected static void sendOutHeartbeat(DaVinciBackend backend, Version version) {
  if (backend.hasCurrentVersionBootstrapping()) {
    LOGGER.info(
        "DaVinci is still bootstrapping, so it will send heart-beat message with a special timestamp"
            + " for store: {} to avoid delaying the new push job",
        version.getStoreName());
    // Tell backend that the report from the bootstrapping instance doesn't count to avoid
    // delaying new pushes.
    backend.getPushStatusStoreWriter().writeHeartbeatForBootstrappingInstance(version.getStoreName());
  } else {
    backend.getPushStatusStoreWriter().writeHeartbeat(version.getStoreName());
  }
}

synchronized void tryStopHeartbeat() {
if (heartbeat != null && partitionFutures.values().stream().allMatch(CompletableFuture::isDone)) {
heartbeat.cancel(true);
Expand Down Expand Up @@ -359,9 +375,40 @@ synchronized CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions
futures.add(partitionFutures.get(partition));
}

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {
CompletableFuture<Void> bootstrappingAwareSubscriptionFuture = new CompletableFuture<>();

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((v, e) -> {
storeBackendStats.recordSubscribeDuration(Duration.between(startTime, Instant.now()));
if (e != null) {
bootstrappingAwareSubscriptionFuture.completeExceptionally(e);
LOGGER.warn("Bootstrapping store: {}, version: {} failed", version.getStoreName(), version.getNumber(), e);
} else {
LOGGER.info("Bootstrapping store: {}, version: {} is completed", version.getStoreName(), version.getNumber());
/**
* It is important to start polling the bootstrapping status after the version ingestion is completed to
* make sure the bootstrapping status polling is valid (not doing polling without any past/active ingestion tasks).
*/
new DaVinciBackend.BootstrappingAwareCompletableFuture(backend).getBootstrappingFuture()
.whenComplete((ignored, ee) -> {
if (ee != null) {
bootstrappingAwareSubscriptionFuture.completeExceptionally(ee);
LOGGER.warn(
"Bootstrapping aware subscription to store: {}, version: {} failed",
version.getStoreName(),
version.getNumber(),
ee);
} else {
bootstrappingAwareSubscriptionFuture.complete(null);
LOGGER.info(
"Bootstrapping aware subscription to store: {}, version: {} is completed",
version.getStoreName(),
version.getNumber());
}
});
}
});

return bootstrappingAwareSubscriptionFuture;
}

synchronized void unsubscribe(ComplementSet<Integer> partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_ENABLE_LIVE_CONFIG_BASED_KAFKA_THROTTLING;
import static com.linkedin.venice.ConfigKeys.SERVER_ENABLE_PARALLEL_BATCH_GET;
import static com.linkedin.venice.ConfigKeys.SERVER_FORKED_PROCESS_JVM_ARGUMENT_LIST;
import static com.linkedin.venice.ConfigKeys.SERVER_GLOBAL_RT_DIV_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_HEADER_TABLE_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INBOUND_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_INITIAL_WINDOW_SIZE;
Expand Down Expand Up @@ -542,6 +543,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final int channelOptionWriteBufferHighBytes;
private final boolean aaWCWorkloadParallelProcessingEnabled;
private final int aaWCWorkloadParallelProcessingThreadPoolSize;
private final boolean isGlobalRtDivEnabled;

public VeniceServerConfig(VeniceProperties serverProperties) throws ConfigurationException {
this(serverProperties, Collections.emptyMap());
Expand Down Expand Up @@ -897,9 +899,12 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getInt(SERVER_NON_CURRENT_VERSION_AA_WC_LEADER_QUOTA_RECORDS_PER_SECOND, -1);
nonCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond =
serverProperties.getInt(SERVER_NON_CURRENT_VERSION_NON_AA_WC_LEADER_QUOTA_RECORDS_PER_SECOND, -1);

// default 64KB
channelOptionWriteBufferHighBytes = (int) serverProperties
.getSizeInBytes(SERVER_CHANNEL_OPTION_WRITE_BUFFER_WATERMARK_HIGH_BYTES, WriteBufferWaterMark.DEFAULT.high()); // default
// 64KB
.getSizeInBytes(SERVER_CHANNEL_OPTION_WRITE_BUFFER_WATERMARK_HIGH_BYTES, WriteBufferWaterMark.DEFAULT.high());

this.isGlobalRtDivEnabled = serverProperties.getBoolean(SERVER_GLOBAL_RT_DIV_ENABLED, false);
if (channelOptionWriteBufferHighBytes <= 0) {
throw new VeniceException("Invalid channel option write buffer high bytes: " + channelOptionWriteBufferHighBytes);
}
Expand Down Expand Up @@ -1624,4 +1629,8 @@ public boolean isAAWCWorkloadParallelProcessingEnabled() {
public int getAAWCWorkloadParallelProcessingThreadPoolSize() {
return aaWCWorkloadParallelProcessingThreadPoolSize;
}

/**
 * @return the flag loaded from {@code SERVER_GLOBAL_RT_DIV_ENABLED} at construction time
 */
public boolean isGlobalRtDivEnabled() {
  return this.isGlobalRtDivEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ public void setStorageEngineReference(
}
}

/**
 * Delegates the bootstrapping check to the store ingestion service.
 */
@Override
public boolean hasCurrentVersionBootstrapping() {
  final KafkaStoreIngestionService ingestionService = getStoreIngestionService();
  return ingestionService.hasCurrentVersionBootstrapping();
}

@Override
public KafkaStoreIngestionService getStoreIngestionService() {
return storeIngestionService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ void dropStoragePartitionGracefully(

// setStorageEngineReference is used by Da Vinci exclusively to speed up storage engine retrieval for read path.
void setStorageEngineReference(String topicName, AtomicReference<AbstractStorageEngine> storageEngineReference);

/**
 * Checks whether any current version is still bootstrapping.
 *
 * @return {@code true} if at least one current version is still bootstrapping, {@code false} otherwise
 */
boolean hasCurrentVersionBootstrapping();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.davinci.ingestion.main.MainIngestionRequestClient;
import com.linkedin.davinci.ingestion.main.MainIngestionStorageMetadataService;
import com.linkedin.davinci.ingestion.main.MainPartitionIngestionStatus;
import com.linkedin.davinci.ingestion.main.MainTopicIngestionStatus;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.notifier.RelayNotifier;
import com.linkedin.davinci.notifier.VeniceNotifier;
Expand All @@ -19,12 +20,15 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType;
import com.linkedin.venice.ingestion.protocol.enums.IngestionComponentType;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -50,6 +54,7 @@ public class IsolatedIngestionBackend extends DefaultIngestionBackend implements
private final MainIngestionMonitorService mainIngestionMonitorService;
private final VeniceConfigLoader configLoader;
private final ExecutorService completionReportHandlingExecutor = Executors.newFixedThreadPool(10);
private final Function<String, Integer> currentVersionSupplier;
private Process isolatedIngestionServiceProcess;

public IsolatedIngestionBackend(
Expand All @@ -58,7 +63,8 @@ public IsolatedIngestionBackend(
StorageMetadataService storageMetadataService,
KafkaStoreIngestionService storeIngestionService,
StorageService storageService,
BlobTransferManager blobTransferManager) {
BlobTransferManager blobTransferManager,
Function<String, Integer> currentVersionSupplier) {
super(
storageMetadataService,
storeIngestionService,
Expand All @@ -68,6 +74,7 @@ public IsolatedIngestionBackend(
int servicePort = configLoader.getVeniceServerConfig().getIngestionServicePort();
int listenerPort = configLoader.getVeniceServerConfig().getIngestionApplicationPort();
this.configLoader = configLoader;
this.currentVersionSupplier = currentVersionSupplier;
// Create the ingestion request client.
mainIngestionRequestClient = new MainIngestionRequestClient(configLoader);
// Create the forked isolated ingestion process.
Expand Down Expand Up @@ -192,6 +199,10 @@ public MainIngestionMonitorService getMainIngestionMonitorService() {
return mainIngestionMonitorService;
}

/**
 * @return the function supplied at construction time that maps a store name to a version number
 */
Function<String, Integer> getCurrentVersionSupplier() {
  return this.currentVersionSupplier;
}

public MainIngestionRequestClient getMainIngestionRequestClient() {
return mainIngestionRequestClient;
}
Expand All @@ -218,6 +229,31 @@ public void close() {
}
}

/**
 * Checks whether any current version is still bootstrapping, either according to the parent
 * implementation or because the isolated ingestion process is still ingesting one of its partitions.
 *
 * @return {@code true} if the parent check reports bootstrapping, or if any topic tracked by the main
 *         ingestion monitor service belongs to a store's current version and still has a partition
 *         ingesting in the isolated process; {@code false} otherwise
 */
@Override
public boolean hasCurrentVersionBootstrapping() {
  if (super.hasCurrentVersionBootstrapping()) {
    return true;
  }

  Map<String, MainTopicIngestionStatus> topicIngestionStatusMap =
      getMainIngestionMonitorService().getTopicIngestionStatusMap();
  for (Map.Entry<String, MainTopicIngestionStatus> entry: topicIngestionStatusMap.entrySet()) {
    String topicName = entry.getKey();
    MainTopicIngestionStatus ingestionStatus = entry.getValue();
    String storeName = Version.parseStoreFromKafkaTopicName(topicName);
    int version = Version.parseVersionFromKafkaTopicName(topicName);
    /*
     * If the current version is still being ingested by isolated process, it means the bootstrapping hasn't finished
     * yet as the ingestion task should be handed over to main process if all partitions complete ingestion.
     */
    if (getCurrentVersionSupplier().apply(storeName) == version
        && ingestionStatus.hasPartitionIngestingInIsolatedProcess()) {
      return true;
    }
  }

  return false;
}

boolean isTopicPartitionHostedInMainProcess(String topicName, int partition) {
return getMainIngestionMonitorService().getTopicPartitionIngestionStatus(topicName, partition)
.equals(MainPartitionIngestionStatus.MAIN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,13 @@ public long getIngestingPartitionCount() {
public String getTopicName() {
return topicName;
}

/**
 * Scans the per-partition ingestion statuses of this topic.
 *
 * @return {@code true} as soon as one partition status equals {@link MainPartitionIngestionStatus#ISOLATED},
 *         {@code false} if none does
 */
public boolean hasPartitionIngestingInIsolatedProcess() {
  // Only the statuses matter here, so the partition ids are not inspected.
  for (MainPartitionIngestionStatus partitionStatus: ingestionStatusMap.values()) {
    if (partitionStatus.equals(MainPartitionIngestionStatus.ISOLATED)) {
      return true;
    }
  }
  return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ void setDataReceiver(
// Defensive coding. Should never happen except in case of a regression.
throw new IllegalStateException(
"It is not allowed to set multiple " + ConsumedDataReceiver.class.getSimpleName() + " instances for the same "
+ "topic-partition of a given consumer. Previous: " + previousConsumedDataReceiver + ", New: "
+ consumedDataReceiver);
+ "topic-partition of a given consumer. Previous: " + previousConsumedDataReceiver.destinationIdentifier()
+ ", New: " + consumedDataReceiver.destinationIdentifier());
}
synchronized (this) {
notifyAll();
Expand Down
Loading

0 comments on commit aa57991

Please sign in to comment.