Skip to content

Commit

Permalink
review cr: castTypes util
Browse files Browse the repository at this point in the history
- s/castHDate/maybeCastHDate/ to be more concise
- move values manipulation to a separate util (hopefully, I understood
  the cr in the right way)
  • Loading branch information
deadb0d4 committed Oct 9, 2024
1 parent 7c09ed8 commit e811481
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,26 @@ public PCollection<Row> expand(PBegin input) {
private static class HCatToRowFn extends DoFn<HCatRecord, Row> {
private final Schema schema;

private Object castHDate(Object obj) {
private Object maybeCastHDate(Object obj) {
if (obj instanceof org.apache.hadoop.hive.common.type.Date) {
return new Instant(((org.apache.hadoop.hive.common.type.Date) obj).toEpochMilli());
}
return obj;
}

/** Cast objects of the types that aren't supported by {@link Row}. */
private List<Object> castTypes(List<Object> values) {
return values.stream().map(this::maybeCastHDate).collect(Collectors.toList());
}

HCatToRowFn(Schema schema) {
this.schema = schema;
}

@ProcessElement
public void processElement(ProcessContext c) {
HCatRecord hCatRecord = c.element();
List<Object> recordValues =
hCatRecord.getAll().stream().map(this::castHDate).collect(Collectors.toList());
c.output(Row.withSchema(schema).addValues(recordValues).build());
c.output(Row.withSchema(schema).addValues(castTypes(hCatRecord.getAll())).build());
}
}
}

0 comments on commit e811481

Please sign in to comment.