From 041fc8c2228e779d07f51ac6b396b808d122e54d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Sa=CC=88rkikoski?= Date: Fri, 12 Jan 2024 12:38:25 +0200 Subject: [PATCH 1/3] Process publications in chunks including Elasticsearch indexing --- .../ElasticSearchIndexService.cs | 31 ++++++++++++-- .../IElasticSearchIndexService.cs | 38 ++++++++++++++++- aspnetcore/src/Indexer/Indexer.cs | 42 ++++++++++++------- 3 files changed, 91 insertions(+), 20 deletions(-) diff --git a/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs b/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs index 0ad3f7c..38f68c5 100644 --- a/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs +++ b/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs @@ -49,10 +49,35 @@ await _elasticClient.Indices.BulkAliasAsync(r => r await _elasticClient.Indices.DeleteAsync(indexToDelete, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404))); _logger.LogDebug("{EntityType}: Indexing to {IndexName} complete", modelType.Name, indexName); + } + + public async Task IndexChunkAsync(string indexToCreate, List entities, Type modelType) + { + // Add entities to the index. + await IndexEntities(indexToCreate, entities, modelType); + } + public async Task SwitchIndexes(string indexName, string indexToCreate, string indexToDelete) + { + // Wait for new index to be operational. + await _elasticClient.Cluster + .HealthAsync(selector: s => s + .WaitForStatus(Elasticsearch.Net.WaitForStatus.Yellow) + .WaitForActiveShards("1") + .Index(indexToCreate)); + + // Add new alias from index_new to index + await _elasticClient.Indices.BulkAliasAsync(r => r + // Remove alias "index_old => index" + .Remove(remove => remove.Alias(indexName).Index("*")) + // Add alias "index_new => index" + .Add(add => add.Alias(indexName).Index(indexToCreate))); + + // Delete the old index if it exists. + await _elasticClient.Indices.DeleteAsync(indexToDelete, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404))); } - private async Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName) + public async Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName) { var indexNameV1 = $"{indexName}_v1"; var indexNameV2 = $"{indexName}_v2"; @@ -99,7 +124,7 @@ private async Task IndexEntities(string indexName, List entities, Type mod throw new InvalidOperationException($"Indexing documents to {indexName} failed.", indexBatchResponse.OriginalException); } indexedCount = indexedCount + batchToIndex.Count; - _logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}. Progress {IndexedCount}/{TotalCount}", modelType.Name, batchToIndex.Count, indexName, indexedCount, entities.Count); + _logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}", modelType.Name, batchToIndex.Count, indexName); } } @@ -111,7 +136,7 @@ private async Task IndexEntities(string indexName, List entities, Type mod /// /// /// - private async Task CreateIndex(string indexName, Type type) + public async Task CreateIndex(string indexName, Type type) { await _elasticClient.Indices.DeleteAsync(indexName, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404))); diff --git a/aspnetcore/src/ElasticService/IElasticSearchIndexService.cs b/aspnetcore/src/ElasticService/IElasticSearchIndexService.cs index e2241fb..960b848 100644 --- a/aspnetcore/src/ElasticService/IElasticSearchIndexService.cs +++ b/aspnetcore/src/ElasticService/IElasticSearchIndexService.cs @@ -6,7 +6,7 @@ public interface IElasticSearchIndexService { /// - /// Creates a new index with the given name and indexses the given entities. + /// Creates a new index with the given name and indexes the given entities. /// /// /// @@ -14,4 +14,40 @@ public interface IElasticSearchIndexService /// /// Task IndexAsync(string indexName, List entities, Type modelType); + + /// + /// Index given entities. + /// + /// + /// + /// + /// + /// + Task IndexChunkAsync(string indexName, List entities, Type modelType); + + /// + /// Get name of index to create and name of index to delete. + /// + /// + /// + Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName); + + /// + /// Creates index with the given name. + /// If the index exists already, it will be deleted first. + /// + /// + /// + /// + /// + Task CreateIndex(string indexName, Type type); + + /// + /// Switch index + /// + /// + /// + /// + /// + Task SwitchIndexes(string indexName, string indexToCreate, string indexToDelete); } \ No newline at end of file diff --git a/aspnetcore/src/Indexer/Indexer.cs b/aspnetcore/src/Indexer/Indexer.cs index d8bcecd..f82b7a1 100644 --- a/aspnetcore/src/Indexer/Indexer.cs +++ b/aspnetcore/src/Indexer/Indexer.cs @@ -135,6 +135,10 @@ private async Task IndexEntities(string indexName, List finalized = new(); if (indexName.Contains("publication")) { + + var (indexToCreate, indexToDelete) = await _indexService.GetIndexNames(indexName); + await _indexService.CreateIndex(indexToCreate, type); + /* * Process database result in smaller chunks to keep memory requirement low. * Chunking is based on skip/take query. @@ -143,8 +147,9 @@ private async Task IndexEntities(string indexName, */ int skipAmount = 0; - int takeAmount = 200000; + int takeAmount = 2; int numOfResults = 0; + int processedCount = 0; do { @@ -155,14 +160,18 @@ private async Task IndexEntities(string indexName, if (numOfResults > 0) { - _logger.LogInformation("{EntityType}: In-memory operations start", type.Name); foreach (object entity in indexModels) { finalized.Add(repository.PerformInMemoryOperation(entity)); } - _logger.LogInformation("{EntityType}: In-memory operations complete", type.Name); + await _indexService.IndexChunkAsync(indexToCreate, finalized, type); } skipAmount = skipAmount + takeAmount; + finalized = new(); + processedCount = processedCount + numOfResults; + _logger.LogInformation("{EntityType}: Total indexed count = {processedCount}", type.Name, processedCount); } while(numOfResults >= takeAmount-1); + + await _indexService.SwitchIndexes(indexName, indexToCreate, indexToDelete); } else { @@ -180,20 +189,21 @@ private async Task IndexEntities(string indexName, _logger.LogInformation("{EntityType}: Start in-memory operations", type.Name); finalized = repository.PerformInMemoryOperations(indexModels); } - } - var inMemoryElapsed = stopWatch.Elapsed; - if (finalized.Count > 0) - { - _logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed); - await _indexService.IndexAsync(indexName, finalized, type); - var indexingElapsed = stopWatch.Elapsed; - _logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed); - _logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed); - } - else - { - _logger.LogInformation("{EntityType}: Nothing to index", type.Name); + var inMemoryElapsed = stopWatch.Elapsed; + + if (finalized.Count > 0) + { + _logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed); + await _indexService.IndexAsync(indexName, finalized, type); + var indexingElapsed = stopWatch.Elapsed; + _logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed); + _logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed); + } + else + { + _logger.LogInformation("{EntityType}: Nothing to index", type.Name); + } } } catch (Exception ex) From b3f9b2ef0240023380a9818126c5b01d53a82e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Sa=CC=88rkikoski?= Date: Fri, 12 Jan 2024 12:42:45 +0200 Subject: [PATCH 2/3] Set chunk size to 50000 --- aspnetcore/src/Indexer/Indexer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aspnetcore/src/Indexer/Indexer.cs b/aspnetcore/src/Indexer/Indexer.cs index f82b7a1..92f2d4d 100644 --- a/aspnetcore/src/Indexer/Indexer.cs +++ b/aspnetcore/src/Indexer/Indexer.cs @@ -147,7 +147,7 @@ private async Task IndexEntities(string indexName, */ int skipAmount = 0; - int takeAmount = 2; + int takeAmount = 50000; int numOfResults = 0; int processedCount = 0; From 6b5533dd28bbd2804ad1179d099f690f0e4ee9ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Sa=CC=88rkikoski?= Date: Fri, 12 Jan 2024 14:23:57 +0200 Subject: [PATCH 3/3] Reuse same function when activating new index --- .../ElasticSearchIndexService.cs | 18 ++---------------- aspnetcore/src/Indexer/Indexer.cs | 6 ++++-- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs b/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs index 38f68c5..7e4b48e 100644 --- a/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs +++ b/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs @@ -31,22 +31,8 @@ public async Task IndexAsync(string indexName, List entities, Type model // Add entities to the new index. await IndexEntities(indexToCreate, entities, modelType); - // Wait for new index to be operational. - await _elasticClient.Cluster - .HealthAsync(selector: s => s - .WaitForStatus(Elasticsearch.Net.WaitForStatus.Yellow) - .WaitForActiveShards("1") - .Index(indexToCreate)); - - // Add new alias from index_new to index - await _elasticClient.Indices.BulkAliasAsync(r => r - // Remove alias "index_old => index" - .Remove(remove => remove.Alias(indexName).Index("*")) - // Add alias "index_new => index" - .Add(add => add.Alias(indexName).Index(indexToCreate))); - - // Delete the old index if it exists. - await _elasticClient.Indices.DeleteAsync(indexToDelete, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404))); + // Switch indexes + await SwitchIndexes(indexName, indexToCreate, indexToDelete); _logger.LogDebug("{EntityType}: Indexing to {IndexName} complete", modelType.Name, indexName); } diff --git a/aspnetcore/src/Indexer/Indexer.cs b/aspnetcore/src/Indexer/Indexer.cs index 92f2d4d..bd7b21a 100644 --- a/aspnetcore/src/Indexer/Indexer.cs +++ b/aspnetcore/src/Indexer/Indexer.cs @@ -136,6 +136,7 @@ private async Task IndexEntities(string indexName, if (indexName.Contains("publication")) { + // Create new index var (indexToCreate, indexToDelete) = await _indexService.GetIndexNames(indexName); await _indexService.CreateIndex(indexToCreate, type); @@ -166,11 +167,12 @@ private async Task IndexEntities(string indexName, await _indexService.IndexChunkAsync(indexToCreate, finalized, type); } skipAmount = skipAmount + takeAmount; - finalized = new(); processedCount = processedCount + numOfResults; - _logger.LogInformation("{EntityType}: Total indexed count = {processedCount}", type.Name, processedCount); + finalized = new(); + _logger.LogInformation("{EntityType}: Total documents indexed = {processedCount}", type.Name, processedCount); } while(numOfResults >= takeAmount-1); + // Activate new index and delete old await _indexService.SwitchIndexes(indexName, indexToCreate, indexToDelete); } else