-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support AWS_MSK_IAM authentication #2255
base: master
Are you sure you want to change the base?
Conversation
Adds an AWS_MSK_IAM authentication mechanism which is described here: * https://github.com/aws/aws-msk-iam-auth#uriencode To use the mechanism pass the following keyword arguments when initializing a class: ``` security_protocol='SASL_SSL', sasl_mechanism='AWS_MSK_IAM', bootstrap_servers=[ 'b-1.cluster.x.y.kafka.region.amazonaws.com:9088', ... ], ``` The credentials and region will be pulled using `botocore.session.Session`. Using the mechanism requires the `botocore` library which can be installed with: ```sh pip install botocore ``` **TODO:** - [ ] Documentation - [ ] Tests - [ ] Refresh mechanism for temporary credentials?
056b1f2
to
cb18e67
Compare
The two tests in `test/test_msk.py` should ensure that the changes to `kafka/msk.py` do not break the authentication payload. The authentication payload was validated using a real AWS Kafka cluster before adding tests with the hard-coded signatures.
It looks like temporary credentials will be refreshed through existing error handling. producer = KafkaProducer(**CONFIG)
i = 0
while True:
i += 1
ts = datetime.datetime.utcnow().isoformat()
msg = f'[{i}] {ts} Test Message'
print(msg)
future = producer.send(TOPIC, msg.encode('utf-8'))
meta = future.get(timeout=10)
print(meta.topic)
print(meta.partition)
print(meta.offset)
time.sleep(10)
|
Hi! Thanks in advance! |
@dnks23 This PR provides a working implementation of AWS MSK authentication.
#2256 is something I put together as an alternative to this PR. |
Thanks for your reply @mattoberle! that's actually great news... Regarding the preemptive credential refresh: It might be possible to just use |
Hi, I was wondering on the next steps for getting this merged and released? |
@mattoberle |
@phoenixx1 to use MSK auth you need to follow these steps:
@jplock I'm not the best person to answer your question, we'd need to check with @dpkp to gauge interest in integrating this PR or #2256 which allows this to be supported independently. The other popular Kafka client library for Python is maintained by Confluent and (understandably) is not especially interested in supporting MSK auth. The confluent library is a set of bindings to |
I'm trying to connect it using a Lambda Function. The role assigned to lambda function already has access to msk. |
@phoenixx1 your Lambda Function is not running the code from this branch. That error message is generated here, and displays the list of valid You'll notice that
|
@mattoberle |
@mattoberle The issue I have is that my boto session needs to assume a role since I use a cross-account AWS MSK. This is not supported, since the basic boto session is created inside of the library. It can be forked and changed, so that session is now passed as an argument, but was wondering if there is any other way to do it, that I missed? |
I did not account for this case but it seems worth supporting. |
@mattoberle Apologies for the long comment :) We are using changes from this PR to run our producer/consumers. To connect to MSK, we have created an IAM user and set these IAM user credentials in the environment variable: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
Found below error in kafka-authorizer.log
I guess the boot session token lasts for 12 hours then it is refreshed. Could you help here to understand what is happening? Could you please help here? |
@bkatwal I think you are describing what I attempted to explain here: The code in this PR doesn't include anything to actively refresh expiring credentials (although I'd expect that the credentials get refreshed during the exception handling). Full transparency, I'm no longer using |
@mattoberle |
@bkatwal and @mattoberle I am a bit late on the topic, but I did some tests using a role that my user needs to assume in order to interact with the MSK cluster. The assumption of the role as you know is only temporary and the session token needs to be refreshed every N seconds. If you look carefully in the boto3 documentation, you'll see this. This means that if you configure your
with the ~/.aws/credentials as:
boto3 will know that you want to use the Hope this will help! |
Hello, this looks pretty solid, however it's been almost a year since it first came about. Is there a roadmap regarding this being merged in and released? |
Hi @mattoberle , did you get any interest from @dpkp or the maintainers? |
+1 to get this merged |
Has anyone encountered this error? I have passed the credentials in environmental variables. |
@azizulwahid |
@bkatwal let me try that real quick |
@bkatwal I still got the same error... |
Could you send the full stacktrace? |
Any news on the merge timeline? |
I'm willing to merge but I'll need to re-test this. |
any update? |
I'm keeping my eyes open on it. I need to get v2.0.4 released before this is merged. 🥲 |
…terations for Kafka 0.8.2 and Python 3.12 (dpkp#159) * skip failing tests for PyPy since they work locally * Reconfigure tests for PyPy and 3.12 * Skip partitioner tests in test_partitioner.py if 3.12 and 0.8.2 * Update test_partitioner.py * Update test_producer.py * Timeout tests after ten minutes * Set 0.8.2.2 to be experimental from hereon * Formally support PyPy 3.9
* Test Kafka 0.8.2.2 using Python 3.11 in the meantime * Override PYTHON_LATEST conditionally in python-package.yml * Update python-package.yml * add python annotation to kafka version test matrix * Update python-package.yml * try python 3.10
* Remove support for EOL'ed versions of Python * Update setup.py
Too many MRs to review... so little time.
After stop/start kafka service, kafka-python may use 100% CPU caused by busy-retry while the socket was closed. This fix the issue by unregister the socket if the fd is negative. Co-authored-by: Orange Kao <[email protected]>
Co-authored-by: Ryar Nyah <[email protected]>
Co-authored-by: Denis Otkidach <[email protected]>
The former has been deprecated since setuptools 56 Co-authored-by: micwoj92 <[email protected]>
* docs: Update syntax in README.rst * docs: Update code block syntax in docs/index.rst --------- Co-authored-by: HalfSweet <[email protected]>
* Fix crc32c's __main__ for Python 3 * Remove TODO from _crc32c.py --------- Co-authored-by: Yonatan Goldschmidt <[email protected]>
Co-authored-by: Dave Voutila <[email protected]>
…pkp#155) * handling OSError * better error output * removed traceback logging --------- Co-authored-by: Alexander Sibiryakov <[email protected]>
hope this can be merged soon |
…pkp#134) wakeup When wakeup() is called, we sometime notice that we get an endless prints: "Unable to send to wakeup socket!". Those prints are spamming the logs. This commit aims to address it by allowing restating the application via an intentional exception raise. This behavior is configurable and its default is backward compatible. Signed-off-by: shimon-armis <[email protected]> Co-authored-by: shimon-armis <[email protected]>
It will be in wbarnha#146! |
This pull request addresses issue #2232 by adding an
AWS_MSK_IAM
authentication mechanism.A detailed description of the authentication scheme is available here:
To use the mechanism pass the following keyword arguments when
initializing a class:
The credentials and region will be pulled using
botocore.session.Session
.Using the mechanism requires the
botocore
library which can beinstalled with:
TODO:
This change is