Skip to content

Commit

Permalink
feat(plugins): spring custom plugins (datahub-project#10389)
Browse files Browse the repository at this point in the history
Co-authored-by: Kevin Chun <[email protected]>
Co-authored-by: Kevin Chun <[email protected]>
  • Loading branch information
3 people authored May 9, 2024
1 parent 162b6e9 commit 6ed21bd
Show file tree
Hide file tree
Showing 78 changed files with 1,494 additions and 553 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ project.ext.externalDependency = [
'jetbrains':' org.jetbrains.kotlin:kotlin-stdlib:1.6.0',
'annotationApi': 'javax.annotation:javax.annotation-api:1.3.2',
'jakartaAnnotationApi': 'jakarta.annotation:jakarta.annotation-api:3.0.0',
'classGraph': 'io.github.classgraph:classgraph:4.8.168',
'classGraph': 'io.github.classgraph:classgraph:4.8.172',
]

allprojects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import com.linkedin.datahub.graphql.generated.SearchSuggestion;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.metadata.entity.validation.ValidationUtils;
import com.linkedin.metadata.entity.validation.ValidationApiUtils;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.utils.SearchUtils;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -89,7 +89,7 @@ public static List<MatchedField> getMatchedFieldEntry(
if (SearchUtils.isUrn(field.getValue())) {
try {
Urn urn = Urn.createFromString(field.getValue());
ValidationUtils.validateUrn(
ValidationApiUtils.validateUrn(
context.getOperationContext().getEntityRegistry(), urn);
matchedField.setEntity(UrnToEntityMapper.map(context, urn));
} catch (IllegalArgumentException | URISyntaxException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.MatchedField;
import com.linkedin.metadata.entity.validation.ValidationUtils;
import com.linkedin.metadata.entity.validation.ValidationApiUtils;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.snapshot.Snapshot;
Expand Down Expand Up @@ -42,7 +42,7 @@ public void testMatchedFieldValidation() throws URISyntaxException {
"urn:li:dataset:%28urn:li:dataPlatform:s3%2Ctest-datalake-concepts/prog_maintenance%2CPROD%29");
assertThrows(
IllegalArgumentException.class,
() -> ValidationUtils.validateUrn(entityRegistry, invalidUrn));
() -> ValidationApiUtils.validateUrn(entityRegistry, invalidUrn));

QueryContext mockContext = mock(QueryContext.class);
when(mockContext.getOperationContext())
Expand Down
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
### Breaking Changes

- #10419 - `aws_region` is now a required configuration in the DynamoDB connector. The connector will no longer loop through all AWS regions; instead, it will only use the region passed into the recipe configuration.
- #10389 - Custom validators, mutators, side-effects dropped a previously required constructor
- #10472 - `RVW` added as a FabricType. No rollbacks allowed once metadata with this fabric type is added without manual cleanups in databases.

### Potential Downtime
Expand Down
2 changes: 1 addition & 1 deletion entity-registry/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencies {
implementation externalDependency.jacksonDataFormatYaml
implementation externalDependency.reflections

implementation externalDependency.jsonPatch
api externalDependency.jsonPatch
implementation externalDependency.jsonPathImpl

constraints {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

@Getter
@Setter
@Accessors(chain = true)
public class StructuredPropertiesSoftDelete extends MutationHook {
public StructuredPropertiesSoftDelete(AspectPluginConfig aspectPluginConfig) {
super(aspectPluginConfig);
}
@Nonnull private AspectPluginConfig config;

@Override
protected Stream<Pair<ReadItem, Boolean>> readMutation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -24,21 +26,13 @@
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;

@Slf4j
public class PluginFactory {

private static final String[] VALIDATOR_PACKAGES = {
"com.linkedin.metadata.aspect.plugins.validation", "com.linkedin.metadata.aspect.validation"
};
private static final String[] HOOK_PACKAGES = {
"com.linkedin.metadata.aspect.plugins.hooks", "com.linkedin.metadata.aspect.hooks"
};

public static PluginFactory withCustomClasspath(
@Nullable PluginConfiguration pluginConfiguration, @Nonnull List<ClassLoader> classLoaders) {
return new PluginFactory(pluginConfiguration, classLoaders);
return new PluginFactory(pluginConfiguration, classLoaders).loadPlugins();
}

public static PluginFactory withConfig(@Nullable PluginConfiguration pluginConfiguration) {
Expand All @@ -49,44 +43,135 @@ public static PluginFactory empty() {
return PluginFactory.withConfig(PluginConfiguration.EMPTY);
}

public static PluginFactory merge(PluginFactory a, PluginFactory b) {
return PluginFactory.withCustomClasspath(
PluginConfiguration.merge(a.getPluginConfiguration(), b.getPluginConfiguration()),
public static PluginFactory merge(
PluginFactory a,
PluginFactory b,
@Nullable
BiFunction<PluginConfiguration, List<ClassLoader>, PluginFactory> pluginFactoryProvider) {
PluginConfiguration mergedPluginConfig =
PluginConfiguration.merge(a.pluginConfiguration, b.pluginConfiguration);
List<ClassLoader> mergedClassLoaders =
Stream.concat(a.getClassLoaders().stream(), b.getClassLoaders().stream())
.collect(Collectors.toList()));
.collect(Collectors.toList());

if (pluginFactoryProvider != null) {
return pluginFactoryProvider.apply(mergedPluginConfig, mergedClassLoaders);
} else {
return PluginFactory.withCustomClasspath(mergedPluginConfig, mergedClassLoaders);
}
}

@Getter private final PluginConfiguration pluginConfiguration;
@Nonnull @Getter private final List<ClassLoader> classLoaders;
@Getter private final List<AspectPayloadValidator> aspectPayloadValidators;
@Getter private final List<MutationHook> mutationHooks;
@Getter private final List<MCLSideEffect> mclSideEffects;
@Getter private final List<MCPSideEffect> mcpSideEffects;
@Getter private List<AspectPayloadValidator> aspectPayloadValidators;
@Getter private List<MutationHook> mutationHooks;
@Getter private List<MCLSideEffect> mclSideEffects;
@Getter private List<MCPSideEffect> mcpSideEffects;

private final ClassGraph classGraph;
private static final Map<Long, List<PluginSpec>> pluginCache = new ConcurrentHashMap<>();

public PluginFactory(
@Nullable PluginConfiguration pluginConfiguration, @Nonnull List<ClassLoader> classLoaders) {
this.classGraph =
new ClassGraph()
.acceptPackages(ArrayUtils.addAll(HOOK_PACKAGES, VALIDATOR_PACKAGES))
.enableRemoteJarScanning()
.enableExternalClasses()
.enableClassInfo()
.enableMethodInfo();

this.classLoaders = classLoaders;

if (!this.classLoaders.isEmpty()) {
classLoaders.forEach(this.classGraph::addClassLoader);
}

this.pluginConfiguration =
pluginConfiguration == null ? PluginConfiguration.EMPTY : pluginConfiguration;
}

public PluginFactory loadPlugins() {
this.aspectPayloadValidators = buildAspectPayloadValidators(this.pluginConfiguration);
this.mutationHooks = buildMutationHooks(this.pluginConfiguration);
this.mclSideEffects = buildMCLSideEffects(this.pluginConfiguration);
this.mcpSideEffects = buildMCPSideEffects(this.pluginConfiguration);
return this;
}

/**
* Memory intensive operation because of the size of the jars. Limit packages, classes scanned,
* cache results
*
* @param configs plugin configurations
* @return auto-closeable scan result
*/
protected static <T extends PluginSpec> List<T> initPlugins(
@Nonnull List<ClassLoader> classLoaders,
@Nonnull Class<?> baseClazz,
@Nonnull List<String> packageNames,
@Nonnull List<AspectPluginConfig> configs) {

List<String> classNames =
configs.stream().map(AspectPluginConfig::getClassName).collect(Collectors.toList());

if (classNames.isEmpty()) {
return Collections.emptyList();
} else {
long key =
IntStream.concat(
classLoaders.stream().mapToInt(Object::hashCode),
IntStream.concat(
IntStream.of(baseClazz.getName().hashCode()),
configs.stream().mapToInt(AspectPluginConfig::hashCode)))
.sum();

return (List<T>)
pluginCache.computeIfAbsent(
key,
k -> {
try {
ClassGraph classGraph =
new ClassGraph()
.acceptPackages(packageNames.stream().distinct().toArray(String[]::new))
.acceptClasses(classNames.stream().distinct().toArray(String[]::new))
.enableRemoteJarScanning()
.enableExternalClasses()
.enableClassInfo()
.enableMethodInfo();
if (!classLoaders.isEmpty()) {
classLoaders.forEach(classGraph::addClassLoader);
}

try (ScanResult scanResult = classGraph.scan()) {
Map<String, ClassInfo> classMap =
scanResult.getSubclasses(baseClazz).stream()
.collect(Collectors.toMap(ClassInfo::getName, Function.identity()));

return configs.stream()
.map(
config -> {
try {
ClassInfo classInfo = classMap.get(config.getClassName());
if (classInfo == null) {
throw new IllegalStateException(
String.format(
"The following class cannot be loaded: %s",
config.getClassName()));
}
MethodInfo constructorMethod =
classInfo.getConstructorInfo().get(0);
return ((T)
constructorMethod
.loadClassAndGetConstructor()
.newInstance())
.setConfig(config);
} catch (Exception e) {
log.error(
"Error constructing entity registry plugin class: {}",
config.getClassName(),
e);
return Stream.<T>empty();
}
})
.map(plugin -> (T) plugin)
.filter(PluginSpec::enabled)
.collect(Collectors.toList());
}
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(
"Failed to load entity registry plugins: %s.", baseClazz.getName()),
e);
}
});
}
}

/**
Expand Down Expand Up @@ -187,68 +272,67 @@ private List<AspectPayloadValidator> buildAspectPayloadValidators(
: applyDisable(
build(
AspectPayloadValidator.class,
pluginConfiguration.getAspectPayloadValidators(),
VALIDATOR_PACKAGES));
pluginConfiguration.validatorPackages(),
pluginConfiguration.getAspectPayloadValidators()));
}

private List<MutationHook> buildMutationHooks(@Nullable PluginConfiguration pluginConfiguration) {
return pluginConfiguration == null
? Collections.emptyList()
: applyDisable(
build(MutationHook.class, pluginConfiguration.getMutationHooks(), HOOK_PACKAGES));
build(
MutationHook.class,
pluginConfiguration.mutationPackages(),
pluginConfiguration.getMutationHooks()));
}

private List<MCLSideEffect> buildMCLSideEffects(
@Nullable PluginConfiguration pluginConfiguration) {
return pluginConfiguration == null
? Collections.emptyList()
: applyDisable(
build(MCLSideEffect.class, pluginConfiguration.getMclSideEffects(), HOOK_PACKAGES));
build(
MCLSideEffect.class,
pluginConfiguration.mclSideEffectPackages(),
pluginConfiguration.getMclSideEffects()));
}

private List<MCPSideEffect> buildMCPSideEffects(
@Nullable PluginConfiguration pluginConfiguration) {
return pluginConfiguration == null
? Collections.emptyList()
: applyDisable(
build(MCPSideEffect.class, pluginConfiguration.getMcpSideEffects(), HOOK_PACKAGES));
build(
MCPSideEffect.class,
pluginConfiguration.mcpSideEffectPackages(),
pluginConfiguration.getMcpSideEffects()));
}

private <T> List<T> build(
Class<?> baseClazz, List<AspectPluginConfig> configs, String... packageNames) {
try (ScanResult scanResult = classGraph.acceptPackages(packageNames).scan()) {

Map<String, ClassInfo> classMap =
scanResult.getSubclasses(baseClazz).stream()
.collect(Collectors.toMap(ClassInfo::getName, Function.identity()));

return configs.stream()
.flatMap(
config -> {
try {
ClassInfo classInfo = classMap.get(config.getClassName());
if (classInfo == null) {
throw new IllegalStateException(
String.format(
"The following class cannot be loaded: %s", config.getClassName()));
}
MethodInfo constructorMethod = classInfo.getConstructorInfo().get(0);
return Stream.of(
(T) constructorMethod.loadClassAndGetConstructor().newInstance(config));
} catch (Exception e) {
log.error(
"Error constructing entity registry plugin class: {}",
config.getClassName(),
e);
return Stream.empty();
}
})
.collect(Collectors.toList());
/**
* Load plugins given the base class (i.e. a validator) and the name of the implementing class
* found in the configuration objects.
*
* <p>For performance reasons, scan the packages found in packageNames
*
* <p>Designed to avoid any Spring dependency, see alternative implementation for Spring
*
* @param baseClazz base class for the plugin
* @param configs configuration with implementing class information
* @param packageNames package names to scan
* @return list of plugin instances
* @param <T> the plugin class
*/
protected <T extends PluginSpec> List<T> build(
Class<?> baseClazz, List<String> packageNames, List<AspectPluginConfig> configs) {
List<AspectPluginConfig> nonSpringConfigs =
configs.stream()
.filter(
config ->
config.getSpring() == null
|| Boolean.FALSE.equals(config.getSpring().isEnabled()))
.collect(Collectors.toList());

} catch (Exception e) {
throw new IllegalArgumentException(
String.format("Failed to load entity registry plugins: %s.", baseClazz.getName()), e);
}
return initPlugins(classLoaders, baseClazz, packageNames, nonSpringConfigs);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
public abstract class PluginSpec {
protected static String ENTITY_WILDCARD = "*";

private final AspectPluginConfig aspectPluginConfig;
@Nonnull
public abstract AspectPluginConfig getConfig();

protected AspectPluginConfig getConfig() {
return this.aspectPluginConfig;
public abstract PluginSpec setConfig(@Nonnull AspectPluginConfig config);

public boolean enabled() {
return true;
}

public boolean shouldApply(
Expand Down
Loading

0 comments on commit 6ed21bd

Please sign in to comment.