Skip to content

Commit

Permalink
Early Stages of EventLog processing (#1)
Browse files Browse the repository at this point in the history
Initial port of Ujo event log processing objects
  • Loading branch information
Dave-Whiffin authored Nov 20, 2018
1 parent d1a5446 commit 460b04c
Show file tree
Hide file tree
Showing 34 changed files with 783 additions and 89 deletions.
162 changes: 162 additions & 0 deletions Nethereum.BlockchainProcessing.Samples/EventLogEnumeration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Numerics;
using System.Text;
using System.Threading.Tasks;
using Castle.Components.DictionaryAdapter;
using Nethereum.ABI.FunctionEncoding.Attributes;
using Nethereum.BlockchainProcessing.Handlers;
using Nethereum.BlockchainProcessing.Processing;
using Nethereum.BlockchainProcessing.Processing.Logs;
using Nethereum.BlockchainProcessing.Web3Abstractions;
using Nethereum.Contracts;
using Nethereum.Contracts.Extensions;
using Nethereum.RPC.Eth.DTOs;
using Xunit;

namespace Nethereum.BlockchainProcessing.Samples
{
public class EventLogEnumeration
{
/*
Solidity Contract Excerpt
* event Transfer(address indexed _from, address indexed _to, uint256 indexed _value);
Other contracts may have transfer events with different signatures, this won't work for those.
*/
[Event("Transfer")]
public class TransferEvent
{
[Parameter("address", "_from", 1, true)]
public string From {get; set;}

[Parameter("address", "_to", 2, true)]
public string To {get; set;}

[Parameter("uint256", "_value", 3, true)]
public BigInteger Value {get; set;}
}

public class TransferEventProcessor : ILogProcessor
{
public List<(FilterLog, EventLog<TransferEvent>)> ProcessedEvents = new List<(FilterLog, EventLog<TransferEvent>)>();
public List<(FilterLog, Exception)> DecodingErrors = new List<(FilterLog, Exception)>();

public bool IsLogForEvent(FilterLog log)
{
return log.IsLogForEvent<TransferEvent>();
}

public Task ProcessLogsAsync(params FilterLog[] eventLogs)
{
foreach (var eventLog in eventLogs)
{
try
{
var eventDto = eventLog.DecodeEvent<TransferEvent>();
ProcessedEvents.Add((eventLog, eventDto));

}
catch (Exception ex)
{
DecodingErrors.Add((eventLog, ex));
}
}

return Task.CompletedTask;
}
}

public class CatchAllEventProcessor : ILogProcessor
{
public List<FilterLog> ProcessedEvents = new List<FilterLog>();

public bool IsLogForEvent(FilterLog log)
{
return true;
}

public Task ProcessLogsAsync(params FilterLog[] eventLogs)
{
ProcessedEvents.AddRange(eventLogs);
return Task.CompletedTask;
}
}

[Fact]
public async Task RunOnce()
{
var web3Wrapper = new Web3Wrapper("https://rinkeby.infura.io/v3/25e7b6dfc51040b3bfc0e47317d38f60");

var transferEventProcessor = new TransferEventProcessor();
var catchAllEventProcessor = new CatchAllEventProcessor();
var eventProcessors = new ILogProcessor[] {catchAllEventProcessor, transferEventProcessor};

var logProcessor = new BlockchainLogProcessor(web3Wrapper, eventProcessors);

var progressFileNameAndPath = Path.Combine(Path.GetTempPath(), "BlockProcess.json");
if(File.Exists(progressFileNameAndPath)) File.Delete(progressFileNameAndPath);

var progressRepository = new JsonBlockProcessProgressRepository(progressFileNameAndPath);
var progressService = new PreDefinedRangeBlockchainProcessingProgressService(
3146684, 3146684, progressRepository);

var batchProcessorService = new BlockchainBatchProcessorService(
logProcessor, progressService, maxNumberOfBlocksPerBatch: 1);

await batchProcessorService.ProcessLatestBlocks();

Assert.Single(transferEventProcessor.ProcessedEvents);
Assert.Equal(7, catchAllEventProcessor.ProcessedEvents.Count);

Assert.Equal((ulong?)3146684, await progressRepository.GetLatestAsync());

}

[Fact]
public async Task RunContinually()
{
const ulong StartingBlockNumber = 3146684;
var web3Wrapper = new Web3Wrapper("https://rinkeby.infura.io/v3/25e7b6dfc51040b3bfc0e47317d38f60");

var transferEventProcessor = new TransferEventProcessor();
var catchAllEventProcessor = new CatchAllEventProcessor();
var eventProcessors = new ILogProcessor[] {catchAllEventProcessor, transferEventProcessor};

var logProcessor = new BlockchainLogProcessor(web3Wrapper, eventProcessors);

var progressFileNameAndPath = Path.Combine(Path.GetTempPath(), "BlockProcess.json");
if(File.Exists(progressFileNameAndPath)) File.Delete(progressFileNameAndPath);

var progressRepository = new JsonBlockProcessProgressRepository(progressFileNameAndPath);

//this will get the last block on the chain each time a "to" block is requested
var progressService = new LatestBlockBlockchainProcessingProgressService(
web3Wrapper, StartingBlockNumber, progressRepository);

var batchProcessorService = new BlockchainBatchProcessorService(
logProcessor, progressService, maxNumberOfBlocksPerBatch: 10);

var iterations = 0;

//iterate until we reach an arbitrary ending block
//to process continually - remove the condition from the while loop
while (progressRepository.Latest < (StartingBlockNumber + 100))
{
await batchProcessorService.ProcessLatestBlocks();
iterations++;
}

Assert.Equal(10, iterations);
Assert.Equal(1533, catchAllEventProcessor.ProcessedEvents.Count);
Assert.Equal(40, transferEventProcessor.ProcessedEvents.Count);

//events on other contracts may have same name and input parameter types
//however they may differ in the number of indexed fields
//this leads to decoding errors
//it's not a problem - just something to be aware of
Assert.Equal(201, transferEventProcessor.DecodingErrors.Count);
}
}
}
9 changes: 8 additions & 1 deletion Nethereum.BlockchainProcessing.Samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@ Here are some quick start samples to demonstrate processing the chain.
They are written as unit tests against known data on a publicly available testnet.
This means you can run them and expect the same results.

## Block and Transaction Processing (includes Event Logs)
If you need transactions and logs then use these examples.

* [Block And Transaction Enumeration](BlockAndTransactionEnumeration.cs)
* [Filtering Transactions](FilterTransactions.cs)
* [Listening For A Specific Event](ListeningForASpecificEvent.cs)
* [Listening For A Specific Function Call](ListeningForASpecificFunctionCall.cs)
* [Conditional Transaction Routing](ConditionalTransactionRouting.cs)
* [Conditional Transaction Log Routing](ConditionalTransactionLogRouting.cs)
* [Conditional Transaction Log Routing](ConditionalTransactionLogRouting.cs)

## Event Log Processing (excludes transactions)
If you are ONLY interested in event logs - then this is a much faster way of enumerating the chain and listening for events.
* [EventLogEnumeration](EventLogEnumeration.cs)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Moq;
using System;
using Moq;
using Nethereum.BlockchainProcessing.Processing;
using Nethereum.JsonRpc.Client;
using System.Threading;
Expand Down Expand Up @@ -42,7 +43,7 @@ public async Task
await Processor.ExecuteAsync(startBlock: 0, endBlock: 0, cancellationToken: cancellationTokenSource.Token);

MockProcessingStrategy.VerifyAll();
MockProcessingStrategy.Verify(p => p.ProcessBlockAsync(It.IsAny<long>()), Times.Never);
MockProcessingStrategy.Verify(p => p.ProcessBlockAsync(It.IsAny<ulong>()), Times.Never);

}

Expand All @@ -52,8 +53,8 @@ public class When_Block_Range_Is_Specified : BlockchainProcessorTests
public async Task
Will_Process_The_Requested_Block_Range()
{
const long StartBlock = 1000;
const long EndBlock = 1205;
const ulong StartBlock = 1000;
const ulong EndBlock = 1205;

for (var block = StartBlock; block <= EndBlock; block++)
{
Expand All @@ -68,9 +69,9 @@ public async Task
[Fact]
public async Task Will_Wait_For_Minimum_Block_Confirmations()
{
const int RequiredBlockConfirmations = 6;
long maxBlockNumber = 0;
int numberOfWaitCycles = 0;
const uint RequiredBlockConfirmations = 6;
ulong maxBlockNumber = 0;
uint numberOfWaitCycles = 0;
int blocksProcessed = 0;

MockProcessingStrategy
Expand All @@ -83,8 +84,8 @@ public async Task Will_Wait_For_Minimum_Block_Confirmations()
.ReturnsAsync(() => maxBlockNumber);

MockProcessingStrategy
.Setup(s => s.WaitForNextBlock(It.IsAny<int>()))
.Callback<int>((retryNumber) => numberOfWaitCycles++)
.Setup(s => s.WaitForNextBlock(It.IsAny<uint>()))
.Callback<uint>((retryNumber) => numberOfWaitCycles++)
.Returns(Task.CompletedTask);

MockProcessingStrategy
Expand Down Expand Up @@ -124,10 +125,10 @@ public async Task Will_Retry_Up_To_Retry_Limit_And_Pause_Between_Each_Attempt()

MockProcessingStrategy
.Setup(p => p.ProcessBlockAsync(0))
.Callback<long>((blkNum) => timesThrown++)
.Callback<ulong>((blkNum) => timesThrown++)
.Throws(blockProcessingException);

for (var retryNum = 0; retryNum < MaxRetries; retryNum++)
for (uint retryNum = 0; retryNum < MaxRetries; retryNum++)
{
MockProcessingStrategy
.Setup(s => s.PauseFollowingAnError(retryNum))
Expand Down Expand Up @@ -157,7 +158,7 @@ public async Task

MockProcessingStrategy
.Setup(p => p.ProcessBlockAsync(1))
.Callback<long>(blkNum => cancellationTokenSource.Cancel())
.Callback<ulong>(blkNum => cancellationTokenSource.Cancel())
.Returns(Task.CompletedTask)
.Verifiable("ProcessBlockAsync should have been called for Block 1");

Expand All @@ -172,10 +173,10 @@ public async Task
[Fact]
public async Task Will_Wait_For_Minimum_Block_Confirmations()
{
const int RequiredBlockConfirmations = 6;
long maxBlockNumber = 0;
int numberOfWaitCycles = 0;
int blocksProcessed = 0;
const uint RequiredBlockConfirmations = 6;
ulong maxBlockNumber = 0;
uint numberOfWaitCycles = 0;
uint blocksProcessed = 0;

var cancellationTokenSource = new CancellationTokenSource();

Expand All @@ -189,8 +190,8 @@ public async Task Will_Wait_For_Minimum_Block_Confirmations()
.ReturnsAsync(() => maxBlockNumber);

MockProcessingStrategy
.Setup(s => s.WaitForNextBlock(It.IsAny<int>()))
.Callback<int>((retryNumber) => numberOfWaitCycles++)
.Setup(s => s.WaitForNextBlock(It.IsAny<uint>()))
.Callback<uint>((retryNumber) => numberOfWaitCycles++)
.Returns(Task.CompletedTask);

MockProcessingStrategy
Expand All @@ -206,7 +207,7 @@ public async Task Will_Wait_For_Minimum_Block_Confirmations()

Assert.False(result, "Result should be false because execution should have been stopped by cancellation token");

Assert.Equal(1, blocksProcessed);
Assert.Equal((uint)1, blocksProcessed);
Assert.Equal(RequiredBlockConfirmations - 1, numberOfWaitCycles);
Assert.Equal(RequiredBlockConfirmations, maxBlockNumber);
MockProcessingStrategy.VerifyAll();
Expand Down Expand Up @@ -247,9 +248,9 @@ public class When_Start_Block_Is_Not_Specified : BlockchainProcessorTests
public async Task
Requests_Last_Block_Processed_From_Strategy_And_Uses_The_Previous_Block_Number()
{
const long LastBlockProcessed = 11;
const long ExpectedStartBlock = LastBlockProcessed - 1;
const long EndingBlock = ExpectedStartBlock;
const ulong LastBlockProcessed = 11;
const ulong ExpectedStartBlock = LastBlockProcessed - 1;
const ulong EndingBlock = ExpectedStartBlock;

MockProcessingStrategy
.Setup(s => s.GetLastBlockProcessedAsync())
Expand All @@ -267,9 +268,9 @@ public async Task
[Fact]
public async Task Last_Block_Processsed_Is_Only_Used_If_Greater_Than_Minimum_Block_Number()
{
const long LastBlockProcessed = 11;
const long MinimumStartingBlock = 20;
const long EndingBlock = MinimumStartingBlock;
const ulong LastBlockProcessed = 11;
const ulong MinimumStartingBlock = 20;
const ulong EndingBlock = MinimumStartingBlock;

MockProcessingStrategy
.SetupGet(s => s.MinimumBlockNumber)
Expand Down
Loading

0 comments on commit 460b04c

Please sign in to comment.