diff --git a/java/source/flight.rst b/java/source/flight.rst index 5e181677..3868c44e 100644 --- a/java/source/flight.rst +++ b/java/source/flight.rst @@ -33,7 +33,7 @@ and data in memory to store the actual data. Flight Client and Server ************************ -.. testcode:: +.. code-block:: java import org.apache.arrow.flight.Action; import org.apache.arrow.flight.AsyncPutListener; @@ -95,6 +95,7 @@ Flight Client and Server AutoCloseables.close(batches); } } + class CookbookProducer extends NoOpFlightProducer implements AutoCloseable { private final BufferAllocator allocator; private final Location location; @@ -194,95 +195,100 @@ Flight Client and Server AutoCloseables.close(datasets.values()); } } - Location location = Location.forGrpcInsecure("0.0.0.0", 33333); - try (BufferAllocator allocator = new RootAllocator()){ - // Server - try(final CookbookProducer producer = new CookbookProducer(allocator, location); - final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) { - try { - flightServer.start(); - System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort()); - } catch (IOException e) { - throw new RuntimeException(e); - } - - // Client - try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) { - System.out.println("C1: Client (Location): Connected to " + location.getUri()); - - // Populate data - Schema schema = new Schema(Arrays.asList( - new Field("name", FieldType.nullable(new ArrowType.Utf8()), null))); - try(VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator); - VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name")) { - varCharVector.allocateNew(3); - varCharVector.set(0, "Ronald".getBytes()); - varCharVector.set(1, "David".getBytes()); - varCharVector.set(2, "Francisco".getBytes()); - vectorSchemaRoot.setRowCount(3); - FlightClient.ClientStreamListener listener = flightClient.startPut( - FlightDescriptor.path("profiles"), - vectorSchemaRoot, new AsyncPutListener()); - listener.putNext(); - varCharVector.set(0, "Manuel".getBytes()); - varCharVector.set(1, "Felipe".getBytes()); - varCharVector.set(2, "JJ".getBytes()); - vectorSchemaRoot.setRowCount(3); - listener.putNext(); - listener.completed(); - listener.getResult(); - System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each"); - } - - // Get metadata information - FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles")); - System.out.println("C3: Client (Get Metadata): " + flightInfo); - - // Get data information - try(FlightStream flightStream = flightClient.getStream(new Ticket( - FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)))) { - int batch = 0; - try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) { - System.out.println("C4: Client (Get Stream):"); - while (flightStream.next()) { - batch++; - System.out.println("Client Received batch #" + batch + ", Data:"); - System.out.print(vectorSchemaRootReceived.contentToTSVString()); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - - // Get all metadata information - Iterable flightInfosBefore = flightClient.listFlights(Criteria.ALL); - System.out.print("C5: Client (List Flights Info): "); - flightInfosBefore.forEach(t -> System.out.println(t)); - - // Do delete action - Iterator deleteActionResult = flightClient.doAction(new Action("DELETE", - FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8))); - while (deleteActionResult.hasNext()) { - Result result = deleteActionResult.next(); - System.out.println("C6: Client (Do Delete Action): " + - new String(result.getBody(), StandardCharsets.UTF_8)); - } - - // Get all metadata information (to validate detele action) - Iterable flightInfos = flightClient.listFlights(Criteria.ALL); - flightInfos.forEach(t -> System.out.println(t)); - System.out.println("C7: Client (List Flights Info): After delete - No records"); - - // Server shut down - flightServer.shutdown(); - System.out.println("C8: Server shut down successfully"); - } - } catch (Exception e) { - e.printStackTrace(); + + public class FlightCookbook { + public static void main(String[] args) { + Location location = Location.forGrpcInsecure("0.0.0.0", 33333); + try (BufferAllocator allocator = new RootAllocator()){ + // Server + try(final CookbookProducer producer = new CookbookProducer(allocator, location); + final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) { + try { + flightServer.start(); + System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort()); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Client + try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) { + System.out.println("C1: Client (Location): Connected to " + location.getUri()); + + // Populate data + Schema schema = new Schema(Arrays.asList( + new Field("name", FieldType.nullable(new ArrowType.Utf8()), null))); + try(VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator); + VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name")) { + varCharVector.allocateNew(3); + varCharVector.set(0, "Ronald".getBytes()); + varCharVector.set(1, "David".getBytes()); + varCharVector.set(2, "Francisco".getBytes()); + vectorSchemaRoot.setRowCount(3); + FlightClient.ClientStreamListener listener = flightClient.startPut( + FlightDescriptor.path("profiles"), + vectorSchemaRoot, new AsyncPutListener()); + listener.putNext(); + varCharVector.set(0, "Manuel".getBytes()); + varCharVector.set(1, "Felipe".getBytes()); + varCharVector.set(2, "JJ".getBytes()); + vectorSchemaRoot.setRowCount(3); + listener.putNext(); + listener.completed(); + listener.getResult(); + System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each"); + } + + // Get metadata information + FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles")); + System.out.println("C3: Client (Get Metadata): " + flightInfo); + + // Get data information + try(FlightStream flightStream = flightClient.getStream(new Ticket( + FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)))) { + int batch = 0; + try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) { + System.out.println("C4: Client (Get Stream):"); + while (flightStream.next()) { + batch++; + System.out.println("Client Received batch #" + batch + ", Data:"); + System.out.print(vectorSchemaRootReceived.contentToTSVString()); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + + // Get all metadata information + Iterable flightInfosBefore = flightClient.listFlights(Criteria.ALL); + System.out.print("C5: Client (List Flights Info): "); + flightInfosBefore.forEach(t -> System.out.println(t)); + + // Do delete action + Iterator deleteActionResult = flightClient.doAction(new Action("DELETE", + FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8))); + while (deleteActionResult.hasNext()) { + Result result = deleteActionResult.next(); + System.out.println("C6: Client (Do Delete Action): " + + new String(result.getBody(), StandardCharsets.UTF_8)); + } + + // Get all metadata information (to validate detele action) + Iterable flightInfos = flightClient.listFlights(Criteria.ALL); + flightInfos.forEach(t -> System.out.println(t)); + System.out.println("C7: Client (List Flights Info): After delete - No records"); + + // Server shut down + flightServer.shutdown(); + System.out.println("S2: Server shut down successfully"); + } + } catch (Exception e) { + e.printStackTrace(); + } + } } } -.. testoutput:: +.. code-block:: shell S1: Server (Location): Listening on port 33333 C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333 @@ -302,7 +308,7 @@ Flight Client and Server C5: Client (List Flights Info): FlightInfo{schema=Schema, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6} C6: Client (Do Delete Action): Delete completed C7: Client (List Flights Info): After delete - No records - C8: Server shut down successfully + S2: Server shut down successfully Let explain our code in more detail. @@ -547,10 +553,10 @@ Stop Flight Server // Server flightServer.shutdown(); - System.out.println("C8: Server shut down successfully"); + System.out.println("S2: Server shut down successfully"); .. code-block:: shell - C8: Server shut down successfully + S2: Server shut down successfully _`Arrow Flight RPC`: https://arrow.apache.org/docs/format/Flight.html