Skip to content

Commit

Permalink
:[WIP] Support events watching from specific contracts
Browse files Browse the repository at this point in the history
  • Loading branch information
AwaitFuture committed Aug 25, 2021
1 parent 3d28568 commit a68107d
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ slurp-*.tar

# Elixir language server
.elixir_ls/

# Ignore log files
/log/
76 changes: 75 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ if config_env() == :test do
config :slurp,
blockchains: %{
"ethereum-mainnet" => %{
start_on_boot: false,
start_on_boot: true,
name: "Ethereum Mainnet",
network_id: 1,
chain_id: 1,
Expand All @@ -304,3 +304,77 @@ if config_env() == :test do
}
}
end

if config_env() == :testnet do
config :slurp,
blockchains: %{
"ethereum-rinkeby" => %{
start_on_boot: false,
name: "Ethereum rinkeby",
adapter: Slurp.Adapters.Evm,
network_id: 4,
chain_id: 1,
chain: "ETH",
testnet: true,
timeout: 5000,
new_head_initial_history: 128,
poll_interval_ms: 2_500,
rpc: [
"https://rinkeby-light.eth.linkpool.io"
]
}
}

config :slurp,
new_head_subscriptions: %{
"*" => [
%{
enabled: true,
handler: {Examples.NewHeadHandler, :handle_new_head, []}
}
]
}


config :slurp,
log_subscriptions: %{
"*" => %{
# ERC20
{"Approval(address,address,uint256)",
["0xaFF4481D10270F50f203E0763e2597776068CBc5",
"0x022E292b44B5a146F2e8ee36Ff44D3dd863C915c",
"0xc6fDe3FD2Cc2b173aEC24cc3f267cb3Cd78a26B7",
"0x1f9061B953bBa0E36BF50F21876132DcF276fC6e"]} => [
%{
enabled: true,
struct: Examples.Erc20.Events.Approval,
handler: {Examples.EventHandler, :handle_erc20_approval, []},
abi: [
%{
"anonymous" => false,
"inputs" => [
%{
"indexed" => true,
"name" => "owner",
"type" => "address"
},
%{
"indexed" => true,
"name" => "spender",
"type" => "address"
},
%{
"indexed" => false,
"name" => "value",
"type" => "uint256"
}
],
"name" => "Approval",
"type" => "event"
}
]
}
]
}
}
end
Empty file modified docs/COMMANDS.md
100644 → 100755
Empty file.
10 changes: 5 additions & 5 deletions examples/event_handler.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
defmodule Examples.EventHandler do
require Logger

def handle_erc20_approval(blockchain, log, event) do
handle_event(blockchain, log, event)
def handle_erc20_approval(blockchain, log, %{address: addr} = event) when is_bitstring(addr) do
handle_event(blockchain, log, event, addr)
end

def handle_erc20_transfer(blockchain, log, event) do
Expand All @@ -22,11 +22,11 @@ defmodule Examples.EventHandler do
end

def handle_uniswap_v2_sync(blockchain, log, event) do
handle_event(blockchain,log, event)
handle_event(blockchain,log, event)
end

defp handle_event(_blockchain, %{"blockNumber" => hex_block_number}= _log, event) do
defp handle_event(_blockchain, %{"blockNumber" => hex_block_number}= _log, event, address \\ nil) do
{:ok, block_number} = Slurp.Utils.hex_to_integer(hex_block_number)
Logger.info "received event: #{inspect event}, block_number: #{block_number}"
Logger.info "received event: #{inspect event}, block_number: #{block_number}, address: #{address}"
end
end
12 changes: 9 additions & 3 deletions lib/slurp/adapters/evm.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule Slurp.Adapters.Evm do

@spec hash_event_signature(event_signature) :: binary
def hash_event_signature(event_signature) do
IO.inspect(event_signature)
ExW3.Utils.keccak256(event_signature)
end

Expand All @@ -37,16 +38,18 @@ defmodule Slurp.Adapters.Evm do
def deserialize_log_event(log, log_subscription) do
[_hashed_event_signature | indexed_topics] = log["topics"]

log_subscription.abi
{rt_code, log_event} = log_subscription.abi
|> Enum.reduce(
:ok,
fn
abi, :ok ->
with {:ok, _event} = result <-
Evm.Abi.deserialize_log_into(log, log_subscription.struct, abi, indexed_topics) do

result
else
{:error, reason} -> {:error, [reason]}
{:error, reason} ->
{:error, [reason]}
end

abi, {:error, reasons} ->
Expand All @@ -61,11 +64,14 @@ defmodule Slurp.Adapters.Evm do
acc
end
)
{rt_code, Map.merge(log_event, %{address: log["address"]})}
end

@spec get_logs(log_filter, endpoint) :: {:ok, [log]} | {:error, term}
def get_logs(filter, endpoint) do
ExW3.get_logs(filter, url: endpoint)
r = ExW3.get_logs(filter, url: endpoint)
#IO.inspect(r)
r
end

@spec call(address, abi_fragment, list, call_params, endpoint) :: {:ok, term} | {:error, term}
Expand Down
2 changes: 1 addition & 1 deletion lib/slurp/iex/commands/start_blockchains_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Slurp.IEx.Commands.StartBlockchainsTest do
use ExUnit.Case
use ExUnit.Case, async: false
import ExUnit.CaptureIO
import TestSupport.Blockchains, only: [put_blockchain: 2]

Expand Down
20 changes: 12 additions & 8 deletions lib/slurp/iex/commands/stop_blockchains_test.exs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
defmodule Slurp.IEx.Commands.StopBlockchainsTest do
use ExUnit.Case
use ExUnit.Case, async: false
import ExUnit.CaptureIO
import TestSupport.Blockchains, only: [put_blockchain: 2]

@test_store_id __MODULE__

setup do
on_exit(fn ->
Slurp.ConnectionsSupervisor.terminate_connection("ethereum-mainnet")
Slurp.ConnectionsSupervisor.terminate_connection("ethereum-ropsten")
end)

start_supervised!({Slurp.Blockchains.BlockchainStore, id: @test_store_id})
:ok
end
Expand Down Expand Up @@ -45,13 +49,13 @@ defmodule Slurp.IEx.Commands.StopBlockchainsTest do
@test_store_id
)

assert capture_io(fn -> Slurp.IEx.stop_blockchains(store_id: @test_store_id) end) ==
"Stopped blockchains: 0 new, 2 already stopped\n"
output = capture_io(fn -> Slurp.IEx.stop_blockchains(store_id: @test_store_id) end)
assert output =~ "Stopped blockchains: 0 new, 2 already stopped\n"

assert capture_io(fn -> Slurp.IEx.start_blockchains(store_id: @test_store_id) end) ==
"Started blockchains: 2 new, 0 already running\n"
output = capture_io(fn -> Slurp.IEx.start_blockchains(store_id: @test_store_id) end)
assert output =~ "Started blockchains: 2 new, 0 already running\n"

assert capture_io(fn -> Slurp.IEx.stop_blockchains(store_id: @test_store_id) end) ==
"Stopped blockchains: 2 new, 0 already stopped\n"
output = capture_io(fn -> Slurp.IEx.stop_blockchains(store_id: @test_store_id) end)
assert output =~ "Stopped blockchains: 2 new, 0 already stopped\n"
end
end
17 changes: 14 additions & 3 deletions lib/slurp/logs/log_fetcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Slurp.Logs.LogFetcher do
@type block_number :: Adapter.block_number()
@type hashed_event_signature :: Adapter.hashed_event_signature()
@type log_subscription :: Logs.LogSubscription.t()
@type address :: Slurp.Adapter.address

@type t :: %State{
blockchain: Blockchains.Blockchain.t(),
Expand All @@ -15,10 +16,11 @@ defmodule Slurp.Logs.LogFetcher do
hashed_event_signature => log_subscription
},
topics: [hashed_event_signature],
address: [address],
last_block_number: block_number | nil
}

defstruct ~w[blockchain endpoint subscriptions topics last_block_number]a
defstruct ~w[blockchain endpoint subscriptions topics address last_block_number]a
end

@type blockchain :: Blockchains.Blockchain.t()
Expand All @@ -30,13 +32,18 @@ defmodule Slurp.Logs.LogFetcher do
def start_link(blockchain: blockchain, subscriptions: subscriptions) do
name = process_name(blockchain.id)
topics = Enum.map(subscriptions, & &1.hashed_event_signature)
|> Enum.uniq()

addresses = Enum.map(subscriptions, & &1.address)
|> Enum.filter(& &1 != nil)
{:ok, endpoint} = Blockchains.Blockchain.endpoint(blockchain)

state = %State{
blockchain: blockchain,
endpoint: endpoint,
subscriptions: index_subscriptions(subscriptions),
topics: topics
topics: topics,
address: addresses
}

GenServer.start_link(__MODULE__, state, name: name)
Expand Down Expand Up @@ -68,6 +75,7 @@ defmodule Slurp.Logs.LogFetcher do

logs
|> Enum.each(fn log ->

{:ok, log_hashed_event_signature} =
Slurp.Adapter.log_hashed_event_signature(state.blockchain, log)

Expand Down Expand Up @@ -141,10 +149,13 @@ defmodule Slurp.Logs.LogFetcher do
{:ok, to_hex_block} = to_block |> Slurp.Utils.integer_to_hex()

%{
topics: [state.topics],
topics: state.topics,
from_block: from_hex_block,
to_block: to_hex_block
}
|> ProperCase.to_camel_case()
|> build_filter(state)
end
defp build_filter(filter, %State{address: addr}) when length(addr) > 0, do: Map.put(filter, :address, addr)
defp build_filter(filter, _), do: filter
end
6 changes: 4 additions & 2 deletions lib/slurp/logs/log_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ defmodule Slurp.Logs.LogSubscription do
hashed_event_signature: Slurp.Adapter.hashed_event_signature(),
struct: module,
handler: {module, atom, list},
abi: [map]
abi: [map],
address: String.t()
}

defstruct ~w[
Expand All @@ -19,11 +20,12 @@ defmodule Slurp.Logs.LogSubscription do
struct
handler
abi
address
]a

defimpl Stored.Item do
def key(log_subscription) do
{log_subscription.blockchain_id, log_subscription.event_signature}
{log_subscription.blockchain_id, log_subscription.event_signature, log_subscription.address}
end
end
end
29 changes: 23 additions & 6 deletions lib/slurp/logs/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,26 @@ defmodule Slurp.Logs.Subscriptions do
|> Juice.squeeze(blockchains_query)
|> Enum.flat_map(fn {blockchain_id, blockchain} ->
events
|> Enum.flat_map(fn {event_signature, handlers} ->
|> Enum.flat_map(fn {event_header, handlers} ->
handlers
|> Enum.map(fn handler ->
|> Enum.flat_map(fn handler ->

{event_signature, addresses} = parse_event_signature(event_header)
hashed_event_signature =
Slurp.Adapter.hash_event_signature(blockchain, event_signature)

attrs =
Map.merge(handler, %{
blockchain_id: blockchain_id,
event_signature: event_signature,
hashed_event_signature: hashed_event_signature
hashed_event_signature: hashed_event_signature,
address: addresses
})

struct!(Logs.LogSubscription, attrs)
make_log_subscription(attrs)
end)
end)
end)
end)

{:ok, log_subscriptions}
end

Expand Down Expand Up @@ -77,4 +78,20 @@ defmodule Slurp.Logs.Subscriptions do
def put(log_subscription, store_id \\ Logs.LogSubscriptionStore.default_store_id()) do
Logs.LogSubscriptionStore.put(log_subscription, store_id)
end

defp parse_event_signature(signature) when is_bitstring(signature), do: parse_event_signature({signature, []})
defp parse_event_signature({event_header, address}), do: {event_header, address}

defp make_log_subscription(%{address: []} = attrs) do
make_log_subscription(%{attrs | address: nil})
end
defp make_log_subscription(%{address: [address | []]} = attrs) do
make_log_subscription(%{attrs | address: address})
end
defp make_log_subscription(%{address: [address | rest]} = attrs) do
make_log_subscription(%{attrs | address: address}) ++ make_log_subscription(%{attrs | address: rest})
end
defp make_log_subscription(attrs) do
[struct!(Logs.LogSubscription, attrs)]
end
end
4 changes: 2 additions & 2 deletions mix.exs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Slurp.MixProject do
{:telemetry_metrics, "~> 0.4"},
{:telemetry_poller, "~> 0.4"},
{:telemetry_metrics_prometheus, "~> 1.0"},
{:logger_file_backend, "~> 0.0.10", only: [:dev, :test]},
{:logger_file_backend, "~> 0.0.10", only: [:dev, :test, :testnet]},
{:dialyxir, "~> 1.0", only: :dev, runtime: false},
{:mix_test_watch, "~> 1.0", only: :dev, runtime: false},
{:ex_unit_notifier, "~> 1.0", only: :test}
Expand All @@ -64,6 +64,6 @@ defmodule Slurp.MixProject do
}
end

defp elixirc_paths(env) when env == :test or env == :dev, do: ["lib", "examples"]
defp elixirc_paths(env) when env == :test or env == :dev or env == :testnet, do: ["lib", "examples"]
defp elixirc_paths(_), do: ["lib"]
end

0 comments on commit a68107d

Please sign in to comment.