Skip to content

Commit

Permalink
Process publications in chunks including Elasticsearch indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
sarkikos committed Jan 12, 2024
1 parent ed89924 commit 041fc8c
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 20 deletions.
31 changes: 28 additions & 3 deletions aspnetcore/src/ElasticService/ElasticSearchIndexService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,35 @@ await _elasticClient.Indices.BulkAliasAsync(r => r
await _elasticClient.Indices.DeleteAsync(indexToDelete, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404)));

_logger.LogDebug("{EntityType}: Indexing to {IndexName} complete", modelType.Name, indexName);
}

/// <summary>
/// Adds one chunk of entities to the given index.
/// This method only forwards to <c>IndexEntities</c>; it does not create the index
/// or modify aliases itself.
/// </summary>
/// <param name="indexToCreate">Name of the index the entities are added to.</param>
/// <param name="entities">Entities of the current chunk.</param>
/// <param name="modelType">Model type of the entities, passed through to <c>IndexEntities</c>.</param>
/// <returns>A task that completes when <c>IndexEntities</c> has finished.</returns>
public async Task IndexChunkAsync(string indexToCreate, List<object> entities, Type modelType) =>
    await IndexEntities(indexToCreate, entities, modelType);

/// <summary>
/// Moves the alias <paramref name="indexName"/> to the newly populated index and
/// then deletes the previous index.
/// </summary>
/// <param name="indexName">Alias name that is moved to the new index.</param>
/// <param name="indexToCreate">Name of the new index the alias will point to.</param>
/// <param name="indexToDelete">Name of the old index that is deleted after the alias has been moved.</param>
/// <returns>A task that completes when the delete request for the old index has returned.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when the alias update response is not valid. The old index is not deleted in that case.
/// </exception>
public async Task SwitchIndexes(string indexName, string indexToCreate, string indexToDelete)
{
    // Wait for new index to be operational.
    await _elasticClient.Cluster
        .HealthAsync(selector: s => s
            .WaitForStatus(Elasticsearch.Net.WaitForStatus.Yellow)
            .WaitForActiveShards("1")
            .Index(indexToCreate));

    // Move the alias in one bulk alias request.
    var aliasResponse = await _elasticClient.Indices.BulkAliasAsync(r => r
        // Remove alias "index_old => index"
        .Remove(remove => remove.Alias(indexName).Index("*"))
        // Add alias "index_new => index"
        .Add(add => add.Alias(indexName).Index(indexToCreate)));

    // Do not delete the old index unless the alias has really been moved;
    // otherwise the alias could be left without any index behind it.
    if (!aliasResponse.IsValid)
    {
        throw new InvalidOperationException($"Switching alias {indexName} to {indexToCreate} failed.", aliasResponse.OriginalException);
    }

    // Delete the old index if it exists (a 404 response is accepted).
    await _elasticClient.Indices.DeleteAsync(indexToDelete, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404)));
}

private async Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName)
public async Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName)
{
var indexNameV1 = $"{indexName}_v1";
var indexNameV2 = $"{indexName}_v2";
Expand Down Expand Up @@ -99,7 +124,7 @@ private async Task IndexEntities<T>(string indexName, List<T> entities, Type mod
throw new InvalidOperationException($"Indexing documents to {indexName} failed.", indexBatchResponse.OriginalException);
}
indexedCount = indexedCount + batchToIndex.Count;
_logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}. Progress {IndexedCount}/{TotalCount}", modelType.Name, batchToIndex.Count, indexName, indexedCount, entities.Count);
_logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}", modelType.Name, batchToIndex.Count, indexName);
}
}

Expand All @@ -111,7 +136,7 @@ private async Task IndexEntities<T>(string indexName, List<T> entities, Type mod
/// <param name="type"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
private async Task CreateIndex(string indexName, Type type)
public async Task CreateIndex(string indexName, Type type)
{
await _elasticClient.Indices.DeleteAsync(indexName,
d => d.RequestConfiguration(x => x.AllowedStatusCodes(404)));
Expand Down
38 changes: 37 additions & 1 deletion aspnetcore/src/ElasticService/IElasticSearchIndexService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,48 @@
public interface IElasticSearchIndexService
{
/// <summary>
/// Creates a new index with the given name and indexes the given entities.
/// </summary>
/// <param name="indexName">Name of the index to create.</param>
/// <param name="entities">Entities to index.</param>
/// <param name="modelType">Model type of the entities.</param>
/// <returns>A task that completes when the operation has finished.</returns>
Task IndexAsync(string indexName, List<object> entities, Type modelType);

/// <summary>
/// Indexes the given entities into the given index.
/// The implementation only adds documents; creating the index and switching
/// aliases are separate calls (<see cref="CreateIndex"/>, <see cref="SwitchIndexes"/>).
/// </summary>
/// <param name="indexName">
/// Name of the index the entities are added to. The indexer passes the
/// "index to create" name obtained from <see cref="GetIndexNames"/>.
/// </param>
/// <param name="entities">Entities of the current chunk.</param>
/// <param name="modelType">Model type of the entities.</param>
/// <returns>A task that completes when the chunk has been indexed.</returns>
Task IndexChunkAsync(string indexName, List<object> entities, Type modelType);

/// <summary>
/// Get name of index to create and name of index to delete.
/// </summary>
/// <param name="indexName">
/// Base index name. The implementation derives the names <c>{indexName}_v1</c> and
/// <c>{indexName}_v2</c> from it. NOTE(review): how the two are assigned to
/// "create" and "delete" is decided in the implementation — confirm there.
/// </param>
/// <returns>Tuple with the name of the index to create and the name of the index to delete.</returns>
Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName);

/// <summary>
/// Creates index with the given name.
/// If the index exists already, it will be deleted first.
/// </summary>
/// <param name="indexName">Name of the index to create.</param>
/// <param name="type">Model type the index is created for.</param>
/// <returns>A task that completes when the index has been created.</returns>
/// <exception cref="InvalidOperationException"></exception>
Task CreateIndex(string indexName, Type type);

/// <summary>
/// Switches the alias to the new index: waits for the new index to be operational,
/// moves the alias <paramref name="indexName"/> to <paramref name="indexToCreate"/>
/// and deletes <paramref name="indexToDelete"/> if it exists.
/// </summary>
/// <param name="indexName">Alias name that is moved to the new index.</param>
/// <param name="indexToCreate">Name of the new index the alias will point to.</param>
/// <param name="indexToDelete">Name of the old index to delete.</param>
/// <returns>A task that completes when the switch has finished.</returns>
Task SwitchIndexes(string indexName, string indexToCreate, string indexToDelete);
}
42 changes: 26 additions & 16 deletions aspnetcore/src/Indexer/Indexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ private async Task IndexEntities(string indexName,
List<object> finalized = new();

if (indexName.Contains("publication")) {

var (indexToCreate, indexToDelete) = await _indexService.GetIndexNames(indexName);
await _indexService.CreateIndex(indexToCreate, type);

/*
* Process database result in smaller chunks to keep memory requirement low.
* Chunking is based on skip/take query.
Expand All @@ -143,8 +147,9 @@ private async Task IndexEntities(string indexName,
*/

int skipAmount = 0;
int takeAmount = 200000;
int takeAmount = 2;
int numOfResults = 0;
int processedCount = 0;

do
{
Expand All @@ -155,14 +160,18 @@ private async Task IndexEntities(string indexName,

if (numOfResults > 0)
{
_logger.LogInformation("{EntityType}: In-memory operations start", type.Name);
foreach (object entity in indexModels) {
finalized.Add(repository.PerformInMemoryOperation(entity));
}
_logger.LogInformation("{EntityType}: In-memory operations complete", type.Name);
await _indexService.IndexChunkAsync(indexToCreate, finalized, type);
}
skipAmount = skipAmount + takeAmount;
finalized = new();
processedCount = processedCount + numOfResults;
_logger.LogInformation("{EntityType}: Total indexed count = {processedCount}", type.Name, processedCount);
} while(numOfResults >= takeAmount-1);

await _indexService.SwitchIndexes(indexName, indexToCreate, indexToDelete);
}
else
{
Expand All @@ -180,20 +189,21 @@ private async Task IndexEntities(string indexName,
_logger.LogInformation("{EntityType}: Start in-memory operations", type.Name);
finalized = repository.PerformInMemoryOperations(indexModels);
}
}
var inMemoryElapsed = stopWatch.Elapsed;

if (finalized.Count > 0)
{
_logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed);
await _indexService.IndexAsync(indexName, finalized, type);
var indexingElapsed = stopWatch.Elapsed;
_logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed);
_logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed);
}
else
{
_logger.LogInformation("{EntityType}: Nothing to index", type.Name);
var inMemoryElapsed = stopWatch.Elapsed;

if (finalized.Count > 0)
{
_logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed);
await _indexService.IndexAsync(indexName, finalized, type);
var indexingElapsed = stopWatch.Elapsed;
_logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed);
_logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed);
}
else
{
_logger.LogInformation("{EntityType}: Nothing to index", type.Name);
}
}
}
catch (Exception ex)
Expand Down

0 comments on commit 041fc8c

Please sign in to comment.