Skip to content

Commit

Permalink
ES support for distinct and restrict functions
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelbey committed Nov 23, 2023
1 parent 85fef69 commit a4c74f8
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,9 @@ function
{doc.doc = 'Remove duplicate rows from the priovded TDS'}
meta::pure::tds::distinct(tds:TabularDataSet[1]):TabularDataSet[1]
{
fail('Not implemented yet!');
$tds;
let newTds = ^$tds(rows = []);
let distinctRows = $tds.rows->removeDuplicates({l, r | $l.values == $r.values})->map(r | ^$r(parent = $newTds));
$newTds->mutateAdd('rows', $distinctRows);
}

function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@
</dependency>


<dependency>
<groupId>org.finos.legend.engine</groupId>
<artifactId>legend-engine-xt-elasticsearch-executionPlan-test</artifactId>
<scope>runtime</scope>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.smoketurner</groupId>
<artifactId>dropwizard-swagger</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ protected MutableList<RepositoryCodeStorage> buildRepositories(SourceLocationCon
.with(this.buildCore("legend-engine-xts-sql/legend-engine-xt-sql-pure", "external-query-sql"))
.with(this.buildCore("legend-engine-xts-authentication/legend-engine-xt-authentication-pure", "authentication"))
.with(this.buildCore("legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-pure-specification-metamodel", "elasticsearch_specification_metamodel"))
.with(this.buildCore("legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-executionPlan-test", "elasticsearch_execution_test"))
.with(this.buildCore("legend-engine-xts-elasticsearch/legend-engine-xt-elasticsearch-V7-pure-metamodel", "elasticsearch_seven_metamodel"))
.with(this.buildCore("legend-engine-xts-mongodb/legend-engine-xt-nonrelationalStore-mongodb-pure","nonrelational-mongodb"))
.with(this.buildCore("legend-engine-xts-mongodb/legend-engine-xt-nonrelationalStore-mongodb-javaPlatformBinding-pure","nonrelational-mongodb-java-platform-binding"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ Class meta::external::store::elasticsearch::v7::pureToEs::TDSESDetail
path(){
$this.resultPath.path()
}:String[1];
expression: FunctionExpression[0..1];
format: String[0..1];
}

Expand Down Expand Up @@ -124,9 +123,9 @@ function meta::external::store::elasticsearch::v7::pureToEs::resultPathToQuery(t
let srps = $tdsDetails.resultPath->concatenate($frps.readFrom)->filter(x | $x->instanceOf(SourceFieldResultPath))->cast(@SourceFieldResultPath);
let docps = $tdsDetails.resultPath->concatenate($frps.readFrom)->filter(x | $x->instanceOf(DocValueResultPath))->cast(@DocValueResultPath);

let seachWithSrps = $srps->isNotEmpty()->if(| ^$search(_source = ^SourceConfig(filter = ^SourceFilter(includes = $srps.path()->literal()))), | $search);
let searchWithSrps = $srps->isNotEmpty()->if(| ^$search(_source = ^SourceConfig(filter = ^SourceFilter(includes = $srps.path()->literal()))), | $search);
let fields = $docps->map(x | $x->resultPathToFieldAndFormat($tdsDetails->filter(t | $t.resultPath == $x)->first()));
$fields->isEmpty()->if(|$seachWithSrps, |^$seachWithSrps(docvalue_fields = $fields));
$fields->isEmpty()->if(|$searchWithSrps, |^$searchWithSrps(docvalue_fields = $fields));
}

function meta::external::store::elasticsearch::v7::pureToEs::resultPathToFieldAndFormat(resultPath: DocValueResultPath[1], tdsEsDetail: TDSESDetail[0..1]): FieldAndFormat[1]
Expand All @@ -135,6 +134,29 @@ function meta::external::store::elasticsearch::v7::pureToEs::resultPathToFieldAn
^FieldAndFormat(field = $resultPath.path()->literal(), format = $format->literal());
}

function meta::external::store::elasticsearch::v7::pureToEs::processRestrict(vs : FunctionExpression[1], initReq: State[1]): State[1]
{
$initReq.debug(|'Processing ->restrict');
let currReq = process($vs.parametersValues->at(0), $initReq);
let projectColNames = $vs->instanceValuesAtParameter(1, $currReq.sq.inScopeVars)->cast(@String);
let toRestrict = $projectColNames->map(s | $currReq.tdsESDetails->filter(x | $x.name == $s));
assert($toRestrict->size() == $projectColNames->size(), | 'restricting by unknown columns: ' + $projectColNames->removeAll($toRestrict.name)->joinStrings('[', ', ', ']'));

let search = $currReq.search;
^$currReq(
search = resultPathToQuery($toRestrict, $search),
tdsESDetails = $toRestrict
);
}

function meta::external::store::elasticsearch::v7::pureToEs::processDistinct(vs : FunctionExpression[1], initReq: State[1]): State[1]
{
$initReq.debug(|'Processing ->distinct');
let currReq = process($vs.parametersValues->at(0), $initReq);
assertFalse($currReq.aggregationQuery, |'distinct not supported in aggregation queries');
processGroupBy($currReq.tdsESDetails, [], $currReq);
}

function meta::external::store::elasticsearch::v7::pureToEs::processExtend(vs : FunctionExpression[1], initReq: State[1]): State[1]
{
$initReq.debug(|'Processing ->extend');
Expand All @@ -151,7 +173,7 @@ function meta::external::store::elasticsearch::v7::pureToEs::processProject(vs :
{
let currReq = process($vs.parametersValues->at(0), $initReq);
assert(!$currReq.inFilter);
assertFalse($currReq.aggregationQuery, |'project not supported in aggreagtion queries');
assertFalse($currReq.aggregationQuery, |'project not supported in aggregation queries');
let cols = $vs->instanceValuesAtParameter(1, $currReq.sq.inScopeVars);
let fieldsStatePair = $cols->match([
tdsCols: BasicColumnSpecification<TDSRow>[*] | $tdsCols->fold(
Expand Down Expand Up @@ -180,12 +202,22 @@ function meta::external::store::elasticsearch::v7::pureToEs::processProjectColum

$expr->extractSimpleValue($initReq).first.values->match([
tdsDetail: TDSESDetail[1] | pair(^$tdsDetail(name = $name, type = $type), $initReq),
{fe: FunctionExpression[1] |
{vs: ValueSpecification[1] |
let resultPath = ^DocValueResultPath(fieldPath = $name, property = $type->defaultRuntimePropertyForPureType());
let scripted = $fe->toRuntimeMapping($resultPath, $initReq);
pair(^TDSESDetail(type = $type, name = $name, resultPath = $resultPath, expression = $fe, format = $scripted.search.runtime_mappings->toOne()->get($name).format.value), $scripted);
let scripted = $vs->toRuntimeMapping($resultPath, $initReq);
pair(^TDSESDetail(type = $type, name = $name, resultPath = $resultPath, format = $scripted.search.runtime_mappings->toOne()->get($name).format.value), $scripted);
},
any: Any[*] | fail(|'Cannot project column - %s:%s'->format([$name,$any->type()->elementToPath()]))->cast(@Pair<TDSESDetail, State>)
{any: Any[*] |
$any->type()->match([
{pt: PrimitiveType[1] |
let resultPath = ^DocValueResultPath(fieldPath = $name, property = $pt->defaultRuntimePropertyForPureType());
let iv = ^InstanceValue(multiplicity = $vs.func->functionReturnMultiplicity(), genericType = ^GenericType(rawType = $type), values=$any)->evaluateAndDeactivate();
let scripted = $iv->toRuntimeMapping($resultPath, $initReq);
pair(^TDSESDetail(type = $type, name = $name, resultPath = $resultPath, format = $scripted.search.runtime_mappings->toOne()->get($name).format.value), $scripted);
},
other: Any[*] | fail(|'Cannot project column - %s:%s'->format([$name,$any->type()->elementToPath()]))->cast(@Pair<TDSESDetail, State>)
])
}
]);
}

Expand Down Expand Up @@ -273,6 +305,11 @@ function meta::external::store::elasticsearch::v7::pureToEs::processGroupBy(vs:
let groupByTdsESDetails = $groupByCols->map(g | $groupedReq.tdsESDetails->filter(x | $x.name == $g));
assert($groupByCols->size() == $groupByTdsESDetails->size(), | 'grouping by unknown columns: ' + $groupByCols->removeAll($groupByTdsESDetails.name)->joinStrings('[', ', ', ']'));

processGroupBy($groupByTdsESDetails, $aggregateValues, $groupedReq);
}

function meta::external::store::elasticsearch::v7::pureToEs::processGroupBy(groupByTdsESDetails: TDSESDetail[*], aggregateValues: meta::pure::tds::AggregateValue<Any, Any>[*], groupedReq: State[1]): State[1]
{
let aggPairs = $aggregateValues->map({x |
let rawToAggregate = processProjectColumn(^BasicColumnSpecification<TDSRow>(func = $x.mapFn, name = $x.name), $groupedReq).first;
let aggFunc = $x.aggregateFn->deepByPassRouterInfo()->cast(@FunctionDefinition<Any>).expressionSequence->toOne('tds aggregation only supports simple expressions: max, min, sum, etc.');
Expand All @@ -293,7 +330,7 @@ function meta::external::store::elasticsearch::v7::pureToEs::processGroupBy(vs:
});

let aggregations = newMap($aggPairs->map(x | pair($x.first.name, $x.second)));
let aggregationsForSearch = if ($groupByCols->isEmpty(),
let aggregationsForSearch = if ($groupByTdsESDetails->isEmpty(),
|
// no group by field - just run the aggregations
$aggregations
Expand All @@ -316,6 +353,7 @@ function meta::external::store::elasticsearch::v7::pureToEs::processGroupBy(vs:
let newSearch = ^$search(
size = 0->literal() // avoiding reading all the matches, we just want the aggregate results
,aggregations = $aggregationsForSearch
,docvalue_fields = []
,_source = ^SourceConfig(fetch = false->literal())
);

Expand Down Expand Up @@ -558,22 +596,24 @@ function meta::external::store::elasticsearch::v7::pureToEs::supportedRoutingFun
{
let supported = [
pair(supportedIfEqual(indexToTDS_Elasticsearch7Store_1__String_1__TabularDataSet_1_), processSelectAllTds_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(meta::pure::tds::project_TabularDataSet_1__ColumnSpecification_MANY__TabularDataSet_1_), processProject_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(meta::pure::tds::extend_TabularDataSet_1__BasicColumnSpecification_MANY__TabularDataSet_1_), processExtend_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(project_TabularDataSet_1__ColumnSpecification_MANY__TabularDataSet_1_), processProject_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(extend_TabularDataSet_1__BasicColumnSpecification_MANY__TabularDataSet_1_), processExtend_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(restrict_TabularDataSet_1__String_MANY__TabularDataSet_1_), processRestrict_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(distinct_TabularDataSet_1__TabularDataSet_1_), processDistinct_FunctionExpression_1__State_1__State_1_),

pair(supportedIfEqual(meta::pure::tds::filter_TabularDataSet_1__Function_1__TabularDataSet_1_), processFilter_FunctionExpression_1__State_1__State_1_),

pair(supportedIfEqual(meta::pure::tds::sort_TabularDataSet_1__String_MANY__TabularDataSet_1_), processDefaultSort_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(meta::pure::tds::sort_TabularDataSet_1__SortInformation_MANY__TabularDataSet_1_), processSortWithInformation_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(meta::pure::tds::sort_TabularDataSet_1__String_1__SortDirection_1__TabularDataSet_1_), processSortWithDirection_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(filter_TabularDataSet_1__Function_1__TabularDataSet_1_), processFilter_FunctionExpression_1__State_1__State_1_),

pair(supportedIfEqual(meta::pure::tds::groupBy_TabularDataSet_1__String_MANY__AggregateValue_MANY__TabularDataSet_1_), processGroupBy_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(sort_TabularDataSet_1__String_MANY__TabularDataSet_1_), processDefaultSort_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(sort_TabularDataSet_1__SortInformation_MANY__TabularDataSet_1_), processSortWithInformation_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(sort_TabularDataSet_1__String_1__SortDirection_1__TabularDataSet_1_), processSortWithDirection_FunctionExpression_1__State_1__State_1_),

pair(supportedIfEqual(groupBy_TabularDataSet_1__String_MANY__AggregateValue_MANY__TabularDataSet_1_), processGroupBy_FunctionExpression_1__State_1__State_1_),

pair(supportedIfEqual(meta::pure::tds::limit_TabularDataSet_1__Integer_1__TabularDataSet_1_), processLimit_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(meta::pure::tds::take_TabularDataSet_1__Integer_1__TabularDataSet_1_), processLimit_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(meta::pure::tds::drop_TabularDataSet_1__Integer_1__TabularDataSet_1_), processDrop_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(meta::pure::tds::slice_TabularDataSet_1__Integer_1__Integer_1__TabularDataSet_1_), processSlice_FunctionExpression_1__State_1__State_1_)
pair(supportedIfEqual(limit_TabularDataSet_1__Integer_1__TabularDataSet_1_), processLimit_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(take_TabularDataSet_1__Integer_1__TabularDataSet_1_), processLimit_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(drop_TabularDataSet_1__Integer_1__TabularDataSet_1_), processDrop_FunctionExpression_1__State_1__State_1_),
pair(supportedIfEqual(slice_TabularDataSet_1__Integer_1__Integer_1__TabularDataSet_1_), processSlice_FunctionExpression_1__State_1__State_1_)
];
}

Expand Down Expand Up @@ -1228,7 +1268,7 @@ function meta::external::store::elasticsearch::v7::pureToEs::varFreemarkerExpres
));
}

function meta::external::store::elasticsearch::v7::pureToEs::toRuntimeMapping(vs: FunctionExpression[1], field: DocValueResultPath[1], initReq: State[1]): State[1]
function meta::external::store::elasticsearch::v7::pureToEs::toRuntimeMapping(vs: ValueSpecification[1], field: DocValueResultPath[1], initReq: State[1]): State[1]
{
let p = processPainless($vs, ^$initReq(inProject = false, inFilter = true));

Expand All @@ -1247,7 +1287,10 @@ function meta::external::store::elasticsearch::v7::pureToEs::toRuntimeMapping(vs

let emittingScript = ^$script(source = 'emit(%s%s)'->format([$script.source.value->toOne(), $toEpochMillis])->literal());

let format = painlessDateFunctionsFormat($vs.func)->literal();
let format = $vs->match([
fe: FunctionExpression[1] | painlessDateFunctionsFormat($fe.func),
any: Any[*] | []
])->literal();

let newField = pair($field.path(), ^RuntimeField(script = ^Script(inline = $emittingScript), format = $format, type = $type));

Expand All @@ -1267,6 +1310,7 @@ function meta::external::store::elasticsearch::v7::pureToEs::processPainless(vs:
{
$vs->match([
fe: FunctionExpression[1] | $initReq.supportedForPainlessScriptFunctions->findAndEvalSupportedFunction($fe, $initReq, $initReq),
ve: VariableExpression[1] | $ve->processPainlessScalarValue($initReq),
iv: InstanceValue[1] |
$iv.values->match([
f: FunctionDefinition<Any>[1] | $f.expressionSequence->at(0)->processPainless($initReq),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class ElasticsearchCommands

public static Root_meta_pure_functions_io_http_URL startServer(String imageTag)
{
System.setProperty("org.finos.legend.engine.plan.execution.stores.elasticsearch.test.password", UUID.randomUUID().toString());
Root_meta_pure_functions_io_http_URL_Impl url = new Root_meta_pure_functions_io_http_URL_Impl("esUrl");
ElasticsearchContainer container = CONTAINERS.computeIfAbsent(imageTag, ElasticsearchCommands::createContainer);
url._host(container.getHost());
Expand All @@ -64,6 +66,7 @@ public static Root_meta_pure_functions_io_http_URL startServer(String imageTag)

public static void stopServer(String imageTag)
{
System.clearProperty("org.finos.legend.engine.plan.execution.stores.elasticsearch.test.password");
Optional.ofNullable(CONTAINERS.remove(imageTag)).ifPresent(ElasticsearchContainer::stop);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,28 @@ function
meta::external::store::elasticsearch::executionTest::testCase::tds::groupBy::testGroupByOnNullableField(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy('MPAA', agg('count', r | $r.getNullableString('MPAA'), agg | $agg->count())));
}

function
<<paramTest.Test>>
{doc.doc = 'Test distinct on single columns on Elasticsearch (translated as a group by)'}
meta::external::store::elasticsearch::executionTest::testCase::tds::distinct::testDistinctSingleColumn(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->restrict('Director')->distinct());
}

function
<<paramTest.Test>>
{doc.doc = 'Test distinct on multiple columns on Elasticsearch (translated as a group by)'}
meta::external::store::elasticsearch::executionTest::testCase::tds::distinct::testDistinctMultipleColumns(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->restrict(['Director', 'MPAA'])->distinct());
}

function
<<paramTest.Test>>
{doc.doc = 'Test restrict group by columns on Elasticsearch'}
meta::external::store::elasticsearch::executionTest::testCase::tds::restrict::testRestrictGroupByColumns(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->groupBy(['Director', 'MPAA'], [ agg('sumBudget', r | $r.getInteger('Budget'), agg | $agg->sum()), agg('avgBudget', r | $r.getInteger('Budget'), agg | $agg->average()) ])->restrict(['Director', 'avgBudget']));
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ import meta::external::store::elasticsearch::executionTest::testCase::tds::*;
import meta::external::store::elasticsearch::executionTest::test::*;
import meta::external::store::elasticsearch::executionTest::utils::*;

// function
// <<paramTest.Test>>
// {doc.doc = 'Test projection on Elasticsearch with pure constant expressions'}
// meta::external::store::elasticsearch::executionTest::testCase::tds::project::misc::testProjectExpressionConstant(config:TestConfig[1]):Boolean[1]
// {
// $config->testTdsExpression(x|$x->project([col(x: TDSRow[1] | 'Hello World!', 'Bucket')]));
// }
function
<<paramTest.Test>>
{doc.doc = 'Test projection on Elasticsearch with pure constant expressions'}
meta::external::store::elasticsearch::executionTest::testCase::tds::project::misc::testProjectExpressionConstant(config:TestConfig[1]):Boolean[1]
{
$config->testTdsExpression(x|$x->project([col(x: TDSRow[1] | 'Hello World!', 'Bucket')]));
}

// function
// <<paramTest.Test>>
// {doc.doc = 'Test projection on Elasticsearch with pure constant expressions'}
// meta::external::store::elasticsearch::executionTest::testCase::tds::project::misc::testProjectExpressionConstantWithVariable(config:TestConfig[1]):Boolean[1]
// {
// let val = 12345;
// $config->testTdsExpression(x|$x->project([col(x: TDSRow[1] | $val, 'Bucket')]));
// }
function
<<paramTest.Test>>
{doc.doc = 'Test projection on Elasticsearch with pure constant expressions'}
meta::external::store::elasticsearch::executionTest::testCase::tds::project::misc::testProjectExpressionConstantWithVariable(config:TestConfig[1]):Boolean[1]
{
let val = 12345;
$config->testTdsExpression(x|$x->project([col(x: TDSRow[1] | $val, 'Bucket')]));
}

function
<<paramTest.Test>>
Expand Down
Loading

0 comments on commit a4c74f8

Please sign in to comment.