Skip to content

Commit

Permalink
[FLINK-24017][kryo] Add scala-free code paths
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Oct 18, 2021
1 parent 5dba785 commit 1bfeaba
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,28 +90,32 @@ public static <T> T copy(T from, T reuse, Kryo kryo, TypeSerializer<T> serialize
* Apply a list of {@link KryoRegistration} to a Kryo instance. The list of registrations is
* assumed to already be a final resolution of all possible registration overwrites.
*
* <p>The registrations are applied in the given order and always specify the registration id as
* the next available id in the Kryo instance (providing the id just extra ensures nothing is
* overwritten, and isn't strictly required);
* <p>The registrations are applied in the given order and always specify the registration id,
* using the given {@code firstRegistrationId} and incrementing it for each registration.
*
* @param kryo the Kryo instance to apply the registrations
* @param resolvedRegistrations the registrations, which should already be resolved of all
* possible registration overwrites
* @param firstRegistrationId the first registration id to use
*/
public static void applyRegistrations(
Kryo kryo, Collection<KryoRegistration> resolvedRegistrations) {
Kryo kryo,
Collection<KryoRegistration> resolvedRegistrations,
int firstRegistrationId) {

int currentRegistrationId = firstRegistrationId;
Serializer<?> serializer;
for (KryoRegistration registration : resolvedRegistrations) {
serializer = registration.getSerializer(kryo);

if (serializer != null) {
kryo.register(
registration.getRegisteredClass(),
serializer,
kryo.getNextRegistrationId());
kryo.register(registration.getRegisteredClass(), serializer, currentRegistrationId);
} else {
kryo.register(registration.getRegisteredClass(), kryo.getNextRegistrationId());
kryo.register(registration.getRegisteredClass(), currentRegistrationId);
}
// if Kryo already had a serializer for that type then it ignores the registration
if (kryo.getRegistration(currentRegistrationId) != null) {
currentRegistrationId++;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ private void checkKryoInitialized() {

this.kryo.setAsmEnabled(true);

KryoUtils.applyRegistrations(this.kryo, kryoRegistrations.values());
KryoUtils.applyRegistrations(
this.kryo, kryoRegistrations.values(), this.kryo.getNextRegistrationId());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;

/** Interface for flink-core to interact with the FlinkChillPackageRegistrar in flink-java. */
public interface ChillSerializerRegistrar {
/**
* Registers all serializers with the given {@link Kryo}. All serializers are registered with
* specific IDs as a continuous block.
*
* @param kryo Kryo to register serializers with
*/
void registerSerializers(Kryo kryo);

/**
* Returns the registration ID that immediately follows the last registered serializer.
*
* @return registration ID that should be used for the next serializer registration
*/
int getNextRegistrationId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
Expand All @@ -67,6 +69,35 @@
* <p>This serializer is intended as a fallback serializer for the cases that are not covered by the
* basic types, tuples, and POJOs.
*
* <p>The set of serializers registered with Kryo via {@link Kryo#register}, with their respective
* IDs, depends on whether flink-java or flink-scala are on the classpath. This is for
* backwards-compatibility reasons.
*
* <p>If neither are available (which should only apply to tests in flink-core), then:
*
* <ul>
* <li>0-9 are used for Java primitives
* <li>10+ are used for user-defined registration
* </ul>
*
* <p>If flink-scala is available, then:
*
* <ul>
* <li>0-9 are used for Java primitives
* <li>10-72 are used for Scala classes
* <li>73-84 are used for Java classes
* <li>85+ are used for user-defined registration
* </ul>
*
* <p>If *only* flink-java is available, then:
*
* <ul>
* <li>0-9 are used for Java primitives
* <li>10-72 are unused (to maintain compatibility)
* <li>73-84 are used for Java classes
* <li>85+ are used for user-defined registration
* </ul>
*
* @param <T> The type to be serialized.
*/
public class KryoSerializer<T> extends TypeSerializer<T> {
Expand All @@ -86,6 +117,23 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
configureKryoLogging();
}

@Nullable
private static final ChillSerializerRegistrar flinkChillPackageRegistrar =
loadFlinkChillPackageRegistrar();

@Nullable
private static ChillSerializerRegistrar loadFlinkChillPackageRegistrar() {
try {
return (ChillSerializerRegistrar)
Class.forName(
"org.apache.flink.api.java.typeutils.runtime.kryo.FlinkChillPackageRegistrar")
.getDeclaredConstructor()
.newInstance();
} catch (Exception e) {
return null;
}
}

// ------------------------------------------------------------------------

private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>>
Expand Down Expand Up @@ -458,6 +506,10 @@ private Kryo getKryoInstance() {
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(initStrategy);

if (flinkChillPackageRegistrar != null) {
flinkChillPackageRegistrar.registerSerializers(kryo);
}

return kryo;
}
}
Expand Down Expand Up @@ -487,7 +539,12 @@ private void checkKryoInitialized() {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
}

KryoUtils.applyRegistrations(this.kryo, kryoRegistrations.values());
KryoUtils.applyRegistrations(
this.kryo,
kryoRegistrations.values(),
flinkChillPackageRegistrar != null
? flinkChillPackageRegistrar.getNextRegistrationId()
: kryo.getNextRegistrationId());

kryo.setRegistrationRequired(false);
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,17 @@
* <p>All registrations use a hard-coded ID which were determined at commit
* 18f176ce86900fd4e932c73f3d138912355c6880.
*/
public class FlinkChillPackageRegistrar {
public class FlinkChillPackageRegistrar implements ChillSerializerRegistrar {

private static final int FIRST_REGISTRATION_ID = 73;

public static void registerJavaTypes(Kryo kryo) {
@Override
public int getNextRegistrationId() {
return 85;
}

@Override
public void registerSerializers(Kryo kryo) {
//noinspection ArraysAsListWithZeroOrOneArgument
new RegistrationHelper(FIRST_REGISTRATION_ID, kryo)
.register(Arrays.asList("").getClass(), new ArraysAsListSerializer())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import _root_.java.io.Serializable

import com.twitter.chill._

import org.apache.flink.api.java.typeutils.runtime.kryo.FlinkChillPackageRegistrar

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -189,6 +190,6 @@ class AllScalaRegistrar extends IKryoRegistrar {
// use the singleton serializer for boxed Unit
val boxedUnit = scala.Unit.box(())
k.register(boxedUnit.getClass, new SingletonSerializer(boxedUnit))
FlinkChillPackageRegistrar.registerJavaTypes(k)
new FlinkChillPackageRegistrar().registerSerializers(k)
}
}

0 comments on commit 1bfeaba

Please sign in to comment.