Skip to content

Commit

Permalink
Merge pull request #108 from OsgiliathEnterprise/embeddedid
Browse files Browse the repository at this point in the history
Qualified - rework graph clustering
  • Loading branch information
Tcharl authored Oct 15, 2024
2 parents 799f91e + 93547a3 commit 52e1b10
Show file tree
Hide file tree
Showing 27 changed files with 4,990 additions and 544 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,20 @@
import org.jgrapht.Graph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static net.osgiliath.migrator.core.configuration.DataSourceConfiguration.SINK_TRANSACTION_MANAGER;
import static org.apache.tinkerpop.gremlin.process.traversal.P.eq;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.*;

Expand All @@ -54,9 +60,12 @@ public class SinkEntityInjector {
private final MetamodelRequester metamodelGraphRequester;
private final ModelElementProcessor modelElementProcessor;
private final VertexResolver vertexResolver;
private final PlatformTransactionManager sinkPlatformTxManager;

public SinkEntityInjector(VertexPersister vertexPersister, MetamodelRequester metamodelGraphRequester, ModelElementProcessor modelElementProcessor, VertexResolver vertexResolver) {

public SinkEntityInjector(VertexPersister vertexPersister, MetamodelRequester metamodelGraphRequester, ModelElementProcessor modelElementProcessor, VertexResolver vertexResolver, @Qualifier(SINK_TRANSACTION_MANAGER) PlatformTransactionManager sinkPlatformTxManager) {
super();
this.sinkPlatformTxManager = sinkPlatformTxManager;
this.vertexPersister = vertexPersister;
this.metamodelGraphRequester = metamodelGraphRequester;
this.modelElementProcessor = modelElementProcessor;
Expand All @@ -70,53 +79,89 @@ public void persist(GraphTraversalSource modelGraph, Graph<MetamodelVertex, Fiel
}

private void processEntitiesWithoutCycles(GraphTraversalSource modelGraph, Graph<MetamodelVertex, FieldEdge<MetamodelVertex>> entityMetamodelGraph, Collection<Vertex> processedVertices) {
GraphTraversal leafElements = modelGraph.V()
.repeat(out())
.until(
out().filter(not(is(P.within(processedVertices)))).count().is(0)// .or().loops().is(CYCLE_DETECTION_DEPTH)
)
.filter(not(is(P.within(processedVertices))));
if (!leafElements.hasNext()) {
vertexPersister.persistVertices(modelGraph.V().filter(not(is(P.within(processedVertices)))).toStream()
.map(modelVertex -> vertexResolver.getModelElement(modelVertex)));
return;
GraphTraversal orphansElements = modelGraph.V().filter(not(is(P.within(processedVertices)))).filter(out().count().is(0));
if (orphansElements.hasNext()) {
log.debug("persisting orphans elements");
persistTraversal(entityMetamodelGraph, processedVertices, orphansElements.toStream());
processEntitiesWithoutCycles(modelGraph, entityMetamodelGraph, processedVertices);
} else {
GraphTraversal leafElements = modelGraph
.V()
.repeat(out())
.until(
out().filter(not(is(P.within(processedVertices)))).count().is(0)
).filter(not(is(P.within(processedVertices)))).dedup();
if (leafElements.hasNext()) {
log.debug("persisting leaf elements");
persistTraversal(entityMetamodelGraph, processedVertices, leafElements.toStream());
processEntitiesWithoutCycles(modelGraph, entityMetamodelGraph, processedVertices);
} else {
log.debug("persisting related elements");
Stream stream = modelGraph.V().filter(not(is(P.within(processedVertices)))).toStream();
Collection set = (Collection) stream.collect(Collectors.toSet());
for (var o : set)
persistTraversal(entityMetamodelGraph, processedVertices, Optional.of(o).stream());
}
}
Stream<Vertex> res = leafElements.toStream()
}

private void persistTraversal(Graph<MetamodelVertex, FieldEdge<MetamodelVertex>> entityMetamodelGraph, Collection<Vertex> processedVertices, Stream traversal) {
Stream<ModelElement> res = traversal
.map(m -> {
Vertex v = (Vertex) m;
processedVertices.add(v);
return v;
})
.map(e -> {
Vertex modelVertex = (Vertex) e;
updateRawElementRelationshipsAccordingToGraphEdges(modelVertex, entityMetamodelGraph);
return modelVertex;
})
.peek(mv -> {
Vertex tv = (Vertex) mv;
log.info("Persisting vertex of type {} with id {}", tv.label(), vertexResolver.getVertexModelElementId(tv));
});
vertexPersister.persistVertices(res
.map(modelVertex -> {
processedVertices.add(modelVertex);
return modelVertex;
return updateRawElementRelationshipsAccordingToGraphEdges(modelVertex, entityMetamodelGraph);
})
.map(modelVertex -> vertexResolver.getModelElement(modelVertex)));
processEntitiesWithoutCycles(modelGraph, entityMetamodelGraph, processedVertices);
.peek(me -> {
ModelElement mv = (ModelElement) me;
log.info("Persisting vertex of type {} with id {}", mv.vertex().getTypeName(), modelElementProcessor.getId(mv).get());
}).filter(
me -> {
ModelElement mv = (ModelElement) me;
Optional<Object> id = modelElementProcessor.getId(mv);
return id.isPresent() && id.get() != null &&
((id.get() instanceof Long && 0L != (Long) id.get()) ||
(id.get() instanceof String && !((String) id.get()).isEmpty()) ||
id.get().getClass().getAnnotation(jakarta.persistence.Embeddable.class) != null)
;
}
);
TransactionTemplate tpl = new TransactionTemplate(sinkPlatformTxManager);
try {
tpl.executeWithoutResult(
act -> vertexPersister.persistVertices(res)
);
} catch (Exception e) {
log.error("Unable to save one element", e);
}
}

void updateRawElementRelationshipsAccordingToGraphEdges(Vertex sourceVertex, Graph<MetamodelVertex, FieldEdge<MetamodelVertex>> entityMetamodelGraph) {
ModelElement sourceModelElement = vertexResolver.getModelElement(sourceVertex);
ModelElement updateRawElementRelationshipsAccordingToGraphEdges(Vertex sourceVertex, Graph<MetamodelVertex, FieldEdge<MetamodelVertex>> entityMetamodelGraph) {
ModelElement sourceModelElement = modelElementProcessor.unproxy(vertexResolver.getModelElement(sourceVertex));
modelElementProcessor.resetModelElementEdge(sourceModelElement);

MetamodelVertex sourceMetamodelVertex = vertexResolver.getMetamodelVertex(sourceVertex);
metamodelGraphRequester.getOutboundFieldEdges(sourceMetamodelVertex, entityMetamodelGraph).stream()
.map(metamodelEdge -> {
modelElementProcessor.resetModelElementEdge(metamodelEdge, sourceModelElement);
return metamodelEdge;
})
.flatMap(metamodelEdge ->
StreamSupport.stream(Spliterators.spliteratorUnknownSize(sourceVertex.edges(Direction.OUT, metamodelEdge.getFieldName()), 0), false)
.map(modelEdge -> new ModelAndMetamodelEdge(modelEdge, metamodelEdge))
)
.peek(modelAndMetamodelEdge -> log.info("Recomposing edge: {} between source vertex of type {} with id {} and target vertex of type {} and id {}", modelAndMetamodelEdge.modelEdge().label(), sourceVertex.label(), vertexResolver.getVertexModelElementId(sourceVertex), modelAndMetamodelEdge.modelEdge().inVertex().label(), vertexResolver.getVertexModelElementId(modelAndMetamodelEdge.modelEdge().inVertex())))
.forEach(modelAndMetamodelEdge -> {
ModelElement targetModelElement = vertexResolver.getModelElement(modelAndMetamodelEdge.modelEdge().inVertex());
modelElementProcessor.addRawElementsRelationshipForEdge(modelAndMetamodelEdge.metamodelEdge(), sourceModelElement, targetModelElement, entityMetamodelGraph);
ModelElement targetModelElement = modelElementProcessor.unproxy(vertexResolver.getModelElement(modelAndMetamodelEdge.modelEdge().inVertex()));
modelElementProcessor.resetModelElementEdge(targetModelElement);
Optional<ModelElement> targetModelElementInTarget = vertexPersister.reattachEntityInSink(targetModelElement);
targetModelElementInTarget.ifPresentOrElse(tgt ->
modelElementProcessor.addRawElementsRelationshipForEdge(modelAndMetamodelEdge.metamodelEdge(), sourceModelElement, tgt),
() -> log.error("Cannot find target element to add relationship to. SourceType: {}, SourceId: {}, Edge: {}, TargetType: {}, targetId: {}", vertexResolver.getMetamodelVertex(sourceVertex).getTypeName(), vertexResolver.getVertexModelElementId(sourceVertex), modelAndMetamodelEdge.metamodelEdge().getFieldName(), targetModelElement.vertex().getTypeName(), modelElementProcessor.getId(targetModelElement))

);
});
return sourceModelElement;
}


Expand All @@ -125,7 +170,19 @@ private void removeCyclicElements(GraphTraversalSource modelGraph) {
.repeat(out())
.until(where(eq("a"))
.or().loops().is(CYCLE_DETECTION_DEPTH))
.filter(where(eq("a"))).drop().iterate();
.filter(where(eq("a")))
.filter(t -> {
Vertex v = t.get();
log.warn("Cyclic element of type {} with id {} found in the graph", v.label(), vertexResolver.getVertexModelElementId(v));
return true;
})
/* .inE()
.filter(e -> {
Edge v = e.get();
log.warn("Will remove the problematic incoming edge {} which creates cycle between {} with id {} and {} with id {}", v.label(), v.inVertex().label(), vertexResolver.getVertexModelElementId(v.inVertex()), v.outVertex().label(), vertexResolver.getVertexModelElementId(v.outVertex()));
return true;
})*/
.drop().iterate();
/* cyclicElements.toStream()
.peek(v -> {
Vertex ve = (Vertex) v;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import jakarta.persistence.EntityManagerFactory;
import net.osgiliath.migrator.core.api.model.ModelElement;
import net.osgiliath.migrator.core.configuration.DataMigratorConfiguration;
import net.osgiliath.migrator.core.configuration.model.GraphDatasourceType;
import net.osgiliath.migrator.core.graph.ModelElementProcessor;
import net.osgiliath.migrator.core.metamodel.impl.internal.jpa.model.JpaMetamodelVertex;
import net.osgiliath.migrator.core.rawelement.RawElementProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -36,6 +35,7 @@
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.Optional;
import java.util.stream.Stream;

import static net.osgiliath.migrator.core.configuration.DataSourceConfiguration.SINK_TRANSACTION_MANAGER;
Expand All @@ -44,59 +44,46 @@
@Component
public class VertexPersister {

private final boolean reconcile;
private final PlatformTransactionManager sinkPlatformTxManager;
private final PlatformTransactionManager sourcePlatformTxManager;
private final RawElementProcessor rawElementProcessor;
private final ModelElementProcessor modelElementProcessor;
private static Logger log = LoggerFactory.getLogger(VertexPersister.class);

public VertexPersister(DataMigratorConfiguration dmc, RawElementProcessor rawElementProcessor, @Qualifier(SINK_TRANSACTION_MANAGER) PlatformTransactionManager sinkPlatformTxManager, @Qualifier(SOURCE_TRANSACTION_MANAGER) PlatformTransactionManager sourcePlatformTxManager) {
this.rawElementProcessor = rawElementProcessor;
this.reconcile = dmc.getGraphDatasource().getType().equals(GraphDatasourceType.REMOTE);
public VertexPersister(DataMigratorConfiguration dmc, ModelElementProcessor modelElementProcessor, @Qualifier(SINK_TRANSACTION_MANAGER) PlatformTransactionManager sinkPlatformTxManager, @Qualifier(SOURCE_TRANSACTION_MANAGER) PlatformTransactionManager sourcePlatformTxManager) {
this.modelElementProcessor = modelElementProcessor;
this.sinkPlatformTxManager = sinkPlatformTxManager;
this.sourcePlatformTxManager = sourcePlatformTxManager;
}

public void persistVertices(Stream<ModelElement> entities) {
Stream<?> reattachedEntities = reattachEntities(entities);
TransactionTemplate tpl = new TransactionTemplate(sinkPlatformTxManager);
// Stream<?> reattachedEntities = reattachEntities(entities);
//TransactionTemplate tpl = new TransactionTemplate(sinkPlatformTxManager);
JpaTransactionManager tm = (JpaTransactionManager) sinkPlatformTxManager;
EntityManagerFactory emf = tm.getEntityManagerFactory();
log.info("******************** Persisting a batch of entities ****************");
try {
tpl.executeWithoutResult(res -> {
EntityManager em = EntityManagerFactoryUtils.getTransactionalEntityManager(emf);
reattachedEntities
.forEach((ent) -> {
log.debug("persisting entity of type {}, with id {}", ent.getClass(), rawElementProcessor.getId(null, ent).get());
em.merge(ent);
}
);
EntityManager em = EntityManagerFactoryUtils.getTransactionalEntityManager(emf);
entities
.forEach((ent) -> {
log.debug("Persisting entity of type {}, with id {}", ent.rawElement(), modelElementProcessor.getId(ent).get());
em.merge(ent.rawElement());
}
);

});
} catch (Exception e) {
log.error("******************** ERROR Persisting last batch of entities ****************");
log.warn("Unable to persist last batch of entities, you may retry once", e);
log.error("******************** End error ****************");
}
}

private Stream<?> reattachEntities(Stream<ModelElement> entities) {
if (reconcile) {
JpaTransactionManager tm = (JpaTransactionManager) sourcePlatformTxManager;
EntityManagerFactory emf = tm.getEntityManagerFactory();
TransactionTemplate tpl = new TransactionTemplate(sourcePlatformTxManager);
tpl.setReadOnly(true);
return tpl.execute(status -> {
EntityManager em = EntityManagerFactoryUtils.getTransactionalEntityManager(emf);
return entities.flatMap(me ->
rawElementProcessor.getId(me).map(
id -> em.find(((JpaMetamodelVertex) me.vertex()).entityClass(), id)
).stream()
Optional<ModelElement> reattachEntityInSink(ModelElement entity) {
JpaTransactionManager tm = (JpaTransactionManager) sinkPlatformTxManager;
EntityManagerFactory emf = tm.getEntityManagerFactory();
TransactionTemplate tpl = new TransactionTemplate(sinkPlatformTxManager);
tpl.setReadOnly(true);
EntityManager em = EntityManagerFactoryUtils.getTransactionalEntityManager(emf);
return
modelElementProcessor.getId(entity).map(
id -> new ModelElement(entity.vertex(), em.find(((JpaMetamodelVertex) entity.vertex()).entityClass(), id))
);
});
} else {
return entities.map(ModelElement::rawElement);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ public GraphTraversal setVertexModelElementId(GraphTraversal traversal, Object i

@Override
public Object getVertexModelElementId(Vertex vtx) {
return vtx.value(MODEL_GRAPH_VERTEX_ENTITY_ID);
try {
return vtx.value(MODEL_GRAPH_VERTEX_ENTITY_ID);
} catch (Exception e) {
return "";
}
}

@Override
Expand Down
Loading

0 comments on commit 52e1b10

Please sign in to comment.