Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully exit python script using Streams #450

Open
arjun180 opened this issue Apr 29, 2022 · 1 comment
Open

Gracefully exit python script using Streams #450

arjun180 opened this issue Apr 29, 2022 · 1 comment

Comments

@arjun180
Copy link

I have a use case associated with pulling data from a Kafka topic. I need the streamz operator exit gracefully and exit the python script once it hits an exception. It looks something like this :

source = Stream.from_kafka_batched(TOPIC, kafka_confs, poll_interval='20s', max_batch_size=10000)

def process_messages():
    try:
         #process_messages
   except Exception as e:
        print(e)
        disconnect_gracefully()

def disconnect_gracefully():
    logging.info("Exit gracefully")
    source.stop()
    source.destory()
    
source.map(process_messages)

While this seems to work for the streamz operator, I feel like it doesn't disconnect from the Kafka broker and I get logs like this

%6|1651194599.149|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: my-kafka-server:9093: Disconnected (after 80522ms in state UP)

So, the script doesn't exit. Any pointers to how this can be done effectively?

@martindurant
Copy link
Member

@chinmaychandak , @jsmaupin , @roveo - anyone still interested in smooth operation of streamz and kafka?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants