From 6f88c55c65fea6e897b6937b902b53ad6d2cad21 Mon Sep 17 00:00:00 2001 From: Await Future Date: Sat, 20 Mar 2021 02:37:14 -0500 Subject: [PATCH] :[WIP] Support events watching from specific contracts --- .gitignore | 3 ++ config/runtime.exs | 74 ++++++++++++++++++++++++++++++ docs/COMMANDS.md | 0 examples/event_handler.ex | 10 ++-- lib/slurp/adapters/evm.ex | 7 ++- lib/slurp/logs/log_fetcher.ex | 16 +++++-- lib/slurp/logs/log_subscription.ex | 6 ++- lib/slurp/logs/subscriptions.ex | 28 ++++++++--- mix.exs | 4 +- 9 files changed, 128 insertions(+), 20 deletions(-) mode change 100644 => 100755 docs/COMMANDS.md mode change 100644 => 100755 mix.exs diff --git a/.gitignore b/.gitignore index a959f30..ea572f1 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,6 @@ slurp-*.tar # Elixir language server .elixir_ls/ + +# Ignore log files +/log/ diff --git a/config/runtime.exs b/config/runtime.exs index a60e862..ed38740 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -304,3 +304,77 @@ if config_env() == :test do } } end + +if config_env() == :testnet do + config :slurp, + blockchains: %{ + "ethereum-rinkeby" => %{ + start_on_boot: false, + name: "Ethereum rinkeby", + adapter: Slurp.Adapters.Evm, + network_id: 4, + chain_id: 1, + chain: "ETH", + testnet: true, + timeout: 5000, + new_head_initial_history: 128, + poll_interval_ms: 2_500, + rpc: [ + "https://rinkeby-light.eth.linkpool.io" + ] + } + } + + config :slurp, + new_head_subscriptions: %{ + "*" => [ + %{ + enabled: true, + handler: {Examples.NewHeadHandler, :handle_new_head, []} + } + ] + } + + + config :slurp, + log_subscriptions: %{ + "*" => %{ + # ERC20 + {"Approval(address,address,uint256)", + ["0xaFF4481D10270F50f203E0763e2597776068CBc5", + "0x022E292b44B5a146F2e8ee36Ff44D3dd863C915c", + "0xc6fDe3FD2Cc2b173aEC24cc3f267cb3Cd78a26B7", + "0x1f9061B953bBa0E36BF50F21876132DcF276fC6e"]} => [ + %{ + enabled: true, + struct: Examples.Erc20.Events.Approval, + handler: {Examples.EventHandler, :handle_erc20_approval, []}, + abi: [ + %{ + "anonymous" => false, + "inputs" => [ + %{ + "indexed" => true, + "name" => "owner", + "type" => "address" + }, + %{ + "indexed" => true, + "name" => "spender", + "type" => "address" + }, + %{ + "indexed" => false, + "name" => "value", + "type" => "uint256" + } + ], + "name" => "Approval", + "type" => "event" + } + ] + } + ] + } + } +end diff --git a/docs/COMMANDS.md b/docs/COMMANDS.md old mode 100644 new mode 100755 diff --git a/examples/event_handler.ex b/examples/event_handler.ex index 101f9ba..8d8a51a 100644 --- a/examples/event_handler.ex +++ b/examples/event_handler.ex @@ -1,8 +1,8 @@ defmodule Examples.EventHandler do require Logger - def handle_erc20_approval(blockchain, log, event) do - handle_event(blockchain, log, event) + def handle_erc20_approval(blockchain, log, %{address: addr} = event) when is_bitstring(addr) do + handle_event(blockchain, log, event, addr) end def handle_erc20_transfer(blockchain, log, event) do @@ -22,11 +22,11 @@ defmodule Examples.EventHandler do end def handle_uniswap_v2_sync(blockchain, log, event) do - handle_event(blockchain,log, event) + handle_event(blockchain,log, event) end - defp handle_event(_blockchain, %{"blockNumber" => hex_block_number}= _log, event) do + defp handle_event(_blockchain, %{"blockNumber" => hex_block_number}= _log, event, address \\ nil) do {:ok, block_number} = Slurp.Utils.hex_to_integer(hex_block_number) - Logger.info "received event: #{inspect event}, block_number: #{block_number}" + Logger.info "received event: #{inspect event}, block_number: #{block_number}, address: #{address}" end end diff --git a/lib/slurp/adapters/evm.ex b/lib/slurp/adapters/evm.ex index 815faea..ac6adf6 100644 --- a/lib/slurp/adapters/evm.ex +++ b/lib/slurp/adapters/evm.ex @@ -37,16 +37,18 @@ defmodule Slurp.Adapters.Evm do def deserialize_log_event(log, log_subscription) do [_hashed_event_signature | indexed_topics] = log["topics"] - log_subscription.abi + {rt_code, log_event} = log_subscription.abi |> Enum.reduce( :ok, fn abi, :ok -> with {:ok, _event} = result <- Evm.Abi.deserialize_log_into(log, log_subscription.struct, abi, indexed_topics) do + result else - {:error, reason} -> {:error, [reason]} + {:error, reason} -> + {:error, [reason]} end abi, {:error, reasons} -> @@ -61,6 +63,7 @@ defmodule Slurp.Adapters.Evm do acc end ) + {rt_code, Map.merge(log_event, %{address: log["address"]})} end @spec get_logs(log_filter, endpoint) :: {:ok, [log]} | {:error, term} diff --git a/lib/slurp/logs/log_fetcher.ex b/lib/slurp/logs/log_fetcher.ex index 068c734..3e8a204 100644 --- a/lib/slurp/logs/log_fetcher.ex +++ b/lib/slurp/logs/log_fetcher.ex @@ -7,6 +7,7 @@ defmodule Slurp.Logs.LogFetcher do @type block_number :: Adapter.block_number() @type hashed_event_signature :: Adapter.hashed_event_signature() @type log_subscription :: Logs.LogSubscription.t() + @type address :: Slurp.Adapter.address @type t :: %State{ blockchain: Blockchains.Blockchain.t(), @@ -15,10 +16,11 @@ defmodule Slurp.Logs.LogFetcher do hashed_event_signature => log_subscription }, topics: [hashed_event_signature], + address: [address], last_block_number: block_number | nil } - defstruct ~w[blockchain endpoint subscriptions topics last_block_number]a + defstruct ~w[blockchain endpoint subscriptions topics address last_block_number]a end @type blockchain :: Blockchains.Blockchain.t() @@ -30,13 +32,18 @@ defmodule Slurp.Logs.LogFetcher do def start_link(blockchain: blockchain, subscriptions: subscriptions) do name = process_name(blockchain.id) topics = Enum.map(subscriptions, & &1.hashed_event_signature) + |> Enum.uniq() + + addresses = Enum.map(subscriptions, & &1.address) + |> Enum.filter(& &1 != nil) {:ok, endpoint} = Blockchains.Blockchain.endpoint(blockchain) state = %State{ blockchain: blockchain, endpoint: endpoint, subscriptions: index_subscriptions(subscriptions), - topics: topics + topics: topics, + address: addresses } GenServer.start_link(__MODULE__, state, name: name) @@ -141,10 +148,13 @@ defmodule Slurp.Logs.LogFetcher do {:ok, to_hex_block} = to_block |> Slurp.Utils.integer_to_hex() %{ - topics: [state.topics], + topics: state.topics, from_block: from_hex_block, to_block: to_hex_block } |> ProperCase.to_camel_case() + |> build_filter(state) end + defp build_filter(filter, %State{address: addr}) when length(addr) > 0, do: Map.put(filter, :address, addr) + defp build_filter(filter, _), do: filter end diff --git a/lib/slurp/logs/log_subscription.ex b/lib/slurp/logs/log_subscription.ex index 712c398..4432059 100644 --- a/lib/slurp/logs/log_subscription.ex +++ b/lib/slurp/logs/log_subscription.ex @@ -8,7 +8,8 @@ defmodule Slurp.Logs.LogSubscription do hashed_event_signature: Slurp.Adapter.hashed_event_signature(), struct: module, handler: {module, atom, list}, - abi: [map] + abi: [map], + address: String.t() } defstruct ~w[ @@ -19,11 +20,12 @@ defmodule Slurp.Logs.LogSubscription do struct handler abi + address ]a defimpl Stored.Item do def key(log_subscription) do - {log_subscription.blockchain_id, log_subscription.event_signature} + {log_subscription.blockchain_id, log_subscription.event_signature, log_subscription.address} end end end diff --git a/lib/slurp/logs/subscriptions.ex b/lib/slurp/logs/subscriptions.ex index bb257de..db28203 100644 --- a/lib/slurp/logs/subscriptions.ex +++ b/lib/slurp/logs/subscriptions.ex @@ -28,9 +28,10 @@ defmodule Slurp.Logs.Subscriptions do |> Juice.squeeze(blockchains_query) |> Enum.flat_map(fn {blockchain_id, blockchain} -> events - |> Enum.flat_map(fn {event_signature, handlers} -> + |> Enum.flat_map(fn {event_header, handlers} -> handlers - |> Enum.map(fn handler -> + |> Enum.flat_map(fn handler -> + {event_signature, addresses} = parse_event_signature(event_header) hashed_event_signature = Slurp.Adapter.hash_event_signature(blockchain, event_signature) @@ -38,15 +39,14 @@ defmodule Slurp.Logs.Subscriptions do Map.merge(handler, %{ blockchain_id: blockchain_id, event_signature: event_signature, - hashed_event_signature: hashed_event_signature + hashed_event_signature: hashed_event_signature, + address: addresses }) - - struct!(Logs.LogSubscription, attrs) + make_log_subscription(attrs) end) end) end) end) - {:ok, log_subscriptions} end @@ -77,4 +77,20 @@ defmodule Slurp.Logs.Subscriptions do def put(log_subscription, store_id \\ Logs.LogSubscriptionStore.default_store_id()) do Logs.LogSubscriptionStore.put(log_subscription, store_id) end + + defp parse_event_signature(signature) when is_bitstring(signature), do: parse_event_signature({signature, []}) + defp parse_event_signature({event_header, address}), do: {event_header, address} + + defp make_log_subscription(%{address: []} = attrs) do + make_log_subscription(%{attrs | address: nil}) + end + defp make_log_subscription(%{address: [address | []]} = attrs) do + make_log_subscription(%{attrs | address: address}) + end + defp make_log_subscription(%{address: [address | rest]} = attrs) do + make_log_subscription(%{attrs | address: address}) ++ make_log_subscription(%{attrs | address: rest}) + end + defp make_log_subscription(attrs) do + [struct!(Logs.LogSubscription, attrs)] + end end diff --git a/mix.exs b/mix.exs old mode 100644 new mode 100755 index 34a7ab1..863d514 --- a/mix.exs +++ b/mix.exs @@ -45,7 +45,7 @@ defmodule Slurp.MixProject do {:telemetry_metrics, "~> 0.4"}, {:telemetry_poller, "~> 0.4"}, {:telemetry_metrics_prometheus, "~> 1.0"}, - {:logger_file_backend, "~> 0.0.10", only: [:dev, :test]}, + {:logger_file_backend, "~> 0.0.10", only: [:dev, :test, :testnet]}, {:dialyxir, "~> 1.0", only: :dev, runtime: false}, {:mix_test_watch, "~> 1.0", only: :dev, runtime: false}, {:ex_unit_notifier, "~> 1.0", only: :test} @@ -64,6 +64,6 @@ defmodule Slurp.MixProject do } end - defp elixirc_paths(env) when env == :test or env == :dev, do: ["lib", "examples"] + defp elixirc_paths(env) when env == :test or env == :dev or env == :testnet, do: ["lib", "examples"] defp elixirc_paths(_), do: ["lib"] end