Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix CSCTTV-3694 indexer out of memory #112

Merged
merged 3 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions aspnetcore/src/ElasticService/ElasticSearchIndexService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ public async Task IndexAsync(string indexName, List<object> entities, Type model
// Add entities to the new index.
await IndexEntities(indexToCreate, entities, modelType);

// Switch indexes
await SwitchIndexes(indexName, indexToCreate, indexToDelete);

_logger.LogDebug("{EntityType}: Indexing to {IndexName} complete", modelType.Name, indexName);
}

public async Task IndexChunkAsync(string indexToCreate, List<object> entities, Type modelType)
{
// Add entities to the index.
await IndexEntities(indexToCreate, entities, modelType);
}

public async Task SwitchIndexes(string indexName, string indexToCreate, string indexToDelete)
{
// Wait for new index to be operational.
await _elasticClient.Cluster
.HealthAsync(selector: s => s
Expand All @@ -47,12 +61,9 @@ await _elasticClient.Indices.BulkAliasAsync(r => r

// Delete the old index if it exists.
await _elasticClient.Indices.DeleteAsync(indexToDelete, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404)));

_logger.LogDebug("{EntityType}: Indexing to {IndexName} complete", modelType.Name, indexName);

}

private async Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName)
public async Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName)
{
var indexNameV1 = $"{indexName}_v1";
var indexNameV2 = $"{indexName}_v2";
Expand Down Expand Up @@ -99,7 +110,7 @@ private async Task IndexEntities<T>(string indexName, List<T> entities, Type mod
throw new InvalidOperationException($"Indexing documents to {indexName} failed.", indexBatchResponse.OriginalException);
}
indexedCount = indexedCount + batchToIndex.Count;
_logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}. Progress {IndexedCount}/{TotalCount}", modelType.Name, batchToIndex.Count, indexName, indexedCount, entities.Count);
_logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}", modelType.Name, batchToIndex.Count, indexName);
}
}

Expand All @@ -111,7 +122,7 @@ private async Task IndexEntities<T>(string indexName, List<T> entities, Type mod
/// <param name="type"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
private async Task CreateIndex(string indexName, Type type)
public async Task CreateIndex(string indexName, Type type)
{
await _elasticClient.Indices.DeleteAsync(indexName,
d => d.RequestConfiguration(x => x.AllowedStatusCodes(404)));
Expand Down
38 changes: 37 additions & 1 deletion aspnetcore/src/ElasticService/IElasticSearchIndexService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,48 @@
public interface IElasticSearchIndexService
{
/// <summary>
/// Creates a new index with the given name and indexses the given entities.
/// Creates a new index with the given name and indexes the given entities.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="indexName"></param>
/// <param name="entities"></param>
/// <param name="modelType"></param>
/// <returns></returns>
Task IndexAsync(string indexName, List<object> entities, Type modelType);

/// <summary>
/// Index given entities.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="indexName"></param>
/// <param name="entities"></param>
/// <param name="modelType"></param>
/// <returns></returns>
Task IndexChunkAsync(string indexName, List<object> entities, Type modelType);

/// <summary>
/// Get name of index to create and name of index to delete.
/// </summary>
/// <param name="indexName"></param>
/// <returns></returns>
Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName);

/// <summary>
/// Creates index with the given name.
/// If the index exists already, it will be deleted first.
/// </summary>
/// <param name="indexName"></param>
/// <param name="type"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
Task CreateIndex(string indexName, Type type);

/// <summary>
/// Switch index
/// </summary>
/// <param name="indexName"></param>
/// <param name="indexToCreate"></param>
/// <param name="indexToDelete"></param>
/// <returns></returns>
Task SwitchIndexes(string indexName, string indexToCreate, string indexToDelete);
}
44 changes: 28 additions & 16 deletions aspnetcore/src/Indexer/Indexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ private async Task IndexEntities(string indexName,
List<object> finalized = new();

if (indexName.Contains("publication")) {

// Create new index
var (indexToCreate, indexToDelete) = await _indexService.GetIndexNames(indexName);
await _indexService.CreateIndex(indexToCreate, type);

/*
* Process database result in smaller chunks to keep memory requirement low.
* Chunking is based on skip/take query.
Expand All @@ -143,8 +148,9 @@ private async Task IndexEntities(string indexName,
*/

int skipAmount = 0;
int takeAmount = 200000;
int takeAmount = 50000;
int numOfResults = 0;
int processedCount = 0;

do
{
Expand All @@ -155,14 +161,19 @@ private async Task IndexEntities(string indexName,

if (numOfResults > 0)
{
_logger.LogInformation("{EntityType}: In-memory operations start", type.Name);
foreach (object entity in indexModels) {
finalized.Add(repository.PerformInMemoryOperation(entity));
}
_logger.LogInformation("{EntityType}: In-memory operations complete", type.Name);
await _indexService.IndexChunkAsync(indexToCreate, finalized, type);
}
skipAmount = skipAmount + takeAmount;
processedCount = processedCount + numOfResults;
finalized = new();
_logger.LogInformation("{EntityType}: Total documents indexed = {processedCount}", type.Name, processedCount);
} while(numOfResults >= takeAmount-1);

// Activate new index and delete old
await _indexService.SwitchIndexes(indexName, indexToCreate, indexToDelete);
}
else
{
Expand All @@ -180,20 +191,21 @@ private async Task IndexEntities(string indexName,
_logger.LogInformation("{EntityType}: Start in-memory operations", type.Name);
finalized = repository.PerformInMemoryOperations(indexModels);
}
}
var inMemoryElapsed = stopWatch.Elapsed;

if (finalized.Count > 0)
{
_logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed);
await _indexService.IndexAsync(indexName, finalized, type);
var indexingElapsed = stopWatch.Elapsed;
_logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed);
_logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed);
}
else
{
_logger.LogInformation("{EntityType}: Nothing to index", type.Name);
var inMemoryElapsed = stopWatch.Elapsed;

if (finalized.Count > 0)
{
_logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed);
await _indexService.IndexAsync(indexName, finalized, type);
var indexingElapsed = stopWatch.Elapsed;
_logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed);
_logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed);
}
else
{
_logger.LogInformation("{EntityType}: Nothing to index", type.Name);
}
}
}
catch (Exception ex)
Expand Down
Loading