-
Notifications
You must be signed in to change notification settings - Fork 136
[improve] Pass group ID to authorizer when using OAuth #1926
[improve] Pass group ID to authorizer when using OAuth #1926
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1926 +/- ##
============================================
- Coverage 17.94% 17.85% -0.10%
- Complexity 745 752 +7
============================================
Files 194 195 +1
Lines 13957 14061 +104
Branches 1291 1308 +17
============================================
+ Hits 2505 2511 +6
- Misses 11271 11368 +97
- Partials 181 182 +1
|
kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaServiceConfiguration.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to prevent consumers from subscribing group-B if they have configured the permission for group-A.
3aa978d
to
591c45b
Compare
/** | ||
* {@code SaslServer} implementation for SASL/OAUTHBEARER in Kafka. An instance | ||
* of {@link OAuthBearerToken} is available upon successful authentication via | ||
* the negotiated property "{@code OAUTHBEARER.token}"; the token could be used | ||
* in a custom authorizer (to authorize based on JWT claims rather than ACLs, | ||
* for example). | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you just copy the Java docs from Kafka? It might be misleading here. It's better to remove them. For example, {@code SaslServer}
and {@code OAUTHBEARER.token}
never link correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
tests/src/test/java/io/streamnative/pulsar/handlers/kop/SaslOAuthKopHandlersWithGroupId.java
Outdated
Show resolved
Hide resolved
oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfig.java
Outdated
Show resolved
Hide resolved
oauth-client/src/main/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientConfig.java
Outdated
Show resolved
Hide resolved
.../main/java/io/streamnative/pulsar/handlers/kop/security/oauth/OauthLoginCallbackHandler.java
Show resolved
Hide resolved
@Demogorgon314 Could you resolve the conflicts? |
e159653
to
dc9f1d7
Compare
@BewareMyPower Resolved. |
.../test/java/io/streamnative/pulsar/handlers/kop/security/oauth/ClientCredentialsFlowTest.java
Show resolved
Hide resolved
...mpl/src/main/java/io/streamnative/pulsar/handlers/kop/security/auth/SimpleAclAuthorizer.java
Outdated
Show resolved
Hide resolved
…1926) ### Motivation Currently, the KoP only supports checking the topic permission when doing the consume authorization, but Kafka support checking the topic and group ID permission. Before introducing this change, let's understand why KoP can't check group ID permission. When Kafka does the consume authorization check, it will first check the group ID permission in `handleJoinGroupRequest` method, then it will check the topic permission in the `handleFetchRequest` method. However, Pulsar is using another way to check consume permission. See the `org.apache.pulsar.broker.authorization.AuthorizationService#canConsumeAsync` method, it requires passing the topic name and subscription to check both permissions in the same place, and the topic name is required param. ``` public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData,String subscription) ``` If we follow the Kafka way to check the consumer permission, we can't get the topic name when joining a group since the join group request does not contain the topic name. So we have to authorize permission in the fetch request. However, to authorization consume permission in the fetch request, we can only get the topic name in this request. In this case, we can't authorize the group ID. ### Modifications Since we can't get group ID when handling fetch requests, we need to find a way to pass through group ID when doing the authentication. In OAuth, we have a `credentials_file.json` file that needs to be config, for example: ``` { "client_id": "my-id", "client_secret": "my-secret", "tenant": "my-tenant" } ``` Here we can add a new parameter into the config: ``` { "client_id": "my-id", "client_secret": "my-secret", "tenant": "my-tenant", "group_id": "my-group-id" } ``` Then we can add these parameters to `SaslExtensions`, and send it to the broker. (cherry picked from commit e108e44)
…1926) ### Motivation Currently, the KoP only supports checking the topic permission when doing the consume authorization, but Kafka support checking the topic and group ID permission. Before introducing this change, let's understand why KoP can't check group ID permission. When Kafka does the consume authorization check, it will first check the group ID permission in `handleJoinGroupRequest` method, then it will check the topic permission in the `handleFetchRequest` method. However, Pulsar is using another way to check consume permission. See the `org.apache.pulsar.broker.authorization.AuthorizationService#canConsumeAsync` method, it requires passing the topic name and subscription to check both permissions in the same place, and the topic name is required param. ``` public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData,String subscription) ``` If we follow the Kafka way to check the consumer permission, we can't get the topic name when joining a group since the join group request does not contain the topic name. So we have to authorize permission in the fetch request. However, to authorization consume permission in the fetch request, we can only get the topic name in this request. In this case, we can't authorize the group ID. ### Modifications Since we can't get group ID when handling fetch requests, we need to find a way to pass through group ID when doing the authentication. In OAuth, we have a `credentials_file.json` file that needs to be config, for example: ``` { "client_id": "my-id", "client_secret": "my-secret", "tenant": "my-tenant" } ``` Here we can add a new parameter into the config: ``` { "client_id": "my-id", "client_secret": "my-secret", "tenant": "my-tenant", "group_id": "my-group-id" } ``` Then we can add these parameters to `SaslExtensions`, and send it to the broker. (cherry picked from commit e108e44)
…#1926) (#1945) ### Motivation Currently, the KoP only supports checking the topic permission when doing the consume authorization, but Kafka support checking the topic and group ID permission. Before introducing this change, let's understand why KoP can't check group ID permission. When Kafka does the consume authorization check, it will first check the group ID permission in `handleJoinGroupRequest` method, then it will check the topic permission in the `handleFetchRequest` method. However, Pulsar is using another way to check consume permission. See the `org.apache.pulsar.broker.authorization.AuthorizationService#canConsumeAsync` method, it requires passing the topic name and subscription to check both permissions in the same place, and the topic name is required param. ``` public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData,String subscription) ``` If we follow the Kafka way to check the consumer permission, we can't get the topic name when joining a group since the join group request does not contain the topic name. So we have to authorize permission in the fetch request. However, to authorization consume permission in the fetch request, we can only get the topic name in this request. In this case, we can't authorize the group ID. ### Modifications Since we can't get group ID when handling fetch requests, we need to find a way to pass through group ID when doing the authentication. In OAuth, we have a `credentials_file.json` file that needs to be config, for example: ``` { "client_id": "my-id", "client_secret": "my-secret", "tenant": "my-tenant" } ``` Here we can add a new parameter into the config: ``` { "client_id": "my-id", "client_secret": "my-secret", "tenant": "my-tenant", "group_id": "my-group-id" } ``` Then we can add these parameters to `SaslExtensions`, and send it to the broker. (cherry picked from commit e108e44)
…1926) ### Motivation Currently, the KoP only supports checking the topic permission when doing the consume authorization, but Kafka support checking the topic and group ID permission. Before introducing this change, let's understand why KoP can't check group ID permission. When Kafka does the consume authorization check, it will first check the group ID permission in `handleJoinGroupRequest` method, then it will check the topic permission in the `handleFetchRequest` method. However, Pulsar is using another way to check consume permission. See the `org.apache.pulsar.broker.authorization.AuthorizationService#canConsumeAsync` method, it requires passing the topic name and subscription to check both permissions in the same place, and the topic name is required param. ``` public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData,String subscription) ``` If we follow the Kafka way to check the consumer permission, we can't get the topic name when joining a group since the join group request does not contain the topic name. So we have to authorize permission in the fetch request. However, to authorization consume permission in the fetch request, we can only get the topic name in this request. In this case, we can't authorize the group ID. ### Modifications Since we can't get group ID when handling fetch requests, we need to find a way to pass through group ID when doing the authentication. In OAuth, we have a `credentials_file.json` file that needs to be config, for example: ``` { "client_id": "my-id", "client_secret": "my-secret", "tenant": "my-tenant" } ``` Here we can add a new parameter into the config: ``` { "client_id": "my-id", "client_secret": "my-secret", "tenant": "my-tenant", "group_id": "my-group-id" } ``` Then we can add these parameters to `SaslExtensions`, and send it to the broker. (cherry picked from commit e108e44)
Motivation
Currently, the KoP only supports checking the topic permission when doing the consume authorization, but Kafka support checking the topic and group ID permission.
Before introducing this change, let's understand why KoP can't check group ID permission.
When Kafka does the consume authorization check, it will first check the group ID permission in
handleJoinGroupRequest
method, then it will check the topic permission in thehandleFetchRequest
method.However, Pulsar is using another way to check consume permission. See the
org.apache.pulsar.broker.authorization.AuthorizationService#canConsumeAsync
method, it requires passing the topic name and subscription to check both permissions in the same place, and the topic name is required param.If we follow the Kafka way to check the consumer permission, we can't get the topic name when joining a group since the join group request does not contain the topic name. So we have to authorize permission in the fetch request.
However, to authorization consume permission in the fetch request, we can only get the topic name in this request. In this case, we can't authorize the group ID.
Modifications
Since we can't get group ID when handling fetch requests, we need to find a way to pass through group ID when doing the authentication.
In OAuth, we have a
credentials_file.json
file that needs to be config, for example:Here we can add a new parameter into the config:
Then we can add these parameters to
SaslExtensions
, and send it to the broker.Documentation
Check the box below.
Need to update docs?
doc-required
(If you need help on updating docs, create a doc issue)
no-need-doc
(Please explain why)
doc
(If this PR contains doc changes)