Merge branch 'main' into CCFIX
ZacAttack committed Oct 9, 2024
2 parents 6e9407b + 53adf35 commit 9f95576
Showing 29 changed files with 599 additions and 429 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -126,7 +126,7 @@ ext.libraries = [
snappy: 'org.iq80.snappy:snappy:0.4',
spark: 'com.sparkjava:spark-core:2.9.4', // Spark-Java Rest framework
spotbugs: 'com.github.spotbugs:spotbugs:4.5.2',
tehuti: 'io.tehuti:tehuti:0.12.2',
tehuti: 'io.tehuti:tehuti:0.12.3',
testcontainers: 'org.testcontainers:testcontainers:1.18.0',
testng: 'org.testng:testng:6.14.3',
tomcatAnnotations: 'org.apache.tomcat:annotations-api:6.0.53',
@@ -90,6 +90,7 @@
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_CONCURRENT_STREAMS;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_FRAME_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_HTTP2_MAX_HEADER_LIST_SIZE;
import static com.linkedin.venice.ConfigKeys.SERVER_INCREMENTAL_PUSH_STATUS_WRITE_MODE;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_CHECKPOINT_DURING_GRACEFUL_SHUTDOWN_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_HEARTBEAT_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_ISOLATION_APPLICATION_PORT;
@@ -507,6 +508,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final int metaStoreWriterCloseConcurrency;

private final boolean batchReportEOIPEnabled;
private final IncrementalPushStatusWriteMode incrementalPushStatusWriteMode;
private final long ingestionHeartbeatIntervalMs;
private final boolean leaderCompleteStateCheckInFollowerEnabled;
private final long leaderCompleteStateCheckInFollowerValidIntervalMs;
@@ -834,7 +836,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverProperties.getLong(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, TimeUnit.MINUTES.toMillis(1));
batchReportEOIPEnabled =
serverProperties.getBoolean(SERVER_BATCH_REPORT_END_OF_INCREMENTAL_PUSH_STATUS_ENABLED, false);

incrementalPushStatusWriteMode =
extractIncPushStatusWriteMode(serverProperties.getString(SERVER_INCREMENTAL_PUSH_STATUS_WRITE_MODE, "DUAL"));
stuckConsumerRepairEnabled = serverProperties.getBoolean(SERVER_STUCK_CONSUMER_REPAIR_ENABLED, true);
stuckConsumerRepairIntervalSecond = serverProperties.getInt(SERVER_STUCK_CONSUMER_REPAIR_INTERVAL_SECOND, 60);
stuckConsumerDetectionRepairThresholdSecond =
@@ -1478,6 +1481,30 @@ public boolean getBatchReportEOIPEnabled() {
return batchReportEOIPEnabled;
}

public enum IncrementalPushStatusWriteMode {
/** Write incremental push status to Zookeeper only */
ZOOKEEPER_ONLY,

/** Write incremental push status to push status system store only */
PUSH_STATUS_SYSTEM_STORE_ONLY,

/** Write incremental push status to both Zookeeper and push status system store */
DUAL
}

public IncrementalPushStatusWriteMode extractIncPushStatusWriteMode(String mode) {
try {
return IncrementalPushStatusWriteMode.valueOf(mode);
} catch (IllegalArgumentException e) {
LOGGER.error("Invalid incremental push status write mode: {}. Defaulting to DUAL", mode);
return IncrementalPushStatusWriteMode.DUAL;
}
}

public IncrementalPushStatusWriteMode getIncrementalPushStatusWriteMode() {
return incrementalPushStatusWriteMode;
}

public boolean isLeaderCompleteStateCheckInFollowerEnabled() {
return leaderCompleteStateCheckInFollowerEnabled;
}
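
For illustration, a minimal, self-contained sketch of the parse-with-fallback behavior introduced above: an unrecognized value is logged and mapped to DUAL rather than failing server startup. The enum constants mirror the diff, but the class name, the property key string, and the use of java.util.Properties are placeholders, not Venice's actual constants or VeniceProperties API.

import java.util.Properties;

public class IncPushStatusWriteModeDemo {
  enum IncrementalPushStatusWriteMode {
    ZOOKEEPER_ONLY, PUSH_STATUS_SYSTEM_STORE_ONLY, DUAL
  }

  // Same shape as extractIncPushStatusWriteMode in the hunk above: valueOf with a DUAL fallback.
  static IncrementalPushStatusWriteMode extractIncPushStatusWriteMode(String mode) {
    try {
      return IncrementalPushStatusWriteMode.valueOf(mode);
    } catch (IllegalArgumentException e) {
      System.err.println("Invalid incremental push status write mode: " + mode + ". Defaulting to DUAL");
      return IncrementalPushStatusWriteMode.DUAL;
    }
  }

  public static void main(String[] args) {
    Properties serverProps = new Properties(); // stand-in for VeniceProperties
    // Placeholder key string; the real key is whatever SERVER_INCREMENTAL_PUSH_STATUS_WRITE_MODE resolves to.
    serverProps.setProperty("server.incremental.push.status.write.mode", "PUSH_STATUS_SYSTEM_STORE_ONLY");
    String configured = serverProps.getProperty("server.incremental.push.status.write.mode", "DUAL");
    System.out.println(extractIncPushStatusWriteMode(configured)); // PUSH_STATUS_SYSTEM_STORE_ONLY
    System.out.println(extractIncPushStatusWriteMode("BOGUS_MODE")); // DUAL, after an error line on stderr
  }
}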
@@ -381,7 +381,8 @@ private void asyncStart() {
partitionPushStatusAccessor,
statusStoreWriter,
helixReadOnlyStoreRepository,
instance.getNodeId());
instance.getNodeId(),
veniceServerConfig.getIncrementalPushStatusWriteMode());

ingestionBackend.getStoreIngestionService().addIngestionNotifier(pushStatusNotifier);

@@ -250,8 +250,8 @@ void setDataReceiver(
// Defensive coding. Should never happen except in case of a regression.
throw new IllegalStateException(
"It is not allowed to set multiple " + ConsumedDataReceiver.class.getSimpleName() + " instances for the same "
+ "topic-partition of a given consumer. Previous: " + previousConsumedDataReceiver.destinationIdentifier()
+ ", New: " + consumedDataReceiver.destinationIdentifier());
+ "topic-partition of a given consumer. Previous: " + previousConsumedDataReceiver + ", New: "
+ consumedDataReceiver);
}
synchronized (this) {
notifyAll();
@@ -222,14 +222,8 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.forEach((topicPartition, sharedConsumer) -> {
/**
* Refer {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for avoiding race condition caused by
* setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer.
*/
synchronized (sharedConsumer) {
sharedConsumer.unSubscribe(topicPartition);
removeTopicPartitionFromConsumptionTask(sharedConsumer, topicPartition);
}
sharedConsumer.unSubscribe(topicPartition);
removeTopicPartitionFromConsumptionTask(sharedConsumer, topicPartition);
});
}
return null;
@@ -243,14 +237,8 @@ public void unsubscribeAll(PubSubTopic versionTopic) {
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) {
PubSubConsumerAdapter consumer = getConsumerAssignedToVersionTopicPartition(versionTopic, pubSubTopicPartition);
if (consumer != null) {
/**
* Refer {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for avoiding race condition caused by
* setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer.
*/
synchronized (consumer) {
consumer.unSubscribe(pubSubTopicPartition);
removeTopicPartitionFromConsumptionTask(consumer, pubSubTopicPartition);
}
consumer.unSubscribe(pubSubTopicPartition);
consumerToConsumptionTask.get(consumer).removeDataReceiver(pubSubTopicPartition);
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.remove(pubSubTopicPartition);
@@ -277,25 +265,20 @@ public void batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition>
/**
* Leverage {@link PubSubConsumerAdapter#batchUnsubscribe(Set)}.
*/
consumerUnSubTopicPartitionSet.forEach((sharedConsumer, tpSet) -> {
ConsumptionTask task = consumerToConsumptionTask.get(sharedConsumer);
/**
* Refer {@link KafkaConsumerService#startConsumptionIntoDataReceiver} for avoiding race condition caused by
* setting data receiver and unsubscribing concurrently for the same topic partition on a shared consumer.
*/
synchronized (sharedConsumer) {
sharedConsumer.batchUnsubscribe(tpSet);
tpSet.forEach(task::removeDataReceiver);
}
tpSet.forEach(
tp -> versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.remove(tp);
return topicPartitionToConsumerMap.isEmpty() ? null : topicPartitionToConsumerMap;
} else {
return null;
}
}));
consumerUnSubTopicPartitionSet.forEach((c, tpSet) -> {
c.batchUnsubscribe(tpSet);
ConsumptionTask task = consumerToConsumptionTask.get(c);
tpSet.forEach(tp -> {
task.removeDataReceiver(tp);
versionTopicToTopicPartitionToConsumer.compute(versionTopic, (k, topicPartitionToConsumerMap) -> {
if (topicPartitionToConsumerMap != null) {
topicPartitionToConsumerMap.remove(tp);
return topicPartitionToConsumerMap.isEmpty() ? null : topicPartitionToConsumerMap;
} else {
return null;
}
});
});
});
}

@@ -404,32 +387,26 @@ public void startConsumptionIntoDataReceiver(
PubSubTopic versionTopic = consumedDataReceiver.destinationIdentifier();
PubSubTopicPartition topicPartition = partitionReplicaIngestionContext.getPubSubTopicPartition();
SharedKafkaConsumer consumer = assignConsumerFor(versionTopic, topicPartition);

if (consumer == null) {
// Defensive code. Shouldn't happen except in case of a regression.
throw new VeniceException(
"Shared consumer must exist for version topic: " + versionTopic + " in Kafka cluster: " + kafkaUrl);
}

ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer);
if (consumptionTask == null) {
// Defensive coding. Should never happen except in case of a regression.
throw new IllegalStateException(
"There should be a " + ConsumptionTask.class.getSimpleName() + " assigned for this "
+ SharedKafkaConsumer.class.getSimpleName());
}
/**
* It is possible that when one {@link StoreIngestionTask} thread finishes unsubscribing a topic partition but not
* finish removing data receiver, but the other {@link StoreIngestionTask} thread is setting data receiver for this
* topic partition before subscription. As {@link ConsumptionTask} does not allow 2 different data receivers for
* the same topic partition, it will throw exception.
* N.B. it's important to set the {@link ConsumedDataReceiver} prior to subscribing, otherwise the
* {@link KafkaConsumerService.ConsumptionTask} will not be able to funnel the messages.
*/
synchronized (consumer) {
ConsumptionTask consumptionTask = consumerToConsumptionTask.get(consumer);
if (consumptionTask == null) {
// Defensive coding. Should never happen except in case of a regression.
throw new IllegalStateException(
"There should be a " + ConsumptionTask.class.getSimpleName() + " assigned for this "
+ SharedKafkaConsumer.class.getSimpleName());
}
/**
* N.B. it's important to set the {@link ConsumedDataReceiver} prior to subscribing, otherwise the
* {@link KafkaConsumerService.ConsumptionTask} will not be able to funnel the messages.
*/
consumptionTask.setDataReceiver(topicPartition, consumedDataReceiver);
consumer.subscribe(consumedDataReceiver.destinationIdentifier(), topicPartition, lastReadOffset);
}
consumptionTask.setDataReceiver(topicPartition, consumedDataReceiver);
consumer.subscribe(consumedDataReceiver.destinationIdentifier(), topicPartition, lastReadOffset);
}
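
The comment in this hunk describes the window that matters here: one ingestion thread can unsubscribe a topic-partition but not yet remove its data receiver while another thread sets a new receiver for the same partition, and a consumption task only tolerates a single receiver per topic-partition. A minimal, hypothetical analogue of that invariant (not Venice's ConsumptionTask; names are invented) makes the failure mode concrete:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the receiver bookkeeping a consumption task performs.
final class ReceiverRegistry<TP, R> {
  private final Map<TP, R> receivers = new ConcurrentHashMap<>();

  // Analogue of setDataReceiver: at most one receiver per topic-partition.
  void setDataReceiver(TP topicPartition, R receiver) {
    R previous = receivers.putIfAbsent(topicPartition, receiver);
    if (previous != null) {
      // Mirrors the defensive IllegalStateException shown earlier in this diff.
      throw new IllegalStateException(
          "It is not allowed to set multiple receivers for the same topic-partition. Previous: " + previous
              + ", New: " + receiver);
    }
  }

  // Analogue of removeDataReceiver, called on the unsubscribe path.
  void removeDataReceiver(TP topicPartition) {
    receivers.remove(topicPartition);
  }
}

If thread A has unsubscribed partition P but has not yet reached removeDataReceiver when thread B calls setDataReceiver for P, the stale entry is still present and the exception fires. Serializing each unsubscribe-and-remove pair against each set-and-subscribe pair (for example on the shared consumer instance, as the synchronized blocks on one side of this diff do) closes that window.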

interface KCSConstructor {
@@ -1209,7 +1209,8 @@ public void updatePartitionOffsetRecords(String topicName, int partition, ByteBu
LOGGER.error(
"Caught exception when deserializing offset record byte array: {} for replica: {}",
Arrays.toString(offsetRecordByteArray),
Utils.getReplicaId(topicName, partition));
Utils.getReplicaId(topicName, partition),
e);
throw e;
}
storageMetadataService.put(topicName, partition, offsetRecord);
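
The only change in this hunk is appending the caught exception as the final logger argument. With Log4j2-style parameterized logging (which this LOGGER is assumed to use), a Throwable left over after the {} placeholders are filled is logged as the exception, stack trace included, whereas the original call emitted the formatted message alone. A small self-contained illustration:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class OffsetRecordLoggingDemo {
  private static final Logger LOGGER = LogManager.getLogger(OffsetRecordLoggingDemo.class);

  public static void main(String[] args) {
    Exception e = new IllegalArgumentException("corrupt offset record");
    // Both placeholders are filled; no stack trace is printed.
    LOGGER.error("Caught exception when deserializing offset record byte array: {} for replica: {}", "[1, 2, 3]", "store_v1-0");
    // Same call with the throwable appended: same message, plus the stack trace of e.
    LOGGER.error("Caught exception when deserializing offset record byte array: {} for replica: {}", "[1, 2, 3]", "store_v1-0", e);
  }
}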
@@ -348,9 +348,4 @@ public Long endOffset(PubSubTopicPartition pubSubTopicPartition) {
public List<PubSubTopicPartitionInfo> partitionsFor(PubSubTopic topic) {
throw new UnsupportedOperationException("partitionsFor is not supported in SharedKafkaConsumer");
}

// Test only
public void setNextPollTimeOutSeconds(long seconds) {
this.nextPollTimeOutSeconds = seconds;
}
}
@@ -10,6 +10,7 @@
import static com.linkedin.venice.pushmonitor.ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.TOPIC_SWITCH_RECEIVED;

import com.linkedin.davinci.config.VeniceServerConfig.IncrementalPushStatusWriteMode;
import com.linkedin.venice.common.PushStatusStoreUtils;
import com.linkedin.venice.helix.HelixPartitionStatusAccessor;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
@@ -39,18 +40,21 @@ public class PushStatusNotifier implements VeniceNotifier {
private final PushStatusStoreWriter pushStatusStoreWriter;
private final ReadOnlyStoreRepository storeRepository;
private final String instanceId;
private final IncrementalPushStatusWriteMode incrementalPushStatusWriteMode;

public PushStatusNotifier(
OfflinePushAccessor offlinePushAccessor,
HelixPartitionStatusAccessor helixPartitionStatusAccessor,
PushStatusStoreWriter pushStatusStoreWriter,
ReadOnlyStoreRepository storeRepository,
String instanceId) {
String instanceId,
IncrementalPushStatusWriteMode incrementalPushStatusWriteMode) {
this.offLinePushAccessor = offlinePushAccessor;
this.helixPartitionStatusAccessor = helixPartitionStatusAccessor;
this.pushStatusStoreWriter = pushStatusStoreWriter;
this.storeRepository = storeRepository;
this.instanceId = instanceId;
this.incrementalPushStatusWriteMode = incrementalPushStatusWriteMode;
}

@Override
@@ -116,16 +120,28 @@ public void dataRecoveryCompleted(String kafkaTopic, int partitionId, long offse

@Override
public void startOfIncrementalPushReceived(String topic, int partitionId, long offset, String message) {
offLinePushAccessor
.updateReplicaStatus(topic, partitionId, instanceId, START_OF_INCREMENTAL_PUSH_RECEIVED, offset, message);
updateIncrementalPushStatusToPushStatusStore(topic, message, partitionId, START_OF_INCREMENTAL_PUSH_RECEIVED);
updateIncrementalPushStatus(topic, partitionId, offset, message, START_OF_INCREMENTAL_PUSH_RECEIVED);
}

@Override
public void endOfIncrementalPushReceived(String topic, int partitionId, long offset, String message) {
offLinePushAccessor
.updateReplicaStatus(topic, partitionId, instanceId, END_OF_INCREMENTAL_PUSH_RECEIVED, offset, message);
updateIncrementalPushStatusToPushStatusStore(topic, message, partitionId, END_OF_INCREMENTAL_PUSH_RECEIVED);
updateIncrementalPushStatus(topic, partitionId, offset, message, END_OF_INCREMENTAL_PUSH_RECEIVED);
}

private void updateIncrementalPushStatus(
String topic,
int partitionId,
long offset,
String message,
ExecutionStatus status) {
if (incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.ZOOKEEPER_ONLY
|| incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.DUAL) {
offLinePushAccessor.updateReplicaStatus(topic, partitionId, instanceId, status, offset, message);
}
if (incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.PUSH_STATUS_SYSTEM_STORE_ONLY
|| incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.DUAL) {
updateIncrementalPushStatusToPushStatusStore(topic, message, partitionId, status);
}
}

@Override
@@ -135,15 +151,21 @@ public void batchEndOfIncrementalPushReceived(
long offset,
List<String> pendingReportIncPushVersionList) {

offLinePushAccessor
.batchUpdateReplicaIncPushStatus(topic, partitionId, instanceId, offset, pendingReportIncPushVersionList);
// We don't need to report redundant SOIP for these stale inc push versions as they've all received EOIP.
for (String incPushVersion: pendingReportIncPushVersionList) {
updateIncrementalPushStatusToPushStatusStore(
topic,
incPushVersion,
partitionId,
END_OF_INCREMENTAL_PUSH_RECEIVED);
if (incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.ZOOKEEPER_ONLY
|| incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.DUAL) {
offLinePushAccessor
.batchUpdateReplicaIncPushStatus(topic, partitionId, instanceId, offset, pendingReportIncPushVersionList);
}
if (incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.PUSH_STATUS_SYSTEM_STORE_ONLY
|| incrementalPushStatusWriteMode == IncrementalPushStatusWriteMode.DUAL) {
// We don't need to report redundant SOIP for these stale inc push versions as they've all received EOIP.
for (String incPushVersion: pendingReportIncPushVersionList) {
updateIncrementalPushStatusToPushStatusStore(
topic,
incPushVersion,
partitionId,
END_OF_INCREMENTAL_PUSH_RECEIVED);
}
}
}
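
Every incremental push status write in this class now goes through the same two-way decision: ZOOKEEPER_ONLY and DUAL reach the offline push (ZooKeeper) accessor, while PUSH_STATUS_SYSTEM_STORE_ONLY and DUAL reach the push status system store. As a hedged alternative, the same decision table could be carried on the enum itself so the compound conditions are not repeated at each call site; this is illustrative only, not the committed code, and the flag names are invented:

// Hypothetical variant of the enum added in VeniceServerConfig above.
public enum IncrementalPushStatusWriteMode {
  ZOOKEEPER_ONLY(true, false),
  PUSH_STATUS_SYSTEM_STORE_ONLY(false, true),
  DUAL(true, true);

  private final boolean writeToZooKeeper;
  private final boolean writeToPushStatusSystemStore;

  IncrementalPushStatusWriteMode(boolean writeToZooKeeper, boolean writeToPushStatusSystemStore) {
    this.writeToZooKeeper = writeToZooKeeper;
    this.writeToPushStatusSystemStore = writeToPushStatusSystemStore;
  }

  public boolean shouldWriteToZooKeeper() {
    return writeToZooKeeper;
  }

  public boolean shouldWriteToPushStatusSystemStore() {
    return writeToPushStatusSystemStore;
  }
}

With that shape, the branches above would read if (incrementalPushStatusWriteMode.shouldWriteToZooKeeper()) and if (incrementalPushStatusWriteMode.shouldWriteToPushStatusSystemStore()), keeping the routing rule in one place.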

@@ -142,11 +142,7 @@ private void initServerStoreAndSchemaRepository() {
// Load existing store config and setup watches
storeRepo.refresh();

storeConfigRepo = new HelixReadOnlyStoreConfigRepository(
zkClient,
adapter,
clusterConfig.getRefreshAttemptsForZkReconnect(),
clusterConfig.getRefreshIntervalForZkReconnectInMs());
storeConfigRepo = new HelixReadOnlyStoreConfigRepository(zkClient, adapter);
storeConfigRepo.refresh();

readOnlyZKSharedSchemaRepository = new HelixReadOnlyZKSharedSchemaRepository(