Skip to content

[Java] Make Flight example executable out of the box #291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 96 additions & 90 deletions java/source/flight.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ and data in memory to store the actual data.
Flight Client and Server
************************

.. testcode::
.. code-block:: java

import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.AsyncPutListener;
Expand Down Expand Up @@ -95,6 +95,7 @@ Flight Client and Server
AutoCloseables.close(batches);
}
}

class CookbookProducer extends NoOpFlightProducer implements AutoCloseable {
private final BufferAllocator allocator;
private final Location location;
Expand Down Expand Up @@ -194,95 +195,100 @@ Flight Client and Server
AutoCloseables.close(datasets.values());
}
}
Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
try (BufferAllocator allocator = new RootAllocator()){
// Server
try(final CookbookProducer producer = new CookbookProducer(allocator, location);
final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) {
try {
flightServer.start();
System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
} catch (IOException e) {
throw new RuntimeException(e);
}

// Client
try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
System.out.println("C1: Client (Location): Connected to " + location.getUri());

// Populate data
Schema schema = new Schema(Arrays.asList(
new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));
try(VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name")) {
varCharVector.allocateNew(3);
varCharVector.set(0, "Ronald".getBytes());
varCharVector.set(1, "David".getBytes());
varCharVector.set(2, "Francisco".getBytes());
vectorSchemaRoot.setRowCount(3);
FlightClient.ClientStreamListener listener = flightClient.startPut(
FlightDescriptor.path("profiles"),
vectorSchemaRoot, new AsyncPutListener());
listener.putNext();
varCharVector.set(0, "Manuel".getBytes());
varCharVector.set(1, "Felipe".getBytes());
varCharVector.set(2, "JJ".getBytes());
vectorSchemaRoot.setRowCount(3);
listener.putNext();
listener.completed();
listener.getResult();
System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each");
}

// Get metadata information
FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles"));
System.out.println("C3: Client (Get Metadata): " + flightInfo);

// Get data information
try(FlightStream flightStream = flightClient.getStream(new Ticket(
FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)))) {
int batch = 0;
try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) {
System.out.println("C4: Client (Get Stream):");
while (flightStream.next()) {
batch++;
System.out.println("Client Received batch #" + batch + ", Data:");
System.out.print(vectorSchemaRootReceived.contentToTSVString());
}
}
} catch (Exception e) {
e.printStackTrace();
}

// Get all metadata information
Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
System.out.print("C5: Client (List Flights Info): ");
flightInfosBefore.forEach(t -> System.out.println(t));

// Do delete action
Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE",
FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
while (deleteActionResult.hasNext()) {
Result result = deleteActionResult.next();
System.out.println("C6: Client (Do Delete Action): " +
new String(result.getBody(), StandardCharsets.UTF_8));
}

// Get all metadata information (to validate detele action)
Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
flightInfos.forEach(t -> System.out.println(t));
System.out.println("C7: Client (List Flights Info): After delete - No records");

// Server shut down
flightServer.shutdown();
System.out.println("C8: Server shut down successfully");
}
} catch (Exception e) {
e.printStackTrace();

public class FlightCookbook {
public static void main(String[] args) {
Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
try (BufferAllocator allocator = new RootAllocator()){
// Server
try(final CookbookProducer producer = new CookbookProducer(allocator, location);
final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) {
try {
flightServer.start();
System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort());
} catch (IOException e) {
throw new RuntimeException(e);
}

// Client
try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) {
System.out.println("C1: Client (Location): Connected to " + location.getUri());

// Populate data
Schema schema = new Schema(Arrays.asList(
new Field("name", FieldType.nullable(new ArrowType.Utf8()), null)));
try(VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name")) {
varCharVector.allocateNew(3);
varCharVector.set(0, "Ronald".getBytes());
varCharVector.set(1, "David".getBytes());
varCharVector.set(2, "Francisco".getBytes());
vectorSchemaRoot.setRowCount(3);
FlightClient.ClientStreamListener listener = flightClient.startPut(
FlightDescriptor.path("profiles"),
vectorSchemaRoot, new AsyncPutListener());
listener.putNext();
varCharVector.set(0, "Manuel".getBytes());
varCharVector.set(1, "Felipe".getBytes());
varCharVector.set(2, "JJ".getBytes());
vectorSchemaRoot.setRowCount(3);
listener.putNext();
listener.completed();
listener.getResult();
System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each");
}

// Get metadata information
FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles"));
System.out.println("C3: Client (Get Metadata): " + flightInfo);

// Get data information
try(FlightStream flightStream = flightClient.getStream(new Ticket(
FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)))) {
int batch = 0;
try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) {
System.out.println("C4: Client (Get Stream):");
while (flightStream.next()) {
batch++;
System.out.println("Client Received batch #" + batch + ", Data:");
System.out.print(vectorSchemaRootReceived.contentToTSVString());
}
}
} catch (Exception e) {
e.printStackTrace();
}

// Get all metadata information
Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
System.out.print("C5: Client (List Flights Info): ");
flightInfosBefore.forEach(t -> System.out.println(t));

// Do delete action
Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE",
FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)));
while (deleteActionResult.hasNext()) {
Result result = deleteActionResult.next();
System.out.println("C6: Client (Do Delete Action): " +
new String(result.getBody(), StandardCharsets.UTF_8));
}

// Get all metadata information (to validate detele action)
Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
flightInfos.forEach(t -> System.out.println(t));
System.out.println("C7: Client (List Flights Info): After delete - No records");

// Server shut down
flightServer.shutdown();
System.out.println("S2: Server shut down successfully");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

.. testoutput::
.. code-block:: shell

S1: Server (Location): Listening on port 33333
C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
Expand All @@ -302,7 +308,7 @@ Flight Client and Server
C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6}
C6: Client (Do Delete Action): Delete completed
C7: Client (List Flights Info): After delete - No records
C8: Server shut down successfully
S2: Server shut down successfully

Let explain our code in more detail.

Expand Down Expand Up @@ -547,10 +553,10 @@ Stop Flight Server

// Server
flightServer.shutdown();
System.out.println("C8: Server shut down successfully");
System.out.println("S2: Server shut down successfully");

.. code-block:: shell

C8: Server shut down successfully
S2: Server shut down successfully

_`Arrow Flight RPC`: https://arrow.apache.org/docs/format/Flight.html