Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support AWS_MSK_IAM authentication #2255

Open
wants to merge 22 commits into
base: master
Choose a base branch
from

Conversation

mattoberle
Copy link

@mattoberle mattoberle commented Aug 18, 2021

This pull request addresses issue #2232 by adding an AWS_MSK_IAM authentication mechanism.
A detailed description of the authentication scheme is available here:

I understand that kafka-python may not be the appropriate place to put a vendor-specific authentication mechanism.
If that's the case maybe it's better suited as a plug-in?
The library doesn't support auth extensions at the moment but it doesn't look like a huge lift to get there.

To use the mechanism pass the following keyword arguments when
initializing a class:

security_protocol='SASL_SSL',
sasl_mechanism='AWS_MSK_IAM',
bootstrap_servers=[
    'b-1.cluster.x.y.kafka.region.amazonaws.com:9098',
    ...
],

The credentials and region will be pulled using botocore.session.Session.
Using the mechanism requires the botocore library which can be
installed with:

pip install botocore

TODO:

  • Documentation
  • Test authentication payload generation
  • Test config verification / auth method
  • Refresh mechanism for temporary credentials?

This change is Reviewable

Adds an AWS_MSK_IAM authentication mechanism which is described here:
* https://github.com/aws/aws-msk-iam-auth#uriencode

To use the mechanism pass the following keyword arguments when
initializing a class:

```
security_protocol='SASL_SSL',
sasl_mechanism='AWS_MSK_IAM',
bootstrap_servers=[
    'b-1.cluster.x.y.kafka.region.amazonaws.com:9088',
    ...
],
```

The credentials and region will be pulled using `botocore.session.Session`.
Using the mechanism requires the `botocore` library which can be
installed with:

```sh
pip install botocore
```

**TODO:**

- [ ] Documentation
- [ ] Tests
- [ ] Refresh mechanism for temporary credentials?
@mattoberle mattoberle force-pushed the feature/2232-AWS_MSK_IAM branch from 056b1f2 to cb18e67 Compare August 18, 2021 20:22
The two tests in `test/test_msk.py` should ensure that the changes to
`kafka/msk.py` do not break the authentication payload.

The authentication payload was validated using a real AWS Kafka cluster
before adding tests with the hard-coded signatures.
@mattoberle mattoberle marked this pull request as ready for review August 18, 2021 22:41
@mattoberle
Copy link
Author

It looks like temporary credentials will be refreshed through existing error handling.
I'm not sure if that's the ideal case, but it appears to provide an uncomplicated initial solution.
The following output was clipped from a test producer running the following:

producer = KafkaProducer(**CONFIG)

i = 0
while True:
    i += 1
    ts = datetime.datetime.utcnow().isoformat()
    msg = f'[{i}] {ts} Test Message'
    print(msg)
    future = producer.send(TOPIC, msg.encode('utf-8'))
    meta = future.get(timeout=10)
    print(meta.topic)
    print(meta.partition)
    print(meta.offset)
    time.sleep(10)
[358] 2021-08-19T16:35:59.526059 Test Message
kafka-python-test
0
740

[359] 2021-08-19T16:36:09.569503 Test Message
kafka-python-test
0
741

ERROR:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <connected> [IPv4 ('10.10.10.10', 9098)]>: socket disconnected
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <connected> [IPv4 ('10.10.10.10', 9098)]>: Closing connection. KafkaConnectionError: socket disconnected
WARNING:kafka.client:Node 3 connection failed -- refreshing metadata
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <connecting> [IPv4 ('10.10.10.10', 9098)]>: connecting to b-3.cluster.x.y.kafka.region.amazonaws.com:9098 [('10.10.10.10', 9098) IPv4]
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:kafka.conn:<BrokerConnection node_id=3 host=connecting to b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <authenticating> [IPv4 ('10.10.10.10, 9098)]>: Authenticated via AWS_MSK_IAM {"version":"2020_10_22","request-id":"f8e468a0-3d8c-4999-bd52-6252855c5387"}
INFO:kafka.conn:<BrokerConnection node_id=3 host=b-3.cluster.x.y.kafka.region.amazonaws.com:9098 <authenticating> [IPv4 ('10.10.10.10', 9098)]>: Connection complete.

[360] 2021-08-19T16:36:19.608427 Test Message
kafka-python-test
0
742

@dnks23
Copy link

dnks23 commented Oct 5, 2021

Hi!
I am really interested in having an aws-msk iam authentication. Could you maybe elaborate on what's the current state here (also related to #2256)? Is the shown error kind of crucial or what is actually meant with "It looks like temporary credentials will be refreshed through existing error handling."?

Thanks in advance!

@mattoberle
Copy link
Author

@dnks23 This PR provides a working implementation of AWS MSK authentication.
If using role-based authentication credential refresh works on a try/except basis (there is no preemptive credential refresh).
In other words, if the credentials expire a producer/consumer will:

  1. Refresh the credentials.
  2. Re-authenticate.
  3. Retry the action.

#2256 is something I put together as an alternative to this PR.
It doesn't provide the MSK auth but exposes a mechanism for plugging in custom auth.

@dnks23
Copy link

dnks23 commented Oct 6, 2021

Thanks for your reply @mattoberle! that's actually great news... Regarding the preemptive credential refresh:

It might be possible to just use botocore.credentials.RefreshableCredentials which can be used to initialize the BotoSession. Haven't tried that but the preemptive refresh could then be left to boto which might be a good solution here.
What's your opinion on that? Have you even already tried that?

@jplock
Copy link

jplock commented Oct 14, 2021

Hi, I was wondering on the next steps for getting this merged and released?

@phoenixx1
Copy link

@mattoberle
@dnks23
Can you please give a sample code of how to connect MSK using IAM auth.

@mattoberle
Copy link
Author

@phoenixx1 to use MSK auth you need to follow these steps:

  1. Provide AWS credentials via a supported boto discovery mechanism.1
  2. Initialize your producer or consumer with these keyword arguments:
security_protocol='SASL_SSL',
sasl_mechanism='AWS_MSK_IAM',

1 Expose your IAM credentials via environment variable, or set them in ~/.aws/credentials, or have a role assigned to the EC2 instance or container, etc... https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html


@jplock I'm not the best person to answer your question, we'd need to check with @dpkp to gauge interest in integrating this PR or #2256 which allows this to be supported independently.

The other popular Kafka client library for Python is maintained by Confluent and (understandably) is not especially interested in supporting MSK auth. The confluent library is a set of bindings to librdkafka and a little trickier to extend via plugins.

@phoenixx1
Copy link

@mattoberle
image

I'm trying to connect it using a Lambda Function. The role assigned to lambda function already has access to msk.

Getting this error for the sasl_ssl
image

@mattoberle
Copy link
Author

@phoenixx1 your Lambda Function is not running the code from this branch.

That error message is generated here, and displays the list of valid sasl_mechanisms defined here.

You'll notice that AWS_MSK_IAM is missing in your list.
To access the features in this PR you should install kafka-python from the following URL:

https://github.com/mattoberle/kafka-python/archive/7ff323727d99e0c33a68423300e7f88a9cf3f830.tar.gz

@phoenixx1
Copy link

@mattoberle
Tried using the package url provided by you, still facing same issue. (Created new venv to check)
Do you have any sample code where you have used the same, so I can refer if anything is missing from mine.
Will be really helpful thanks.

@lgasperin
Copy link

@mattoberle The issue I have is that my boto session needs to assume a role since I use a cross-account AWS MSK. This is not supported, since the basic boto session is created inside of the library. It can be forked and changed, so that session is now passed as an argument, but was wondering if there is any other way to do it, that I missed?

@mattoberle
Copy link
Author

@mattoberle The issue I have is that my boto session needs to assume a role since I use a cross-account AWS MSK. This is not supported, since the basic boto session is created inside of the library. It can be forked and changed, so that session is now passed as an argument, but was wondering if there is any other way to do it, that I missed?

I did not account for this case but it seems worth supporting.

@bkatwal
Copy link

bkatwal commented Mar 3, 2022

@mattoberle Apologies for the long comment :)

We are using changes from this PR to run our producer/consumers.

To connect to MSK, we have created an IAM user and set these IAM user credentials in the environment variable: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
This setup works but almost every 12 hours we get the following error in consumer during poll() operation:

File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/consumer/group.py", line 657, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/consumer/group.py", line 679, in _poll_once
    self._coordinator.poll()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/coordinator/consumer.py", line 270, in poll
    self.ensure_coordinator_ready()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/kafka/coordinator/base.py", line 269, in ensure_coordinator_ready
    raise future.exception  # pylint: disable-msg=raising-bad-type
kafka.errors.GroupAuthorizationFailedError: [Error 30] GroupAuthorizationFailedError: some_consumer

Found below error in kafka-authorizer.log

denied to perform DESCRIBE on resource some_consumer of type GROUP during API call FindCoordinator because CLIENT_NOT_AUTHENTICATED (kafka.authorizer.logger)"

I guess the boot session token lasts for 12 hours then it is refreshed. Could you help here to understand what is happening?

Could you please help here?

@mattoberle
Copy link
Author

@bkatwal I think you are describing what I attempted to explain here:
#2255 (comment)

The code in this PR doesn't include anything to actively refresh expiring credentials (although I'd expect that the credentials get refreshed during the exception handling).

Full transparency, I'm no longer using kafka-pythonor MSK auth.
There is (understandably) a lack of appetite to incorporate a proprietary authentication scheme into the more active libraries backed by librdkafka.

@Gatsby-Lee
Copy link

@mattoberle
can you share what you use as an alternative of kafka-python?

@darthale
Copy link

@bkatwal and @mattoberle I am a bit late on the topic, but I did some tests using a role that my user needs to assume in order to interact with the MSK cluster. The assumption of the role as you know is only temporary and the session token needs to be refreshed every N seconds.

If you look carefully in the boto3 documentation, you'll see this. This means that if you configure your ~/.aws/config as:

[default]
region = eu-central-1
source_profile = msk-programmatic-user
role_arn = arn:aws:iam::11111111111:role/msk-client
duration_seconds = 900

with the ~/.aws/credentials as:

[msk-programmatic-user]
aws_access_key_id = AKIA...
aws_secret_access_key = E532..

boto3 will know that you want to use the msk-client role and it will take care of refreshing credentials under the hood for you. Without you having to handle any try..except and refresh the token via a i.e sts.assume_role() logic.

Hope this will help!

@luis-nook
Copy link

Hello, this looks pretty solid, however it's been almost a year since it first came about. Is there a roadmap regarding this being merged in and released?

@joshuachong
Copy link

Hi @mattoberle , did you get any interest from @dpkp or the maintainers?
I am interested to pick this up to add in IAM role and creds refresh but would like to check if the maintainers are interested to get this feature merged. I do have use cases for MSK IAM role authentication.

@vrioux
Copy link

vrioux commented Jul 19, 2022

+1 to get this merged

@azizulwahid
Copy link

kafka.errors.NoBrokersAvailable: NoBrokersAvailable

Has anyone encountered this error? I have passed the credentials in environmental variables.

@bkatwal
Copy link

bkatwal commented Jul 26, 2022

@azizulwahid
With credentials, you need to set AWS_DEFAULT_REGION.
Try adding AWS_DEFAULT_REGION environment variable or setting it in your compute/container :)
Set the region where your MSK cluster sits.

@azizulwahid
Copy link

@azizulwahid
Try setting AWS_DEFAULT_REGION environment variable :)
Set the region where your MSK cluster sits.

@bkatwal let me try that real quick

@azizulwahid
Copy link

@bkatwal I still got the same error...

@bkatwal
Copy link

bkatwal commented Jul 26, 2022

@bkatwal I still got the same error...

Could you send the full stacktrace?

@gmuslia
Copy link

gmuslia commented Jun 28, 2023

Any news on the merge timeline?

@wbarnha
Copy link
Collaborator

wbarnha commented Aug 9, 2023

I'm willing to merge but I'll need to re-test this.

@vl-kp
Copy link

vl-kp commented Sep 25, 2023

any update?

@wbarnha
Copy link
Collaborator

wbarnha commented Sep 26, 2023

I'm keeping my eyes open on it. I need to get v2.0.4 released before this is merged. 🥲

@danielpops
Copy link

wbarnha and others added 14 commits March 8, 2024 14:02
…terations for Kafka 0.8.2 and Python 3.12 (dpkp#159)

* skip failing tests for PyPy since they work locally

* Reconfigure tests for PyPy and 3.12

* Skip partitioner tests in test_partitioner.py if 3.12 and 0.8.2

* Update test_partitioner.py

* Update test_producer.py

* Timeout tests after ten minutes

* Set 0.8.2.2 to be experimental from hereon

* Formally support PyPy 3.9
* Test Kafka 0.8.2.2 using Python 3.11 in the meantime

* Override PYTHON_LATEST conditionally in python-package.yml

* Update python-package.yml

* add python annotation to kafka version test matrix

* Update python-package.yml

* try python 3.10
* Remove support for EOL'ed versions of Python

* Update setup.py
Too many MRs to review... so little time.
After stop/start kafka service, kafka-python may use 100% CPU caused by
busy-retry while the socket was closed. This fix the issue by unregister
the socket if the fd is negative.

Co-authored-by: Orange Kao <[email protected]>
The former has been deprecated since setuptools 56

Co-authored-by: micwoj92 <[email protected]>
* docs: Update syntax in README.rst

* docs: Update code block syntax in docs/index.rst

---------

Co-authored-by: HalfSweet <[email protected]>
* Fix crc32c's __main__ for Python 3

* Remove TODO from _crc32c.py

---------

Co-authored-by: Yonatan Goldschmidt <[email protected]>
…pkp#155)

* handling OSError

* better error output

* removed traceback logging

---------

Co-authored-by: Alexander Sibiryakov <[email protected]>
@vl-kp
Copy link

vl-kp commented Mar 10, 2024

hope this can be merged soon

…pkp#134)

wakeup

When wakeup() is called, we sometime notice that we get
an endless prints:
"Unable to send to wakeup socket!".

Those prints are spamming the logs.
This commit aims to address it by allowing restating the
application via an intentional exception raise.
This behavior is configurable and its default is backward compatible.

Signed-off-by: shimon-armis <[email protected]>
Co-authored-by: shimon-armis <[email protected]>
@wbarnha
Copy link
Collaborator

wbarnha commented Mar 10, 2024

hope this can be merged soon

It will be in wbarnha#146!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.