Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix unloadNamespaceBundlesGracefully can be stuck with extensible load manager (#23349) #23496

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ public CompletableFuture<Void> closeAsync() {
return closeFuture;
}
LOG.info("Closing PulsarService");
if (topicPoliciesService != null) {
topicPoliciesService.close();
}
if (brokerService != null) {
brokerService.unloadNamespaceBundlesGracefully();
}
Expand Down Expand Up @@ -578,10 +581,6 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (topicPoliciesService != null) {
topicPoliciesService.close();
topicPoliciesService = null;
}

if (client != null) {
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,14 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

private SplitManager splitManager;

private volatile boolean started = false;
/**
 * Lifecycle state of this load manager. The current value is held in the {@code state} field.
 */
enum State {
// Initial value of the state field; close() also resets the state to this value when it finishes.
INIT,
// Set by start() (INIT -> RUNNING) once the startup sequence has completed.
RUNNING,
// Set by disableBroker() (RUNNING -> DISABLED). The broker is removing its visibility from other brokers.
// In this state, it cannot play the leader or follower role.
DISABLED,
}
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);

private boolean configuredSystemTopics = false;

Expand Down Expand Up @@ -210,7 +217,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
* Get all the bundles that are owned by this broker.
*/
public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() {
if (!started) {
if (state.get() == State.INIT) {
log.warn("Failed to get owned service units, load manager is not started.");
return CompletableFuture.completedFuture(Collections.emptySet());
}
Expand Down Expand Up @@ -373,7 +380,7 @@ public static CompletableFuture<Optional<BrokerLookupData>> getAssignedBrokerLoo

@Override
public void start() throws PulsarServerException {
if (this.started) {
if (state.get() != State.INIT) {
return;
}
try {
Expand Down Expand Up @@ -471,7 +478,9 @@ public void start() throws PulsarServerException {

this.splitScheduler.start();
this.initWaiter.complete(true);
this.started = true;
if (!state.compareAndSet(State.INIT, State.RUNNING)) {
failForUnexpectedState("start");
}
log.info("Started load manager.");
} catch (Throwable e) {
failStarting(e);
Expand Down Expand Up @@ -643,21 +652,17 @@ public CompletableFuture<Optional<String>> selectAsync(ServiceUnitId bundle,
filter.filterAsync(availableBrokerCandidates, bundle, context);
futures.add(future);
}
CompletableFuture<Optional<String>> result = new CompletableFuture<>();
FutureUtil.waitForAll(futures).whenComplete((__, ex) -> {
if (ex != null) {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers when select bundle: {}", bundle, ex);
}
return FutureUtil.waitForAll(futures).exceptionally(e -> {
// TODO: We may need to revisit this error case.
log.error("Failed to filter out brokers when select bundle: {}", bundle, e);
return null;
}).thenApply(__ -> {
if (availableBrokerCandidates.isEmpty()) {
result.complete(Optional.empty());
return;
return Optional.empty();
}
Set<String> candidateBrokers = availableBrokerCandidates.keySet();

result.complete(getBrokerSelectionStrategy().select(candidateBrokers, bundle, context));
return getBrokerSelectionStrategy().select(candidateBrokers, bundle, context);
});
return result;
});
}

Expand Down Expand Up @@ -695,6 +700,9 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
boolean force,
long timeout,
TimeUnit timeoutUnit) {
if (state.get() == State.INIT) {
return CompletableFuture.completedFuture(null);
}
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -783,28 +791,13 @@ private CompletableFuture<Void> splitAsync(SplitDecision decision,

@Override
public void close() throws PulsarServerException {
if (!this.started) {
if (state.get() == State.INIT) {
return;
}
try {
if (brokerLoadDataReportTask != null) {
brokerLoadDataReportTask.cancel(true);
}

if (topBundlesLoadDataReportTask != null) {
topBundlesLoadDataReportTask.cancel(true);
}

if (monitorTask != null) {
monitorTask.cancel(true);
}

this.brokerLoadDataStore.close();
this.topBundlesLoadDataStore.close();
stopLoadDataReportTasks();
this.unloadScheduler.close();
this.splitScheduler.close();
} catch (IOException ex) {
throw new PulsarServerException(ex);
} finally {
try {
this.brokerRegistry.close();
Expand All @@ -818,14 +811,36 @@ public void close() throws PulsarServerException {
} catch (Exception e) {
throw new PulsarServerException(e);
} finally {
this.started = false;
state.set(State.INIT);
}
}

}
}
}

/**
 * Cancels the broker load data report task, the top bundles load data report task and the monitor task
 * (each only if it has been created), then shuts down both load data stores.
 *
 * <p>An {@link IOException} raised while shutting down one store is logged as a warning and does not
 * prevent the other store from being shut down.
 */
private void stopLoadDataReportTasks() {
    // Any of the tasks may be absent, so every cancellation is guarded by a null check.
    if (this.brokerLoadDataReportTask != null) {
        this.brokerLoadDataReportTask.cancel(true);
    }
    if (this.topBundlesLoadDataReportTask != null) {
        this.topBundlesLoadDataReportTask.cancel(true);
    }
    if (this.monitorTask != null) {
        this.monitorTask.cancel(true);
    }

    // The two stores are shut down independently: a failure in the first must not skip the second.
    try {
        this.brokerLoadDataStore.shutdown();
    } catch (IOException brokerStoreFailure) {
        log.warn("Failed to shutdown brokerLoadDataStore", brokerStoreFailure);
    }

    try {
        this.topBundlesLoadDataStore.shutdown();
    } catch (IOException topBundlesStoreFailure) {
        log.warn("Failed to shutdown topBundlesLoadDataStore", topBundlesStoreFailure);
    }
}

public static boolean isInternalTopic(String topic) {
return INTERNAL_TOPICS.contains(topic)
|| topic.startsWith(TOPIC)
Expand All @@ -841,13 +856,16 @@ synchronized void playLeader() {
boolean becameFollower = false;
while (!Thread.currentThread().isInterrupted()) {
try {
if (!initWaiter.get()) {
if (!initWaiter.get() || disabled()) {
return;
}
if (!serviceUnitStateChannel.isChannelOwner()) {
becameFollower = true;
break;
}
if (disabled()) {
return;
}
// Confirm the system topics have been created or create them if they do not exist.
// If the leader has changed, the new leader needs to reset
// the local brokerService.topics (by creating these topics).
Expand All @@ -859,6 +877,11 @@ synchronized void playLeader() {
serviceUnitStateChannel.scheduleOwnershipMonitor();
break;
} catch (Throwable e) {
if (disabled()) {
log.warn("The broker:{} failed to set the role but exit because it's disabled",
pulsar.getBrokerId(), e);
return;
}
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Expand All @@ -870,6 +893,9 @@ synchronized void playLeader() {
}
}
}
if (disabled()) {
return;
}

if (becameFollower) {
log.warn("The broker:{} became follower while initializing leader role.", pulsar.getBrokerId());
Expand All @@ -893,13 +919,16 @@ synchronized void playFollower() {
boolean becameLeader = false;
while (!Thread.currentThread().isInterrupted()) {
try {
if (!initWaiter.get()) {
if (!initWaiter.get() || disabled()) {
return;
}
if (serviceUnitStateChannel.isChannelOwner()) {
becameLeader = true;
break;
}
if (disabled()) {
return;
}
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
closeInternalTopics();
Expand All @@ -908,6 +937,11 @@ synchronized void playFollower() {
topBundlesLoadDataStore.startProducer();
break;
} catch (Throwable e) {
if (disabled()) {
log.warn("The broker:{} failed to set the role but exit because it's disabled",
pulsar.getBrokerId(), e);
return;
}
log.warn("The broker:{} failed to set the role. Retrying {} th ...",
pulsar.getBrokerId(), ++retry, e);
try {
Expand All @@ -919,6 +953,9 @@ synchronized void playFollower() {
}
}
}
if (disabled()) {
return;
}

if (becameLeader) {
log.warn("This broker:{} became leader while initializing follower role.", pulsar.getBrokerId());
Expand Down Expand Up @@ -997,9 +1034,20 @@ protected void monitor() {
}

public void disableBroker() throws Exception {
// TopicDoesNotExistException might be thrown and it is not recoverable. Switch to the DISABLED state first so
// that playFollower() or playLeader() can exit quickly.
if (!state.compareAndSet(State.RUNNING, State.DISABLED)) {
failForUnexpectedState("disableBroker");
}
stopLoadDataReportTasks();
serviceUnitStateChannel.cleanOwnerships();
leaderElectionService.close();
brokerRegistry.unregister();
leaderElectionService.close();
final var availableBrokers = brokerRegistry.getAvailableBrokersAsync()
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
if (availableBrokers.isEmpty()) {
close();
}
// Close the internal topics (if owned any) after giving up the possible leader role,
// so that the subsequent lookups could hit the next leader.
closeInternalTopics();
Expand Down Expand Up @@ -1033,4 +1081,16 @@ protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) {
protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) {
return new ServiceUnitStateChannelImpl(pulsar);
}

/**
 * Aborts the current operation because the load manager is in a state that does not allow it.
 *
 * @param msg name of the operation that failed; embedded in the exception message
 * @throws IllegalStateException always, with a message containing {@code msg} and the current state
 */
private void failForUnexpectedState(String msg) {
    final String description = "Failed to " + msg + ", state: " + state.get();
    throw new IllegalStateException(description);
}

/**
 * Tells whether the load manager is currently in the {@code RUNNING} state.
 *
 * @return {@code true} if the current state is {@code State.RUNNING}
 */
boolean running() {
    return State.RUNNING == state.get();
}

/**
 * Tells whether the load manager is currently in the {@code DISABLED} state.
 *
 * @return {@code true} if the current state is {@code State.DISABLED}
 */
private boolean disabled() {
    return State.DISABLED == state.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public void start() throws PulsarServerException {
loadManager.start();
}

/**
 * Reports whether both the wrapped load manager and its service unit state channel are up.
 *
 * @return {@code true} only if the load manager is running and its service unit state channel has started
 */
public boolean started() {
    // The channel is only consulted when the load manager itself is running.
    if (!loadManager.running()) {
        return false;
    }
    return loadManager.getServiceUnitStateChannel().started();
}

@Override
public void initialize(PulsarService pulsar) {
loadManager.initialize(pulsar);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public interface ServiceUnitStateChannel extends Closeable {
*/
void start() throws PulsarServerException;

/**
 * Returns whether the channel has started.
 *
 * @return {@code true} if the channel has started, {@code false} otherwise
 */
boolean started();

/**
* Closes the ServiceUnitStateChannel.
* @throws PulsarServerException if it fails to close the channel.
Expand Down
Loading
Loading