Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Reconnection logic and Backoff policy doesn't work correctly #1197

Merged
merged 18 commits into from
Sep 23, 2024

Conversation

crossoverJie
Copy link
Member

Fixes #1187

Modifications

  • Move backoff.go to the backoff directory (because there are circular dependencies, they are not moved to the pulsar directory.)
  • Create a new method for BackOffPolicy interface IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool

This is a breaking change that modifies the package name and interface name.

Package: internal->backoff
Interface name: BackoffPolicy-> Policy

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (yes)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (GoDocs)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@crossoverJie
Copy link
Member Author

@RobertIndie PTAL

@RobertIndie RobertIndie self-requested a review March 21, 2024 14:24
@RobertIndie RobertIndie added this to the v0.13.0 milestone Mar 21, 2024
Copy link
Member

@RobertIndie RobertIndie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned that such breaking changes could significantly impact users. Users would have to implement the new methods introduced by the backoff policy, even if they haven't encountered this issue. Perhaps we could consider including this change in the 1.0 version.

I have initiated a discussion of the Go client 1.0.0: https://lists.apache.org/thread/h022q6nrk23tz2k0qpxjf6j8m1o4qsrt

@crossoverJie
Copy link
Member Author

Perhaps we could consider including this change in the 1.0 version.

Yes, I agree with you.

@nodece
Copy link
Member

nodece commented Jul 6, 2024

Any updates?

# Conflicts:
#	pulsar/consumer_partition.go
#	pulsar/internal/rpc_client.go
#	pulsar/producer_partition.go
@crossoverJie
Copy link
Member Author

crossoverJie commented Jul 7, 2024

@nodece The code conflicts have been resolved, but this PR involves changes to the public API. Do you recommend releasing it in version v0.13.0?

pulsar/backoff/backoff.go Show resolved Hide resolved
pulsar/backoff/backoff.go Outdated Show resolved Hide resolved
pulsar/consumer_partition.go Outdated Show resolved Hide resolved
pulsar/consumer_partition.go Outdated Show resolved Hide resolved
pulsar/dlq_router.go Outdated Show resolved Hide resolved
pulsar/producer_partition.go Outdated Show resolved Hide resolved
@nodece
Copy link
Member

nodece commented Jul 7, 2024

This is a breaking change that modifies the package name and interface name.

A breaking change that we add new method to the Backoff interface, not change package name.

In the user project, the user cannot import the internal package of the pulsar library, this is a limitation, please check https://go.dev/doc/modules/layout

@crossoverJie
Copy link
Member Author

crossoverJie commented Jul 10, 2024

Package: internal->backoff
Interface name: BackoffPolicy-> Policy

Update:

BackoffPolicy internal.BackoffPolicy -> BackOffPolicyFunc func() backoff.Policy:

  1. Because the Policy is concurrently called in the consumer, it has been modified to obtain a policy object each time using the backoffPolicyFunc function.

} else if pc.options.backoffPolicy == nil {
delayReconnectTime = defaultBackoff.Next()
} else {
delayReconnectTime = pc.options.backoffPolicy.Next()

nextDelay := backoff.Next()
if nextDelay > remainTime {
nextDelay = remainTime

  1. A new Reset function has been added to the Policy interface, which will be called after a successful reconnection. If a user's custom Policy requires the reuse of the same object, the data can be reset within this Reset function.

@RobertIndie RobertIndie removed this from the v0.13.0 milestone Jul 15, 2024
@RobertIndie RobertIndie added this to the v0.14.0 milestone Jul 15, 2024
Copy link
Member

@RobertIndie RobertIndie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please help resolve the conflicts

pulsar/backoff/backoff.go Outdated Show resolved Hide resolved
pulsar/backoff/backoff.go Outdated Show resolved Hide resolved
@RobertIndie RobertIndie merged commit 630d5f8 into apache:master Sep 23, 2024
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] Reconnection logic and Backoff policy doesn't work correctly
4 participants