Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix the bug that elected leader thinks it's a follower #364

Merged
merged 3 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/actions/tune-runner-vm/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ runs:
# stop Azure Linux agent to save RAM
sudo systemctl stop walinuxagent.service || true

# Install cgroup-tools package to use cgget command
sudo apt-get update
sudo apt-get install -y cgroup-tools

# show memory
free -m
# show disk
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-cpp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
run: |
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: build package
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-cli.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-function.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-messaging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-process.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-pulsar-io-ora.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ jobs:
sudo swapoff -a
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-pulsar-io.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ jobs:
sudo swapoff -a
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-standalone.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-thread.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-tiered-filesystem.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-tiered-jcloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-integration-transaction.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-owasp-dep-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
sudo swapoff -a
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

# Projects dependent on flume, hdfs, hbase, and presto currently excluded from the scan.
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-pulsar-website-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-shade-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
run: |
sudo rm -rf /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
docker rmi $(docker images -q) -f || echo "No Docker images to remove."
df -h

- name: run install by skip tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ private synchronized CompletableFuture<LeaderElectionState> handleExistingLeader
// If the value is the same as our proposed value, it means this instance was the leader at some
// point before. The existing value can either be for this same session or for a previous one.
if (res.getStat().isCreatedBySelf()) {
log.info("Keeping the existing value {} for {} as it's from the same session stat={}", existingValue,
path, res.getStat());
// The value is still valid because it was created in the same session
changeState(LeaderElectionState.Leading);
return CompletableFuture.completedFuture(LeaderElectionState.Leading);
} else {
// Since the value was created in a different session, it might be expiring. We need to delete it
// and try the election again.
Expand Down Expand Up @@ -259,7 +262,13 @@ public synchronized CompletableFuture<Void> asyncClose() {
return CompletableFuture.completedFuture(null);
}

return store.delete(path, version);
return store.delete(path, version)
.thenAccept(__ -> {
synchronized (LeaderElectionImpl.this) {
leaderElectionState = LeaderElectionState.NoLeader;
}
}
);
}

@Override
Expand All @@ -280,8 +289,8 @@ public Optional<T> getLeaderValueIfPresent() {
private void handleSessionNotification(SessionEvent event) {
// Ensure we're only processing one session event at a time.
executor.execute(SafeRunnable.safeRun(() -> {
if (event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}", path);
if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) {
log.info("Revalidating leadership for {}, event:{}", path, event);

try {
LeaderElectionState les = elect().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public void basicTest(String provider, Supplier<String> urlSupplier) throws Exce

leaderElection.close();

assertEquals(leaderElection.getState(), LeaderElectionState.NoLeader);

assertEquals(cache.get("/my/leader-election").join(), Optional.empty());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
Expand Down Expand Up @@ -180,4 +181,58 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception {
.untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading));
assertTrue(store.get(path).join().isPresent());
}


@Test
public void testElectAfterReconnected() throws Exception {
// --- init
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis(2_000)
.build());


BlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
store.registerSessionListener(sessionEvents::add);
BlockingQueue<LeaderElectionState> leaderElectionEvents = new LinkedBlockingQueue<>();
String path = newKey();

@Cleanup
CoordinationService coordinationService = new CoordinationServiceImpl(store);
@Cleanup
LeaderElection<String> le1 = coordinationService.getLeaderElection(String.class, path,
leaderElectionEvents::add);

// --- test manual elect
String proposed = "value-1";
le1.elect(proposed).join();
assertEquals(le1.getState(), LeaderElectionState.Leading);
LeaderElectionState les = leaderElectionEvents.poll(5, TimeUnit.SECONDS);
assertEquals(les, LeaderElectionState.Leading);


// simulate no leader state
FieldUtils.writeDeclaredField(le1, "leaderElectionState", LeaderElectionState.NoLeader, true);

// reconnect
zks.stop();

SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.ConnectionLost);

zks.start();


// --- test le1 can be leader
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.Reconnected);
Awaitility.await().atMost(Duration.ofSeconds(15))
.untilAsserted(()-> {
assertEquals(le1.getState(),LeaderElectionState.Leading);
}); // reacquire leadership


assertTrue(store.get(path).join().isPresent());
}
}
Loading