Skip to content

Commit

Permalink
Add support for multi cluster query to lineage (finos#3093)
Browse files Browse the repository at this point in the history
  • Loading branch information
AFine-gs authored Sep 17, 2024
1 parent 7f8c776 commit d6ccbce
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,26 @@ function <<test.Test>> meta::analytics::lineage::test::relational::calendarAggre
,EmployeeMapping, meta::external::store::relational::tests::testRuntime(), relationalExtensions());
assertSameElements(['Lambda','tb_EmployeeDatabasedefaultEmployeeTable','db_EmployeeDatabase'], $r.databaseLineage.nodes.data.id);

}
}



###Pure
import meta::analytics::lineage::*;
import meta::relational::extension::*;
import meta::relational::metamodel::join::*;
import meta::relational::tests::tds::tdsJoin::*;
function <<meta::pure::profiles::test.Test>> meta::analytics::lineage::test::relational::testMultipleDB():Boolean[1]
{

let fn= {|testJoinTDS_Person.all()->meta::pure::tds::project([col(p|$p.firstName, 'firstName'), col(p|$p.employerID, 'eID'), col(p|$p.managerID, 'managerID')])
->join(testJoinTDS_Firm.all()->project([col(p|$p.firmID, 'fID'), col(p|$p.legalName, 'legalName')]), JoinType.INNER, {a,b|$a.getInteger('eID') == $b.getInteger('fID');});};


// meta::pure::executionPlan::executionPlan($fn,meta::relational::tests::tds::tdsJoin::testJoinTDSMappingTwoDatabaseWithColumnsMappedViaJoinsAndDynaFunction,meta::relational::tests::tds::tdsJoin::twoDBRunTime() ,relationalExtensions())->meta::pure::executionPlan::toString::planToString(relationalExtensions())->println();

let lineage = computeLineage($fn,meta::relational::tests::tds::tdsJoin::testJoinTDSMappingTwoDatabaseWithColumnsMappedViaJoinsAndDynaFunction, meta::relational::tests::tds::tdsJoin::twoDBRunTime(), relationalExtensions());
assertEquals(['Lambda', 'tb_dbIncdefaultpersonTable', 'tb_dbIncdefaultfirmTable', 'tb_database2defaultfirmTable', 'db_dbInc', 'db_database2'],$lineage.databaseLineage.nodes.data.id);

true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ function meta::relational::contract::planExecution(sq:meta::pure::mapping::Store
let dbConnectionStore = $oldRuntime.connectionStores->filter(c|$c.connection==$dbConn);
^$oldRuntime(connectionStores = $dbConnectionStore);
);

let queryExeCtx = if($exeCtx->instanceOf(RelationalExecutionContext),|$exeCtx,|[])->cast(@RelationalExecutionContext);
let originalQuery = $sq.fe->toSQLQuery($m, $sq.inScopeVars,$debug,$queryExeCtx->relationalExecutionContextToState(defaultState($m, $sq.inScopeVars, $exeCtx, $extensions)), $extensions);
$originalQuery->postProcessSQLQuery($store, $ext, $m, $storeRuntime, $exeCtx, $extensions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import meta::pure::executionPlan::*;
import meta::pure::router::externalFormat::metamodel::clustering::*;
import meta::pure::router::metamodel::clustering::*;
import meta::pure::router::platform::metamodel::clustering::*;
import meta::dsb::domain::config::*;
import meta::pure::extension::*;
import meta::relational::extension::*;
Expand Down Expand Up @@ -341,12 +345,16 @@ function meta::pure::lineage::scanRelations::scanRelations(f:FunctionDefinition<
$f->scanRelations($m, $r, $vars, noDebug(), $extensions);
}





function meta::pure::lineage::scanRelations::scanRelations(f:FunctionDefinition<Any>[1], m:Mapping[1], r:Runtime[1], vars:Pair<String, List<Any>>[*], debug: meta::pure::tools::DebugContext[1], extensions:Extension[*]):RelationTree[1]
{
let sqlQuery = meta::pure::lineage::scanRelations::generateSQLQuery($f, $m, $r, $vars, $extensions);
if($debug.debug, | print('Generated SQL Query - ' + sqlQueryToStringPretty($sqlQuery, DatabaseType.H2, 'GMT', [], $extensions) + '\n\n'), |[]);

let childTrees = generateRelationTreeFromRelationalOperationElement($sqlQuery, $debug, $extensions);

let childTrees = meta::pure::lineage::scanRelations::generatRelationalTrees($f, $m, $r, $vars, $debug, $extensions);


let rootTree = ^RelationTree
(
Expand All @@ -358,13 +366,54 @@ function meta::pure::lineage::scanRelations::scanRelations(f:FunctionDefinition<

$rootTree;
}
function meta::pure::lineage::scanRelations::collectStoreMappingCluster(cluster:ClusteredValueSpecification[1], inScopeVars:Map<String, List<Any>>[1], extensions:meta::pure::extension::Extension[*]):ClusterAndVars[*]
{
$cluster->match([
sc:StoreMappingClusteredValueSpecification[1] | let DBCluster = $sc->filter(cvs | $cvs.store->instanceOf(Database)); //TODO: Refactor to support additional store types
if($DBCluster->isNotEmpty() ,| ^ClusterAndVars(cluster=$DBCluster->toOne() ),|[]);,

pl:PlatformClusteredValueSpecification[1] | $pl.val->processPlatformValueSpecification($inScopeVars,$extensions); ,
a:Any[*]| fail('Found unsupported cluster type '+ $a->type().name->toOne()) ; [];
]);

}

function meta::pure::lineage::scanRelations::processPlatformValueSpecification(val:ValueSpecification[1], inScopeVars:Map<String, List<Any>>[1], extensions:meta::pure::extension::Extension[*]):ClusterAndVars[*]
{
$val->match([
fn:SimpleFunctionExpression[1] | if ($fn.func ==letFunction_String_1__T_m__T_m_ && $fn.parametersValues->at(1)->instanceOf(ClusteredValueSpecification), //TODO: This might need a more robust function processor for different functions if they become relevent
| let varName = $fn.parametersValues->at(0)->cast(@InstanceValue).values->toOne()->toString();
let value = $fn.parametersValues->at(1)->cast(@ClusteredValueSpecification)->collectStoreMappingCluster($inScopeVars,$extensions);
^ClusterAndVars(var=$varName,cluster=$value.cluster->last());
,
|[]
);,

a:Any[*]| [];
]);

}
Class meta::pure::lineage::scanRelations::ClusterAndVars
{
cluster:ClusteredValueSpecification[0..1];
var:String[0..1];

}

function <<access.private>> meta::pure::lineage::scanRelations::generateSQLQuery(f:FunctionDefinition<Any>[1], m:Mapping[1], r:Runtime[1], vars:Pair<String, List<Any>>[*], extensions:Extension[*]):SQLQuery[1]
Class meta::pure::lineage::scanRelations::RelationalTreeAndVars
{
let routed = $f->routeFunction($m, $r, $extensions, noDebug());
tree:RelationTree[*];
vars:Map<String, List<Any>>[1];

}

function <<access.private>> meta::pure::lineage::scanRelations::generatRelationalTrees(f:FunctionDefinition<Any>[1], m:Mapping[1], r:Runtime[1], vars:Pair<String, List<Any>>[*], debug:DebugContext[1], extensions:Extension[*]):RelationTree[*]
{
let routed = $f->routeFunction($m, $r, $extensions, $debug);
let routedFunction = $routed->evaluateAndDeactivate()->toOne();
let inScopeVars = $f.expressionSequence->evaluateAndDeactivate()->fold({vs, a |

let inScopeVars = $f.expressionSequence->evaluateAndDeactivate()->fold({vs, a | if ($vs->isLetFunction(),
if ($vs->isLetFunction(),
| let varName = $vs->meta::pure::router::utils::extractLetVariableName();
let varExprs = $vs->findVariableExpressionsInValueSpecification();

Expand All @@ -379,12 +428,25 @@ function <<access.private>> meta::pure::lineage::scanRelations::generateSQLQuery
);
}, $f->openVariableValues()->putAll($vars));

let clusters = $routedFunction.expressionSequence->evaluateAndDeactivate()->filter(c | $c->instanceOf(StoreMappingClusteredValueSpecification))->cast(@StoreMappingClusteredValueSpecification)->filter(cvs | $cvs.store->instanceOf(Database));
assert($clusters->size() == 1, | 'Scan Relations currently supports 1 cluster, Found - ' + $clusters->size()->toString());

let fe = $clusters->toOne().val->byPassValueSpecificationWrapper()->cast(@FunctionExpression);

let allClusters = $routedFunction.expressionSequence->evaluateAndDeactivate()->cast(@ClusteredValueSpecification)->map(c|$c-> meta::pure::lineage::scanRelations::collectStoreMappingCluster($inScopeVars,$extensions) );
let AllTrees = $allClusters->fold({ v, a| let fn = $v.cluster->toOne().val->byPassValueSpecificationWrapper()->cast(@FunctionExpression);
let context = ^RelationalExecutionContext();
let SQL = $v.cluster->toOne().val->byPassValueSpecificationWrapper()->cast(@FunctionExpression)->toSQLQuery($m, $a.vars, $context, noDebug(), $extensions) ;
if($debug.debug, | print('Generated SQL Query - ' + sqlQueryToStringPretty($SQL, DatabaseType.H2, 'GMT', [], $extensions) + '\n\n'), |[]);

let possibleClass = $fn.genericType.rawType->toOne();
let updatedVars = if($v.var->isNotEmpty(),
| let class = if ($possibleClass == Any, | let getAllClass = findMainClassInGetAllExpression($fn); if($getAllClass->isEmpty(), | $possibleClass, | $getAllClass);, | $possibleClass );
let tdsType = $class->meta::relational::mapping::TDSSelectQueryToTDSResultType($SQL->cast(@TdsSelectSqlQuery),$context);
let varName = $v.var->toOne();
let varset = ^PlanSetPlaceHolder(name=$varName,tdsColumns = $tdsType->match([x:TDSResultType[1]|$x.tdsColumns, a:Any[1]|[]]));
$inScopeVars->put($varName,^List<Any>(values=$varset));,
|$a.vars);
let updatedTree= generateRelationTreeFromRelationalOperationElement($SQL, noDebug(), $extensions)->concatenate($a.tree);
^RelationalTreeAndVars(tree= $updatedTree, vars=$updatedVars);
} , ^RelationalTreeAndVars(vars=$inScopeVars)).tree;

let sqlQuery = $fe->toSQLQuery($m, $inScopeVars, ^RelationalExecutionContext(), noDebug(), $extensions);
}

function meta::pure::lineage::scanRelations::generateRelationTreeFromRelationalOperationElement(relOp:RelationalOperationElement[1], debug: meta::pure::tools::DebugContext[1], extensions:Extension[*]):RelationTree[*]
Expand Down
Loading

0 comments on commit d6ccbce

Please sign in to comment.