From e811481305452aac94c5b9d5ece50067ecd52ffb Mon Sep 17 00:00:00 2001 From: Dmitry Ulyumdzhiev Date: Wed, 9 Oct 2024 18:23:09 +0100 Subject: [PATCH] review cr: castTypes util - s/castHDate/maybeCastHDate/ to be more concise - move values manipulation to a separate util (hopefully, I understood the cr in the right way) --- .../org/apache/beam/sdk/io/hcatalog/HCatToRow.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java index dedbfd77cf788..e5bdf18ecbcf4 100644 --- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java +++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatToRow.java @@ -77,13 +77,18 @@ public PCollection expand(PBegin input) { private static class HCatToRowFn extends DoFn { private final Schema schema; - private Object castHDate(Object obj) { + private Object maybeCastHDate(Object obj) { if (obj instanceof org.apache.hadoop.hive.common.type.Date) { return new Instant(((org.apache.hadoop.hive.common.type.Date) obj).toEpochMilli()); } return obj; } + /** Cast objects of the types that aren't supported by {@link Row}. */ + private List castTypes(List values) { + return values.stream().map(this::maybeCastHDate).collect(Collectors.toList()); + } + HCatToRowFn(Schema schema) { this.schema = schema; } @@ -91,9 +96,7 @@ private Object castHDate(Object obj) { @ProcessElement public void processElement(ProcessContext c) { HCatRecord hCatRecord = c.element(); - List recordValues = - hCatRecord.getAll().stream().map(this::castHDate).collect(Collectors.toList()); - c.output(Row.withSchema(schema).addValues(recordValues).build()); + c.output(Row.withSchema(schema).addValues(castTypes(hCatRecord.getAll())).build()); } } }