From 89d7f909dd50458e72ef124612bd50e76d91f356 Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Mon, 16 Oct 2023 16:18:36 +0200 Subject: [PATCH] [proxima-beam-tools] explicitly add coder for joins --- .../main/java/cz/o2/proxima/beam/core/io/PairCoder.java | 1 - .../o2/proxima/beam/tools/groovy/BeamWindowedStream.java | 8 +++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/core/io/PairCoder.java b/beam/core/src/main/java/cz/o2/proxima/beam/core/io/PairCoder.java index 5df78286c..212b54930 100644 --- a/beam/core/src/main/java/cz/o2/proxima/beam/core/io/PairCoder.java +++ b/beam/core/src/main/java/cz/o2/proxima/beam/core/io/PairCoder.java @@ -54,7 +54,6 @@ private PairCoder(Coder keyCoder, Coder valueCoder) { @Override public void encode(Pair value, OutputStream outStream) throws IOException { - keyCoder.encode(value.getFirst(), outStream); valueCoder.encode(value.getSecond(), outStream); } diff --git a/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStream.java b/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStream.java index dfeb11319..3df87ebdd 100644 --- a/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStream.java +++ b/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStream.java @@ -393,9 +393,11 @@ public WindowedStream> join( name == null ? Join.innerJoin(leftKv, rightKv) : Join.innerJoin(name + ".join", leftKv, rightKv); - return joined.apply( - MapElements.into(resultCoder.getEncodedTypeDescriptor()) - .via(kv -> Pair.of(kv.getValue().getKey(), kv.getValue().getValue()))); + return joined + .apply( + MapElements.into(resultCoder.getEncodedTypeDescriptor()) + .via(kv -> Pair.of(kv.getValue().getKey(), kv.getValue().getValue()))) + .setCoder(resultCoder); }); }