Skip to content

Commit

Permalink
Enable chunked database queries in indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
sarkikos committed Oct 6, 2023
1 parent 25970d9 commit a10bbba
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 33 deletions.
13 changes: 7 additions & 6 deletions aspnetcore/src/ElasticService/ElasticSearchIndexService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class ElasticSearchIndexService : IElasticSearchIndexService
private readonly IElasticClient _elasticClient;
private readonly ILogger<ElasticSearchIndexService> _logger;

private const int BatchSize = 1000;
private const int BatchSize = 2000;

public ElasticSearchIndexService(IElasticClient elasticClient, ILogger<ElasticSearchIndexService> logger)
{
Expand Down Expand Up @@ -78,6 +78,8 @@ await _elasticClient.Indices.BulkAliasAsync(r => r

private async Task IndexEntities<T>(string indexName, List<T> entities, Type modelType) where T : class
{
var indexedCount = 0;

// Split entities into batches to avoid one big request.
var documentBatches = new List<List<T>>();
for (var docIndex = 0; docIndex < entities.Count; docIndex += BatchSize)
Expand All @@ -93,11 +95,11 @@ private async Task IndexEntities<T>(string indexName, List<T> entities, Type mod

if (!indexBatchResponse.IsValid)
{
_logger.LogError(indexBatchResponse.OriginalException, "{EntityType}: Indexing entities to {IndexName} failed", modelType, indexName);
throw new InvalidOperationException($"Indexing entities to {indexName} failed.", indexBatchResponse.OriginalException);
_logger.LogError(indexBatchResponse.OriginalException, "{EntityType}: Indexing documents to {IndexName} failed", modelType, indexName);
throw new InvalidOperationException($"Indexing documents to {indexName} failed.", indexBatchResponse.OriginalException);
}

_logger.LogDebug("{EntityType}: Indexed {BatchSize} documents to {IndexName}", modelType.Name, batchToIndex.Count, indexName);
indexedCount = indexedCount + batchToIndex.Count;
_logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}. Progress {IndexedCount}/{TotalCount}", modelType.Name, batchToIndex.Count, indexName, indexedCount, entities.Count);
}
}

Expand Down Expand Up @@ -125,5 +127,4 @@ await _elasticClient.Indices.DeleteAsync(indexName,

_logger.LogDebug("{EntityType}: Index {IndexName} created", type.Name, indexName);
}

}
97 changes: 72 additions & 25 deletions aspnetcore/src/Indexer/Indexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,22 @@ public async Task Start()
{
Stopwatch stopWatch = new();
stopWatch.Start();

await PopulateOrganizationCache();
await PopulateFundingCallCache();

_logger.LogInformation("Starting indexing...");
_logger.LogInformation("Using ElasticSearch at '{ElasticSearchAddress}'", _configuration["ELASTICSEARCH:URL"]);

var configuredTypesAndIndexNames = _indexNameSettings.GetTypesAndIndexNames();

await PopulateOrganizationCache();
await PopulateFundingCallCache();

foreach (var (indexName, modelType) in configuredTypesAndIndexNames)
{
var repositoryForType = _indexRepositories.SingleOrDefault(repo => repo.ModelType == modelType);

if (repositoryForType is null)
{
_logger.LogError("{EntityType}: Unable to find database repository for index {IndexName}", modelType.Name, indexName);

_logger.LogError("{EntityType}: Unable to find database repository for index {IndexName}", modelType.Name, indexName);
continue;
}

Expand All @@ -73,7 +72,7 @@ public async Task Start()
/// </summary>
private async Task PopulateOrganizationCache()
{
_logger.LogInformation("Populating Organization cache...");
_logger.LogInformation("Populating Organization cache");

var organizationRepository = _indexRepositories.SingleOrDefault(repo => repo.ModelType == typeof(Organization));
if (organizationRepository != null)
Expand All @@ -84,14 +83,16 @@ private async Task PopulateOrganizationCache()
_memoryCache.Set(MemoryCacheKeys.OrganizationById(organization.Id), organization);
}
}

_logger.LogInformation("Populated Organization cache");
}

/// <summary>
/// Gets all call programmes from the database to an in-memory cache to simplify SQL queries by automapper.
/// </summary>
private async Task PopulateFundingCallCache()
{
_logger.LogInformation("Populating Funding Call cache...");
_logger.LogInformation("Populating Funding Call cache");

var fundingCallRepository = _indexRepositories.SingleOrDefault(repo => repo.ModelType == typeof(FundingCall));
if (fundingCallRepository != null)
Expand All @@ -116,38 +117,84 @@ private async Task PopulateFundingCallCache()
}
}
}

_logger.LogInformation("Populated Funding Call cache");
}

private async Task IndexEntities(string indexName,
IIndexRepository repository,
Type type)
{
_logger.LogInformation("{EntityType}: Recreating '{IndexName}' index...", type.Name, indexName);

Stopwatch stopWatch = new();
stopWatch.Start();

try
{
_logger.LogDebug("{EntityType}: Recreating '{IndexName}' index...", type.Name, indexName);

var indexModels = await repository.GetAllAsync().ToListAsync();

var databaseElapsed = stopWatch.Elapsed;

_logger.LogDebug("{EntityType}: Retrieved {DatabaseCount} entities from the database in {ElapsedDatabase}...", type.Name, indexModels.Count, databaseElapsed);
List<object> finalized = new();

if (indexName.Contains("publication")) {
/*
* Process database result in smaller chunks to keep memory requirement low.
* Chunking is based on skip/take query.
* Currently this is done only for publications, because their dataset is much
* larger than others.
*/

int skipAmount = 0;
int takeAmount = 200000;
int numOfResults = 0;

var finalized = repository.PerformInMemoryOperations(indexModels);

do
{
_logger.LogInformation("{EntityType}: Requested {takeAmount} entities from database...", type.Name, takeAmount);
var indexModels = await repository.GetChunkAsync(skipAmount: skipAmount, takeAmount: takeAmount).ToListAsync();
numOfResults = indexModels.Count;
_logger.LogInformation("{EntityType}: ...received {numOfResults} entities", type.Name, numOfResults);

if (numOfResults > 0)
{
_logger.LogInformation("{EntityType}: In-memory operations start", type.Name);
foreach (object entity in indexModels) {
finalized.Add(repository.PerformInMemoryOperation(entity));
}
_logger.LogInformation("{EntityType}: In-memory operations complete", type.Name);
}
skipAmount = skipAmount + takeAmount;
} while(numOfResults >= takeAmount-1);
}
else
{
/*
* Process complete database result at once.
* Suitable for small result sets.
*/
_logger.LogInformation("{EntityType}: Requested all entities from database...", type.Name);
var indexModels = await repository.GetAllAsync().ToListAsync();
var databaseElapsed = stopWatch.Elapsed;
_logger.LogInformation("{EntityType}: ..received {DatabaseCount} entities in {ElapsedDatabase}", type.Name, indexModels.Count, databaseElapsed);

if (indexModels.Count > 0)
{
_logger.LogInformation("{EntityType}: Start in-memory operations", type.Name);
finalized = repository.PerformInMemoryOperations(indexModels);
}
}
var inMemoryElapsed = stopWatch.Elapsed;

_logger.LogDebug("{EntityType}: Performed in-memory operations in {ElapsedInMemory}...", type.Name, inMemoryElapsed - databaseElapsed);

await _indexService.IndexAsync(indexName, finalized, type);

var indexingElapsed = stopWatch.Elapsed;

_logger.LogDebug("{EntityType}: Indexed {IndexCount} entities in {ElapsedIndexing}...", type.Name, indexModels.Count, indexingElapsed - inMemoryElapsed);

_logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully with {IndexCount} entities in {ElapsedTotal}", type.Name, indexName, indexModels.Count, stopWatch.Elapsed);
if (finalized.Count > 0)
{
_logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed);
await _indexService.IndexAsync(indexName, finalized, type);
var indexingElapsed = stopWatch.Elapsed;
_logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed);
_logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed);
}
else
{
_logger.LogInformation("{EntityType}: Nothing to index", type.Name);
}
}
catch (Exception ex)
{
Expand Down
7 changes: 6 additions & 1 deletion aspnetcore/src/Indexer/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
}
},
"WriteTo": {
"ConsoleSink": "Console",
"ConsoleSink": {
"Name": "Console",
"Args": {
"outputTemplate": "[{Timestamp:yyyy-MM-dd HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}"
}
},
"HttpSink": {
"Name": "Http",
"Args": {
Expand Down
7 changes: 6 additions & 1 deletion aspnetcore/src/Indexer/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
}
},
"WriteTo": {
"ConsoleSink": "Console",
"ConsoleSink": {
"Name": "Console",
"Args": {
"outputTemplate": "[{Timestamp:yyyy-MM-dd HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}"
}
},
"HttpSink": {
"Name": "Http",
"Args": {
Expand Down
12 changes: 12 additions & 0 deletions aspnetcore/src/Repositories/FundingCallIndexRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ protected override IQueryable<FundingCall> GetAll()
.Where(callProgramme => callProgramme.Id != -1)
.ProjectTo<FundingCall>(_mapper.ConfigurationProvider);
}

/// <summary>
/// Builds the query for one chunk of call programmes, projected to FundingCall.
/// </summary>
/// <remarks>
/// The Id != -1 filter is applied before ordering and paging, so the filter can never
/// shorten a chunk: a chunk holds fewer than <paramref name="takeAmount"/> rows only when
/// the matching rows have run out.
/// </remarks>
/// <param name="skipAmount">Number of matching rows, in ascending Id order, to skip.</param>
/// <param name="takeAmount">Maximum number of rows in the chunk.</param>
/// <returns>The composed query for the requested chunk.</returns>
protected override IQueryable<FundingCall> GetChunk(int skipAmount, int takeAmount)
{
    return _context.DimCallProgrammes
        .AsNoTracking()
        .AsSplitQuery()
        .Where(callProgramme => callProgramme.Id != -1)
        // Page only after filtering; paging first would let the filter remove rows from a full page.
        .OrderBy(callProgramme => callProgramme.Id)
        .Skip(skipAmount)
        .Take(takeAmount)
        .ProjectTo<FundingCall>(_mapper.ConfigurationProvider);
}

public override List<object> PerformInMemoryOperations(List<object> entities)
{
Expand Down
18 changes: 18 additions & 0 deletions aspnetcore/src/Repositories/FundingDecisionIndexRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ protected override IQueryable<FundingDecision> GetAll()
.ProjectTo<FundingDecision>(_mapper.ConfigurationProvider);
}

/// <summary>
/// Builds the query for one chunk of funding decisions, projected to FundingDecision.
/// </summary>
/// <remarks>
/// All filters are applied before ordering and paging. If paging ran first, the filters could
/// remove any number of rows from a chunk, and a caller that stops when a chunk comes back
/// short would end the iteration while matching rows still remain.
/// </remarks>
/// <param name="skipAmount">Number of matching rows, in ascending Id order, to skip.</param>
/// <param name="takeAmount">Maximum number of rows in the chunk.</param>
/// <returns>The composed query for the requested chunk.</returns>
protected override IQueryable<FundingDecision> GetChunk(int skipAmount, int takeAmount)
{
    return _context
        .Set<DimFundingDecision>()
        .AsNoTracking()
        .AsSplitQuery()
        .Where(fd => fd.Id != -1)
        // "eu_funding" decisions are kept only when a consortium participant organization has a "PIC" pid;
        // decisions from every other source are always kept.
        .Where(fd =>
            (fd.SourceDescription == "eu_funding"
             && fd.BrFundingConsortiumParticipations.Any(fc => fc.DimOrganization.DimPids.Any(pid => pid.PidType == "PIC")))
            ||
            fd.SourceDescription != "eu_funding")
        // Page only after filtering, so skip/take count matching rows.
        .OrderBy(fd => fd.Id)
        .Skip(skipAmount)
        .Take(takeAmount)
        .ProjectTo<FundingDecision>(_mapper.ConfigurationProvider);
}

public override List<object> PerformInMemoryOperations(List<object> objects)
{
objects.ForEach(o =>
Expand Down
19 changes: 19 additions & 0 deletions aspnetcore/src/Repositories/IIndexRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ public interface IIndexRepository<out T> : IIndexRepository where T : class
/// </summary>
/// <returns></returns>
IQueryable<T> GetAll();

/// <summary>
/// Gets a chunk of database entities for indexing, projected as T.
/// </summary>
/// <param name="skipAmount">Number of entities to skip before the chunk starts.</param>
/// <param name="takeAmount">Maximum number of entities in the chunk.</param>
/// <returns>Query for the requested chunk.</returns>
IQueryable<T> GetChunk(int skipAmount, int takeAmount);
}

/// <summary>
Expand All @@ -30,10 +36,23 @@ public interface IIndexRepository
/// <returns></returns>
IAsyncEnumerable<object> GetAllAsync();

/// <summary>
/// Returns one chunk of models of the type ModelType as an asynchronous sequence.
/// </summary>
/// <param name="skipAmount">Number of models to skip before the chunk starts.</param>
/// <param name="takeAmount">Maximum number of models in the chunk.</param>
/// <returns>The models of the requested chunk, typed as object.</returns>
IAsyncEnumerable<object> GetChunkAsync(int skipAmount, int takeAmount);

/// <summary>
/// Perform data manipulations which are hard to do in the db query phase.
/// </summary>
/// <param name="objects"></param>
/// <returns></returns>
List<object> PerformInMemoryOperations(List<object> objects);

/// <summary>
/// Performs, on a single entity, data manipulations which are hard to do in the db query phase.
/// </summary>
/// <param name="entity">The entity to manipulate.</param>
/// <returns>The resulting entity.</returns>
object PerformInMemoryOperation(object entity);
}
22 changes: 22 additions & 0 deletions aspnetcore/src/Repositories/IndexRepositoryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,23 @@ protected IndexRepositoryBase(IMemoryCache memoryCache)
/// </summary>
/// <returns></returns>
protected abstract IQueryable<T> GetAll();

/// <summary>
/// Method which every IndexRepository must override, provides one chunk of T as Queryable.
/// </summary>
/// <param name="skipAmount">Number of entities to skip before the chunk starts.</param>
/// <param name="takeAmount">Maximum number of entities in the chunk.</param>
/// <returns>Query for the requested chunk.</returns>
protected abstract IQueryable<T> GetChunk(int skipAmount, int takeAmount);

// Explicit interface implementations. The model type is the generic type argument of this repository.
Type IIndexRepository.ModelType => typeof(T);

// Typed interface members forward to the protected abstract query methods.
IQueryable<T> IIndexRepository<T>.GetAll() => GetAll();

IQueryable<T> IIndexRepository<T>.GetChunk(int skipAmount, int takeAmount) => GetChunk(skipAmount, takeAmount);

// Non-generic interface members expose the same queries as asynchronous sequences of object.
IAsyncEnumerable<object> IIndexRepository.GetAllAsync() => GetAll().AsAsyncEnumerable();

IAsyncEnumerable<object> IIndexRepository.GetChunkAsync(int skipAmount, int takeAmount) => GetChunk(skipAmount, takeAmount).AsAsyncEnumerable();

/// <summary>
/// If the data type needs special data manipulations after data has been fetched,
/// this can be overriden in derived classes.
Expand All @@ -38,6 +48,18 @@ public virtual List<object> PerformInMemoryOperations(List<object> entities)
{
return entities;
}

/// <summary>
/// Single-entity hook for data manipulations needed after the data has been fetched.
/// Derived classes can override it; this base implementation applies no changes.
/// </summary>
/// <param name="entity">The fetched entity.</param>
/// <returns>The same entity that was passed in.</returns>
public virtual object PerformInMemoryOperation(object entity) => entity;

protected Service.Models.Organization.Organization? GetOrganization(int organizationId)
{
Expand Down
10 changes: 10 additions & 0 deletions aspnetcore/src/Repositories/InfrastructureIndexRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,14 @@ protected override IQueryable<Infrastructure> GetAll()
.Where(infrastructure => infrastructure.Id != -1)
.ProjectTo<Infrastructure>(_mapper.ConfigurationProvider);
}

/// <summary>
/// Builds the query for one chunk of infrastructures, projected to Infrastructure.
/// </summary>
/// <remarks>
/// The Id != -1 filter is applied before ordering and paging, so the filter can never
/// shorten a chunk: a chunk holds fewer than <paramref name="takeAmount"/> rows only when
/// the matching rows have run out.
/// </remarks>
/// <param name="skipAmount">Number of matching rows, in ascending Id order, to skip.</param>
/// <param name="takeAmount">Maximum number of rows in the chunk.</param>
/// <returns>The composed query for the requested chunk.</returns>
protected override IQueryable<Infrastructure> GetChunk(int skipAmount, int takeAmount)
{
    return _context.DimInfrastructures
        .Where(infrastructure => infrastructure.Id != -1)
        // Page only after filtering; paging first would let the filter remove rows from a full page.
        .OrderBy(infrastructure => infrastructure.Id)
        .Skip(skipAmount)
        .Take(takeAmount)
        .ProjectTo<Infrastructure>(_mapper.ConfigurationProvider);
}
}
10 changes: 10 additions & 0 deletions aspnetcore/src/Repositories/OrganizationIndexRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,14 @@ protected override IQueryable<Organization> GetAll()
.Where(organization => organization.Id != -1)
.ProjectTo<Organization>(_mapper.ConfigurationProvider);
}

/// <summary>
/// Builds the query for one chunk of organizations, projected to Organization.
/// </summary>
/// <remarks>
/// The Id != -1 filter is applied before ordering and paging, so the filter can never
/// shorten a chunk: a chunk holds fewer than <paramref name="takeAmount"/> rows only when
/// the matching rows have run out.
/// </remarks>
/// <param name="skipAmount">Number of matching rows, in ascending Id order, to skip.</param>
/// <param name="takeAmount">Maximum number of rows in the chunk.</param>
/// <returns>The composed query for the requested chunk.</returns>
protected override IQueryable<Organization> GetChunk(int skipAmount, int takeAmount)
{
    return _context.DimOrganizations
        .Where(organization => organization.Id != -1)
        // Page only after filtering; paging first would let the filter remove rows from a full page.
        .OrderBy(organization => organization.Id)
        .Skip(skipAmount)
        .Take(takeAmount)
        .ProjectTo<Organization>(_mapper.ConfigurationProvider);
}
}
Loading

0 comments on commit a10bbba

Please sign in to comment.