Skip to content

Commit

Permalink
Merge pull request #97 from kununu/composite-aggregations
Browse files Browse the repository at this point in the history
Add Composite Aggregations Builder and Repository
  • Loading branch information
diogocorreia-kununu authored Jul 16, 2024
2 parents 87b8f2e + 4597e80 commit 87e63e0
Show file tree
Hide file tree
Showing 18 changed files with 718 additions and 0 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"ext-json": "*",
"ext-mbstring": "*",
"elasticsearch/elasticsearch": "^7.0",
"kununu/collections": "^4.1.0",
"psr/log": "^1.0|^2.0|^3.0"
},
"require-dev": {
Expand Down
10 changes: 10 additions & 0 deletions src/Exception/MissingAggregationAttributesException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php
declare(strict_types=1);

namespace Kununu\Elasticsearch\Exception;

use RuntimeException;

final class MissingAggregationAttributesException extends RuntimeException
{
}
105 changes: 105 additions & 0 deletions src/Query/Aggregation/Builder/CompositeAggregationQueryBuilder.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
<?php
declare(strict_types=1);

namespace Kununu\Elasticsearch\Query\Aggregation\Builder;

use Kununu\Elasticsearch\Exception\MissingAggregationAttributesException;
use Kununu\Elasticsearch\Query\Aggregation\SourceProperty;
use Kununu\Elasticsearch\Query\Aggregation\Sources;
use Kununu\Elasticsearch\Query\CompositeAggregationQueryInterface;
use Kununu\Elasticsearch\Query\Criteria\Filter;
use Kununu\Elasticsearch\Query\Criteria\Filters;
use Kununu\Elasticsearch\Query\QueryInterface;
use Kununu\Elasticsearch\Query\RawQuery;
use Kununu\Elasticsearch\Util\ArrayUtilities;

final class CompositeAggregationQueryBuilder implements CompositeAggregationQueryInterface
{
private ?array $afterKey = null;
private Filters $filters;
private ?string $name = null;
private ?Sources $sources = null;

private function __construct()
{
$this->filters = new Filters();
}

public static function create(): self
{
return new self();
}

public function withAfterKey(?array $afterKey): self
{
$this->afterKey = $afterKey;

return $this;
}

public function withFilters(Filters $filters): self
{
$this->filters = $filters;

return $this;
}

public function withName(string $name): self
{
$this->name = $name;

return $this;
}

public function withSources(Sources $sources): self
{
$this->sources = $sources;

return $this;
}

public function getName(): string
{
if (null === $this->name) {
throw new MissingAggregationAttributesException('Aggregation name is missing');
}

return $this->name;
}

public function getQuery(int $compositeSize = 100): QueryInterface
{
return RawQuery::create(
ArrayUtilities::filterNullAndEmptyValues([
'query' => [
'bool' => [
'must' => $this->filters->map(fn(Filter $filter) => $filter->toArray()),
],
],
'aggs' => [
$this->getName() => [
'composite' => [
'size' => $compositeSize,
'sources' => $this->sources?->map(
fn(SourceProperty $sourceProperty) => [
$sourceProperty->source => [
'terms' => [
'field' => $sourceProperty->property,
'missing_bucket' => $sourceProperty->missingBucket,
]
],
]
) ?? [],
'after' => $this->afterKey,
],
],
],
], true)
);
}

public function toArray(): array
{
return $this->getQuery()->toArray();
}
}
15 changes: 15 additions & 0 deletions src/Query/Aggregation/SourceProperty.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php
declare(strict_types=1);

namespace Kununu\Elasticsearch\Query\Aggregation;

final class SourceProperty
{
public function __construct(
public readonly string $source,
public readonly string $property,
public readonly bool $missingBucket = false
)
{
}
}
37 changes: 37 additions & 0 deletions src/Query/Aggregation/Sources.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php
declare(strict_types=1);

namespace Kununu\Elasticsearch\Query\Aggregation;

use InvalidArgumentException;
use Kununu\Collection\AbstractCollection;

final class Sources extends AbstractCollection
{
private const INVALID = 'Can only append %s';

public function __construct(SourceProperty ...$sourceProperties)
{
parent::__construct();

foreach ($sourceProperties as $sourceProperty) {
$this->append($sourceProperty);
}
}

public function current(): ?SourceProperty
{
$current = parent::current();
assert($this->count() > 0 ? $current instanceof SourceProperty : null === $current);

return $current;
}

public function append($value): void
{
match (true) {
$value instanceof SourceProperty => parent::append($value),
default => throw new InvalidArgumentException(sprintf(self::INVALID, SourceProperty::class))
};
}
}
17 changes: 17 additions & 0 deletions src/Query/CompositeAggregationQueryInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php
declare(strict_types=1);

namespace Kununu\Elasticsearch\Query;

interface CompositeAggregationQueryInterface
{
public function getQuery(int $compositeSize = 100): QueryInterface;

public function getName(): string;

public function withName(string $name): self;

public function withAfterKey(?array $afterKey): self;

public function toArray(): array;
}
37 changes: 37 additions & 0 deletions src/Query/Criteria/Filters.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php
declare(strict_types=1);

namespace Kununu\Elasticsearch\Query\Criteria;

use InvalidArgumentException;
use Kununu\Collection\AbstractCollection;

class Filters extends AbstractCollection
{
private const INVALID = 'Can only append %s';

public function __construct(Filter ...$propertyFilters)
{
parent::__construct();

foreach ($propertyFilters as $propertyFilter) {
$this->append($propertyFilter);
}
}

public function current(): ?Filter
{
$current = parent::current();
assert($this->count() > 0 ? $current instanceof Filter : null === $current);

return $current;
}

public function append($value): void
{
match (true) {
$value instanceof Filter => parent::append($value),
default => throw new InvalidArgumentException(sprintf(self::INVALID, Filter::class))
};
}
}
26 changes: 26 additions & 0 deletions src/Repository/Repository.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use Elasticsearch\Client;
use Elasticsearch\Common\Exceptions\Missing404Exception;
use Generator;
use Kununu\Elasticsearch\Exception\BulkException;
use Kununu\Elasticsearch\Exception\DeleteException;
use Kununu\Elasticsearch\Exception\DocumentNotFoundException;
Expand All @@ -14,10 +15,12 @@
use Kununu\Elasticsearch\Exception\UpdateException;
use Kununu\Elasticsearch\Exception\UpsertException;
use Kununu\Elasticsearch\Exception\WriteOperationException;
use Kununu\Elasticsearch\Query\CompositeAggregationQueryInterface;
use Kununu\Elasticsearch\Query\Query;
use Kununu\Elasticsearch\Query\QueryInterface;
use Kununu\Elasticsearch\Result\AggregationResultSet;
use Kununu\Elasticsearch\Result\AggregationResultSetInterface;
use Kununu\Elasticsearch\Result\CompositeResult;
use Kununu\Elasticsearch\Result\ResultIterator;
use Kununu\Elasticsearch\Result\ResultIteratorInterface;
use Kununu\Elasticsearch\Util\LoggerAwareTrait;
Expand Down Expand Up @@ -283,6 +286,29 @@ function() use ($query) {
);
}

public function aggregateCompositeByQuery(CompositeAggregationQueryInterface $query): Generator
{
$afterKey = null;

do {
$result = $this->aggregateByQuery(
$query->withAfterKey($afterKey)->getQuery()
)->getResultByName($query->getName());

foreach ($result?->getFields()['buckets'] ?? [] as $bucket) {
if (!empty($bucket['key']) && !empty($bucket['doc_count'])) {
yield new CompositeResult(
$bucket['key'],
$bucket['doc_count'],
$query->getName()
);
}
}

$afterKey = $result?->get('after_key') ?? null;
} while (null !== $afterKey);
}

public function updateByQuery(QueryInterface $query, array $updateScript): array
{
return $this->executeWrite(
Expand Down
9 changes: 9 additions & 0 deletions src/Repository/RepositoryInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

namespace Kununu\Elasticsearch\Repository;

use Generator;
use Kununu\Elasticsearch\Query\CompositeAggregationQueryInterface;
use Kununu\Elasticsearch\Query\QueryInterface;
use Kununu\Elasticsearch\Result\AggregationResultSetInterface;
use Kununu\Elasticsearch\Result\ResultIteratorInterface;
Expand Down Expand Up @@ -102,6 +104,13 @@ public function countByQuery(QueryInterface $query): int;
*/
public function aggregateByQuery(QueryInterface $query): AggregationResultSetInterface;

/**
* This method executes a query with composite aggregation, iterates through the results, and retrieves the data.
*
* @return Generator <CompositeResult>
*/
public function aggregateCompositeByQuery(CompositeAggregationQueryInterface $query): Generator;

/**
* This method updates all documents matching a given $query using a given $updateScript.
*/
Expand Down
15 changes: 15 additions & 0 deletions src/Result/CompositeResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php
declare(strict_types=1);

namespace Kununu\Elasticsearch\Result;

final class CompositeResult
{
public function __construct(
public readonly array $results,
public readonly int $documentsCount,
public readonly string $aggregationName
)
{
}
}
20 changes: 20 additions & 0 deletions src/Util/ArrayUtilities.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php
declare(strict_types=1);

namespace Kununu\Elasticsearch\Util;

final class ArrayUtilities
{
public static function filterNullAndEmptyValues(array $values, bool $recursive = false): array
{
if ($recursive) {
foreach ($values as &$value) {
if (is_array($value)) {
$value = self::filterNullAndEmptyValues($value, true);
}
}
}

return array_filter($values, fn ($value) => $value !== null && $value !== []);
}
}
Loading

0 comments on commit 87e63e0

Please sign in to comment.