Skip to content

Commit

Permalink
Merge pull request #122 from covalenthq/develop
Browse files Browse the repository at this point in the history
Rudder-DTM-RC:v0.2.8
  • Loading branch information
noslav authored May 3, 2023
2 parents 134da80 + 5540091 commit 1a86045
Show file tree
Hide file tree
Showing 26 changed files with 419 additions and 186 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,9 @@
- [Bugs Reporting & Contributions](#bugs-reporting-contributions)
- [Scripts](#scripts)


## <span id="rudder_intro">Introduction</span>

The Refiner is a block specimen data processing and transformation framework (Rudder), the purpose of which is validated data transformation.
The Refiner is a block specimen data processing and transformation framework (Rudder), the purpose of which is validated data transformation.

Generally, the Refiner has the capability to perform arbitrary transformations over any binary block specimen file, concurrently with other transformations. This enables simultaneous data indexing, with any consumer of the data slicing and dicing the data as they see fit. Such concurrent execution of ethereum blocks (via block specimens), makes it possible to trace, enrich or analyze blockchain data at an unprecedented rate with no sequential bottlenecks (provided each block specimen is its own independent entity and available at a decentralized content address!).

Expand All @@ -76,7 +75,7 @@ At a very high level, the Refiner locates a source to apply a transformational r

## <span id="rudder_arch">Architecture</span>

![Rudder Pipeline](./docs/pipeline.png)
![Rudder Pipeline](./docs/pipeline.jpg)

The happy path for `rudder` (the refiner) application in the Covalent Network is made up of actor processes spawned through many [Gen Servers](https://elixir-lang.org/getting-started/mix-otp/genserver.html) processes that are loosely coupled, here some maintain state and some don't. The children processes can be called upon to fulfill responsibilities at different sections in the refinement/transformation process pipeline - under one umbrella [Dynamic Supervisor](https://elixir-lang.org/getting-started/mix-otp/dynamic-supervisor.html), that can bring them back up in case of a failure to continue a given pipeline operation. Read more about the components and their operations in the [full architecture document](./docs/ARCH.md).

Expand Down
29 changes: 20 additions & 9 deletions docker-compose-mbase.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
version: '3'
# runs the entire rudder pipeline with all supporting services (including rudder) in docker
# set .env such that all services in docker are talking to each other only
services:
ipfs-pinner:
image: "us-docker.pkg.dev/covalent-project/network/ipfs-pinner:stable"
volumes:
- ~/.ipfs:/root/.ipfs/
container_name: ipfs-pinner
restart: on-failure
restart: always
labels:
"autoheal": "true"
expose:
- "4001:4001"
- "3000:3000"
Expand All @@ -22,7 +21,9 @@ services:
evm-server:
image: "us-docker.pkg.dev/covalent-project/network/evm-server:stable"
container_name: evm-server
restart: on-failure
restart: always
labels:
"autoheal": "true"
expose:
- "3002:3002"
networks:
Expand All @@ -35,15 +36,16 @@ services:
container_name: rudder
links:
- "ipfs-pinner:ipfs-pinner"
- "evm-server:evm-server"
# build:
# context: .
# dockerfile: Dockerfile
restart: on-failure
restart: always
depends_on:
ipfs-pinner:
condition: service_started
condition: service_healthy
evm-server:
condition: service_started
condition: service_healthy
entrypoint: >
/bin/bash -l -c "
echo "moonbase-node:" $NODE_ETHEREUM_MAINNET;
Expand All @@ -61,5 +63,14 @@ services:
networks:
- cqt-net

autoheal:
image: willfarrell/autoheal
container_name: autoheal
volumes:
- '/var/run/docker.sock:/var/run/docker.sock'
environment:
- AUTOHEAL_INTERVAL=10
- CURL_TIMEOUT=30

networks:
cqt-net:
cqt-net:
24 changes: 19 additions & 5 deletions docker-compose-mbeam.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ services:
volumes:
- ~/.ipfs:/root/.ipfs/
container_name: ipfs-pinner
restart: on-failure
restart: always
labels:
"autoheal": "true"
expose:
- "4001:4001"
- "3000:3000"
Expand All @@ -22,7 +24,9 @@ services:
evm-server:
image: "us-docker.pkg.dev/covalent-project/network/evm-server:stable"
container_name: evm-server
restart: on-failure
restart: always
labels:
"autoheal": "true"
expose:
- "3002:3002"
networks:
Expand All @@ -35,15 +39,16 @@ services:
container_name: rudder
links:
- "ipfs-pinner:ipfs-pinner"
- "evm-server:evm-server"
# build:
# context: .
# dockerfile: Dockerfile
restart: on-failure
restart: always
depends_on:
ipfs-pinner:
condition: service_started
condition: service_healthy
evm-server:
condition: service_started
condition: service_healthy
entrypoint: >
/bin/bash -l -c "
echo "moonbase-node:" $NODE_ETHEREUM_MAINNET;
Expand All @@ -61,5 +66,14 @@ services:
networks:
- cqt-net

autoheal:
image: willfarrell/autoheal
container_name: autoheal
volumes:
- '/var/run/docker.sock:/var/run/docker.sock'
environment:
- AUTOHEAL_INTERVAL=10
- CURL_TIMEOUT=30

networks:
cqt-net:
2 changes: 1 addition & 1 deletion docs/ARCH.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
- [Pipeline Journal](#pipeline-journal)
- [Pipeline Telemetry](#pipeline-telemetry)

![Rudder Pipeline](./pipeline.png)
![Rudder Pipeline](./pipeline.jpg)

The happy path for `rudder` (the refiner) application in the Covalent Network is made up of actor processes spawned through many [Gen Servers](https://elixir-lang.org/getting-started/mix-otp/genserver.html) processes that are loosely coupled, here some maintain state and some don't.

Expand Down
Binary file removed docs/arch.png
Binary file not shown.
Binary file removed docs/covalent.jpg
Binary file not shown.
Binary file added docs/pipeline.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed docs/pipeline.png
Binary file not shown.
Binary file removed docs/refiner.png
Binary file not shown.
Binary file removed docs/roadmap.png
Binary file not shown.
7 changes: 6 additions & 1 deletion lib/rudder/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ defmodule Rudder.Application do

# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
options = [strategy: :one_for_one, name: Rudder.Supervisor]
options = [
strategy: :one_for_one,
name: Rudder.Supervisor,
max_restarts: 3,
max_seconds: 1200
]

Supervisor.start_link(children, options)
end
Expand Down
6 changes: 5 additions & 1 deletion lib/rudder/evm/block_processor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ defmodule Rudder.BlockProcessor do
{:reply, {:error, errormsg}, state}
end

{:error, %Mint.TransportError{reason: reason}} when reason in [:econnrefused, :nxdomain] ->
raise "#{inspect(reason)}: is evm-server up?"

{:error, errormsg} ->
Logger.error("error in blockprocessor: #{inspect(errormsg)}")
{:reply, {:error, errormsg}, state}
end
end
Expand Down Expand Up @@ -74,6 +78,6 @@ defmodule Rudder.BlockProcessor do

@impl true
def terminate(reason, _state) do
Logger.info("terminating blockprocessor: #{reason}")
Logger.info("terminating blockprocessor: #{inspect(reason)}")
end
end
54 changes: 36 additions & 18 deletions lib/rudder/ipfs/ipfs_interactor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,27 @@ defmodule Rudder.IPFSInteractor do
content_type = Multipart.content_type(multipart, "multipart/form-data")
headers = [{"Content-Type", content_type}, {"Content-Length", to_string(content_length)}]

{:ok, %Finch.Response{body: body, headers: _, status: _}} =
resp =
Finch.build("POST", url, headers, {:stream, body_stream})
|> Finch.request(Rudder.Finch)

body_map = body |> Poison.decode!()
case resp do
{:ok, %Finch.Response{body: body, headers: _, status: _}} ->
body_map = body |> Poison.decode!()

end_pin_ms = System.monotonic_time(:millisecond)
Events.ipfs_pin(end_pin_ms - start_pin_ms)
end_pin_ms = System.monotonic_time(:millisecond)
Events.ipfs_pin(end_pin_ms - start_pin_ms)

case body_map do
%{"error" => error} -> {:reply, {:error, error}, state}
%{"cid" => cid} -> {:reply, {:ok, cid}, state}
case body_map do
%{"error" => error} -> {:reply, {:error, error}, state}
%{"cid" => cid} -> {:reply, {:ok, cid}, state}
end

{:error, %Mint.TransportError{reason: :econnrefused}} ->
raise "connection refused: is ipfs-pinner started?"

{:error, err} ->
{:reply, {:error, err}, state}
end
end

Expand All @@ -56,22 +65,31 @@ defmodule Rudder.IPFSInteractor do

ipfs_url = Application.get_env(:rudder, :ipfs_pinner_url)

{:ok, %Finch.Response{body: body, headers: _, status: _}} =
resp =
Finch.build(:get, "#{ipfs_url}/get?cid=#{cid}")
|> Finch.request(Rudder.Finch, receive_timeout: 150_000_000, pool_timeout: 150_000_000)

end_fetch_ms = System.monotonic_time(:millisecond)
Events.ipfs_fetch(end_fetch_ms - start_fetch_ms)
case resp do
{:ok, %Finch.Response{body: body, headers: _, status: _}} ->
end_fetch_ms = System.monotonic_time(:millisecond)
Events.ipfs_fetch(end_fetch_ms - start_fetch_ms)

try do
body_map = body |> Poison.decode!()

case body_map do
%{"error" => error} -> {:reply, {:error, error}, state}
_ -> {:reply, {:ok, body}, state}
end
rescue
_ -> {:reply, {:ok, body}, state}
end

try do
body_map = body |> Poison.decode!()
{:error, %Mint.TransportError{reason: reason}} when reason in [:econnrefused, :nxdomain] ->
raise "#{inspect(reason)}: is ipfs-pinner up?"

case body_map do
%{"error" => error} -> {:reply, {:error, error}, state}
_ -> {:reply, {:ok, body}, state}
end
rescue
_ -> {:reply, {:ok, body}, state}
{:error, err} ->
{:reply, {:error, err}, state}
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/rudder/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Rudder.Pipeline do
start_pipeline_ms = System.monotonic_time(:millisecond)

try do
with [_chain_id, block_height, _block_hash, specimen_hash] <- String.split(bsp_key, "_"),
with [_chain_id, _block_height, _block_hash, specimen_hash] <- String.split(bsp_key, "_"),
{:ok, specimen} <- Rudder.IPFSInteractor.discover_block_specimen(urls),
{:ok, decoded_specimen} <- Rudder.Avro.BlockSpecimen.decode(specimen),
{:ok, block_specimen} <- extract_block_specimen(decoded_specimen),
Expand Down
8 changes: 5 additions & 3 deletions lib/rudder/proof_chain/block_specimen_event_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ defmodule Rudder.ProofChain.BlockSpecimenEventListener do
Rudder.Network.EthereumMainnet.eth_getLogs([
%{
address: proofchain_address,
fromBlock: block_height,
toBlock: block_height,
fromBlock: "0x" <> Integer.to_string(block_height, 16),
toBlock: "0x" <> Integer.to_string(block_height, 16),
topics: [@bsp_awarded_event_hash]
}
])
Expand All @@ -123,12 +123,14 @@ defmodule Rudder.ProofChain.BlockSpecimenEventListener do

defp loop(curr_block_height) do
{:ok, latest_block_number} = Rudder.Network.EthereumMainnet.eth_blockNumber()
Logger.info("curr_block: #{curr_block_height} and latest_block_num:#{latest_block_number}")

if curr_block_height > latest_block_number do
Logger.info("synced to latest; waiting for #{curr_block_height} to be mined")
# ~12 seconds is mining time of one moonbeam block
:timer.sleep(12_000)
loop(curr_block_height)
else
Logger.info("curr_block: #{curr_block_height} and latest_block_num:#{latest_block_number}")
end
end
end
Loading

0 comments on commit 1a86045

Please sign in to comment.