From f35a4130ef35b534d93e2dfa9f88f22cf8cd1d27 Mon Sep 17 00:00:00 2001 From: ddebowczyk92 Date: Wed, 27 Sep 2023 17:14:59 +0200 Subject: [PATCH] Make SerializableConfiguration cacheable (#28590) --- .../io/hadoop/SerializableConfiguration.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java index bb59b07cc280c..40099e6346786 100644 --- a/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java +++ b/sdks/java/io/hadoop-common/src/main/java/org/apache/beam/sdk/io/hadoop/SerializableConfiguration.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.hadoop; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -39,6 +41,8 @@ public class SerializableConfiguration implements Externalizable { private transient Configuration conf; + private transient byte[] serializationCache; + public SerializableConfiguration() {} public SerializableConfiguration(Configuration conf) { @@ -49,17 +53,30 @@ public SerializableConfiguration(Configuration conf) { } public Configuration get() { + if (serializationCache != null) { + serializationCache = null; + } return conf; } @Override public void writeExternal(ObjectOutput out) throws IOException { + if (serializationCache == null) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + try (DataOutputStream dos = new DataOutputStream(baos)) { + conf.write(dos); + serializationCache = baos.toByteArray(); + } + } out.writeUTF(conf.getClass().getCanonicalName()); - conf.write(out); + out.write(serializationCache); } @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + if (serializationCache != null) { + serializationCache = null; + } String className = in.readUTF(); try { conf =