Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add portable Mqtt source and sink transforms #32385

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

ahmedabu98
Copy link
Contributor

Fixes #21060

Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@ahmedabu98
Copy link
Contributor Author

CC: @chamikaramj

You can test this out locally with the following code. @gdiazdelrio can you check this PR out and give it a shot? Let me know if something doesn't work

import apache_beam as beam
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
from apache_beam.transforms.external import BeamJarExpansionService

provider = ExternalTransformProvider(BeamJarExpansionService("sdks:java:io:expansion-service:shadowJar"))
MqttRead = provider.get_urn("beam:schematransform:org.apache.beam:mqtt_read:v1")
MqttWrite = provider.get_urn("beam:schematransform:org.apache.beam:mqtt_write:v1")

with beam.Pipeline() as p:
  connection_configuration = {
    "server_uri": "tcp://localhost:58494",
    "topic": "WRITE_TOPIC",
    "client_id": "READ_PIPELINE"
  }

  # read
  p | MqttRead(connection_configuration, max_read_time_seconds=10) | beam.Map(print)

  # write
  # p | beam.Create([beam.Row(bytes=bytes([1, 2, 3, 4, 5]))]) | MqttWrite(
  #       connection_configuration=connection_configuration)

@Abacn
Copy link
Contributor

Abacn commented Oct 2, 2024

Hi, what is the status of this PR? @ahmedabu98 @chamikaramj

@ahmedabu98
Copy link
Contributor Author

@twosom perhaps you'd be interested in trying this out? I can adjust the PR to include your recently added feature to read with metadata (#32668)

@twosom
Copy link
Contributor

twosom commented Oct 10, 2024

@twosom perhaps you'd be interested in trying this out? I can adjust the PR to include your recently added feature to read with metadata (#32668)

@ahmedabu98
Thank you for the opportunity. May I give it a try?

I’ve only worked with the Java SDK so far, so this will be my first time dealing with portable development. It might take me some time, but I’ll do my best to work through it.

If I have any questions along the way, would it be alright to leave a comment here on the PR?

@ahmedabu98
Copy link
Contributor Author

Thank you for the opportunity. May I give it a try?

Of course! You'd be doing me a favor :) Let me know how I can help

To start, you can git checkout this PR and first run ./gradlew sdks:java:io:expansion-service:build to build the necessary Java jar, then run the python code snippet I pasted above.

@twosom
Copy link
Contributor

twosom commented Oct 13, 2024

Thank you for the opportunity. May I give it a try?

Of course! You'd be doing me a favor :) Let me know how I can help

To start, you can git checkout this PR and first run ./gradlew sdks:java:io:expansion-service:build to build the necessary Java jar, then run the python code snippet I pasted above.

@ahmedabu98
Thanks!

I'll add feature for

and also read and write

Should I start by creating my own branch and cherry-picking your commit?

twosom pushed a commit to twosom/beam that referenced this pull request Oct 13, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

MQTT IO connector for Python
3 participants