-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Fix bug in module schema * Update CHANGELOG.md * Update backend/CHANGELOG.md Co-authored-by: Søren Hjort <[email protected]> * Update ContractExtensions.cs * Update ContractExtensions.cs * Added test and fix wrong key for module v1 where schema not embedded * Update UpdateModuleSourceCatchup.cs * Update UpdateModuleSourceCatchup.cs --------- Co-authored-by: Søren Hjort <[email protected]>
- Loading branch information
1 parent
50de657
commit eb7d009
Showing
10 changed files
with
171 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
112 changes: 112 additions & 0 deletions
112
backend/Application/Aggregates/Contract/Jobs/UpdateModuleSourceCatchup.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Application.Aggregates.Contract.Configurations; | ||
using Application.Aggregates.Contract.Entities; | ||
using Application.Aggregates.Contract.Observability; | ||
using Application.Aggregates.Contract.Resilience; | ||
using Application.Api.GraphQL.EfCore; | ||
using Application.Observability; | ||
using Microsoft.EntityFrameworkCore; | ||
using Microsoft.Extensions.Options; | ||
using Serilog.Context; | ||
|
||
namespace Application.Aggregates.Contract.Jobs; | ||
|
||
/// <summary> | ||
/// Update all <see cref="ModuleReferenceEvent"/> where <see cref="ModuleReferenceEvent.Schema"/> isn't null. | ||
/// | ||
/// Overrides existing <see cref="ModuleReferenceEvent.Schema"/> and <see cref="ModuleReferenceEvent.SchemaVersion"/> | ||
/// with logic from <see cref="ModuleReferenceEvent.ModuleSourceInfo.GetModuleSchema"/>. | ||
/// </summary> | ||
public class UpdateModuleSourceCatchup : IContractJob | ||
{ | ||
private readonly IContractNodeClient _client; | ||
private readonly IDbContextFactory<GraphQlDbContext> _dbContextFactory; | ||
private readonly ContractHealthCheck _healthCheck; | ||
private readonly ILogger _logger; | ||
private readonly ContractAggregateOptions _contractAggregateOptions; | ||
private readonly ContractAggregateJobOptions _jobOptions; | ||
|
||
/// <summary> | ||
/// WARNING - Do not change this if job already executed on environment, since it will trigger rerun of job. | ||
/// </summary> | ||
private const string JobName = "UpdateModuleSourceCatchup"; | ||
|
||
public UpdateModuleSourceCatchup( | ||
IContractNodeClient client, | ||
IDbContextFactory<GraphQlDbContext> dbContextFactory, | ||
IOptions<ContractAggregateOptions> options, | ||
ContractHealthCheck healthCheck) | ||
{ | ||
_client = client; | ||
_dbContextFactory = dbContextFactory; | ||
_healthCheck = healthCheck; | ||
_logger = Log.ForContext<UpdateModuleSourceCatchup>(); | ||
_contractAggregateOptions = options.Value; | ||
var gotJobOptions = _contractAggregateOptions.Jobs.TryGetValue(GetUniqueIdentifier(), out var jobOptions); | ||
_jobOptions = gotJobOptions ? jobOptions! : new ContractAggregateJobOptions(); | ||
} | ||
|
||
private async Task<IList<string>> GetModuleReferences() | ||
{ | ||
await using var context = await _dbContextFactory.CreateDbContextAsync(); | ||
|
||
return await context.ModuleReferenceEvents | ||
.AsNoTracking() | ||
.Where(m => m.ModuleSource != null) | ||
.Select(m => m.ModuleReference) | ||
.ToListAsync(); | ||
} | ||
|
||
private async ValueTask Process(string moduleReference, ulong lastFinalized, CancellationToken token) | ||
{ | ||
await Policies.GetTransientPolicy(_logger, _contractAggregateOptions.RetryCount, _contractAggregateOptions.RetryDelay) | ||
.ExecuteAsync(async () => | ||
{ | ||
await using var context = await _dbContextFactory.CreateDbContextAsync(token); | ||
|
||
var module = await context.ModuleReferenceEvents | ||
.SingleAsync(m => m.ModuleReference == moduleReference, cancellationToken: token); | ||
|
||
var moduleSourceInfo = await ModuleReferenceEvent.ModuleSourceInfo.Create(_client, lastFinalized, moduleReference); | ||
module.UpdateWithModuleSourceInfo(moduleSourceInfo); | ||
await context.SaveChangesAsync(token); | ||
|
||
_logger.Information($"Updated module {moduleReference}"); | ||
}); | ||
} | ||
|
||
public async Task StartImport(CancellationToken token) | ||
{ | ||
using var _ = TraceContext.StartActivity(GetUniqueIdentifier()); | ||
using var __ = LogContext.PushProperty("Job", GetUniqueIdentifier()); | ||
|
||
try | ||
{ | ||
var moduleReferences = await GetModuleReferences(); | ||
|
||
_logger.Information($"Starts process {moduleReferences.Count} modules"); | ||
|
||
var consensusInfo = await _client.GetConsensusInfoAsync(token); | ||
|
||
var cycle = Parallel.ForEachAsync(moduleReferences, | ||
new ParallelOptions | ||
{ | ||
MaxDegreeOfParallelism = _jobOptions.MaxParallelTasks | ||
}, (moduleRef, cancellationToken) => Process(moduleRef, consensusInfo.LastFinalizedBlockHeight, cancellationToken)); | ||
await cycle; | ||
|
||
_logger.Information($"Done with job {GetUniqueIdentifier()}"); | ||
} | ||
catch (Exception e) | ||
{ | ||
_healthCheck.AddUnhealthyJobWithMessage(GetUniqueIdentifier(), "Job stopped due to exception."); | ||
_logger.Fatal(e, $"{GetUniqueIdentifier()} stopped due to exception."); | ||
throw; | ||
} | ||
} | ||
|
||
public string GetUniqueIdentifier() => JobName; | ||
|
||
public bool ShouldNodeImportAwait() => false; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file added
BIN
+52.4 KB
backend/Tests/TestUtilities/TestData/cis1-wccd-embedded-schema-v0-unversioned.wasm
Binary file not shown.
Binary file added
BIN
+53.4 KB
backend/Tests/TestUtilities/TestData/cis1-wccd-embedded-schema-v0-versioned.wasm.v0
Binary file not shown.
Binary file added
BIN
+52.3 KB
backend/Tests/TestUtilities/TestData/cis2-wccd-embedded-schema-v1-unversioned.wasm.v1
Binary file not shown.
Binary file added
BIN
+53 KB
backend/Tests/TestUtilities/TestData/cis2-wccd-embedded-schema-v1-versioned.wasm.v1
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters