Skip to content

Commit

Permalink
Use sync or async exchange calls in ethereum client
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeutin-ledger committed Mar 28, 2024
1 parent a84d4f5 commit 9d4d306
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 122 deletions.
109 changes: 69 additions & 40 deletions client/src/ledger_app_clients/ethereum/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import rlp
from contextlib import contextmanager
from enum import IntEnum
from ragger.backend import BackendInterface
from ragger.utils import RAPDU
Expand Down Expand Up @@ -40,61 +41,86 @@ def __init__(self, client: BackendInterface):
self._client = client
self._cmd_builder = CommandBuilder()

def _send(self, payload: bytes):
return self._client.exchange_async_raw(payload)
@contextmanager
def _exchange_async(self, payload: bytes):
with self._client.exchange_async_raw(payload) as response:
yield response

def _exchange(self, payload: bytes):
return self._client.exchange_raw(payload)

def response(self) -> Optional[RAPDU]:
return self._client.last_async_response

@contextmanager
def eip712_send_struct_def_struct_name(self, name: str):
return self._send(self._cmd_builder.eip712_send_struct_def_struct_name(name))
with self._exchange_async(self._cmd_builder.eip712_send_struct_def_struct_name(name)) as response:
yield response

@contextmanager
def eip712_send_struct_def_struct_field(self,
field_type: EIP712FieldType,
type_name: str,
type_size: int,
array_levels: list,
key_name: str):
return self._send(self._cmd_builder.eip712_send_struct_def_struct_field(
with self._exchange_async(self._cmd_builder.eip712_send_struct_def_struct_field(
field_type,
type_name,
type_size,
array_levels,
key_name))
key_name)) as response:
yield response

@contextmanager
def eip712_send_struct_impl_root_struct(self, name: str):
return self._send(self._cmd_builder.eip712_send_struct_impl_root_struct(name))
with self._exchange_async(self._cmd_builder.eip712_send_struct_impl_root_struct(name)) as response:
yield response

@contextmanager
def eip712_send_struct_impl_array(self, size: int):
return self._send(self._cmd_builder.eip712_send_struct_impl_array(size))
with self._exchange_async(self._cmd_builder.eip712_send_struct_impl_array(size)) as response:
yield response

@contextmanager
def eip712_send_struct_impl_struct_field(self, raw_value: bytes):
chunks = self._cmd_builder.eip712_send_struct_impl_struct_field(bytearray(raw_value))
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])
self._exchange(chunk)
with self._exchange_async(chunks[-1]) as response:
yield response

@contextmanager
def eip712_sign_new(self, bip32_path: str):
return self._send(self._cmd_builder.eip712_sign_new(bip32_path))
with self._exchange_async(self._cmd_builder.eip712_sign_new(bip32_path)) as response:
yield response

@contextmanager
def eip712_sign_legacy(self,
bip32_path: str,
domain_hash: bytes,
message_hash: bytes):
return self._send(self._cmd_builder.eip712_sign_legacy(bip32_path,
domain_hash,
message_hash))
with self._exchange_async(self._cmd_builder.eip712_sign_legacy(bip32_path,
domain_hash,
message_hash)) as response:
yield response

@contextmanager
def eip712_filtering_activate(self):
return self._send(self._cmd_builder.eip712_filtering_activate())
with self._exchange_async(self._cmd_builder.eip712_filtering_activate()) as response:
yield response

@contextmanager
def eip712_filtering_message_info(self, name: str, filters_count: int, sig: bytes):
return self._send(self._cmd_builder.eip712_filtering_message_info(name, filters_count, sig))
with self._exchange_async(self._cmd_builder.eip712_filtering_message_info(name, filters_count, sig)) as response:
yield response

@contextmanager
def eip712_filtering_show_field(self, name: str, sig: bytes):
return self._send(self._cmd_builder.eip712_filtering_show_field(name, sig))
with self._exchange_async(self._cmd_builder.eip712_filtering_show_field(name, sig)) as response:
yield response

@contextmanager
def sign(self,
bip32_path: str,
tx_params: dict):
Expand All @@ -111,24 +137,26 @@ def sign(self,
tx = prefix + rlp.encode(decoded + suffix)
chunks = self._cmd_builder.sign(bip32_path, tx, suffix)
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])
self._exchange(chunk)
with self._exchange_async(chunks[-1]) as response:
yield response

def get_challenge(self):
return self._send(self._cmd_builder.get_challenge())
return self._exchange(self._cmd_builder.get_challenge())

@contextmanager
def get_public_addr(self,
display: bool = True,
chaincode: bool = False,
bip32_path: str = "m/44'/60'/0'/0/0",
chain_id: Optional[int] = None):
return self._send(self._cmd_builder.get_public_addr(display,
chaincode,
bip32_path,
chain_id))
chain_id: Optional[int] = None) -> RAPDU:
with self._exchange_async(self._cmd_builder.get_public_addr(display,
chaincode,
bip32_path,
chain_id)) as response:
yield response

def provide_domain_name(self, challenge: int, name: str, addr: bytes):
def provide_domain_name(self, challenge: int, name: str, addr: bytes) -> RAPDU:
payload = format_tlv(DomainNameTag.STRUCTURE_TYPE, 3) # TrustedDomainName
payload += format_tlv(DomainNameTag.STRUCTURE_VERSION, 1)
payload += format_tlv(DomainNameTag.SIGNER_KEY_ID, 0) # test key
Expand All @@ -142,9 +170,9 @@ def provide_domain_name(self, challenge: int, name: str, addr: bytes):

chunks = self._cmd_builder.provide_domain_name(payload)
for chunk in chunks[:-1]:
with self._send(chunk):
with self._exchange(chunk):
pass
return self._send(chunks[-1])
return self._exchange(chunks[-1])

def set_plugin(self,
plugin_name: str,
Expand All @@ -155,7 +183,7 @@ def set_plugin(self,
version: int = 1,
key_id: int = 2,
algo_id: int = 1,
sig: Optional[bytes] = None):
sig: Optional[bytes] = None) -> RAPDU:
if sig is None:
# Temporarily get a command with an empty signature to extract the payload and
# compute the signature on it
Expand All @@ -170,7 +198,7 @@ def set_plugin(self,
bytes())
# skip APDU header & empty sig
sig = sign_data(Key.SET_PLUGIN, tmp[5:-1])
return self._send(self._cmd_builder.set_plugin(type_,
return self._exchange(self._cmd_builder.set_plugin(type_,
version,
plugin_name,
contract_addr,
Expand All @@ -188,7 +216,7 @@ def provide_nft_metadata(self,
version: int = 1,
key_id: int = 1,
algo_id: int = 1,
sig: Optional[bytes] = None):
sig: Optional[bytes] = None) -> RAPDU:
if sig is None:
# Temporarily get a command with an empty signature to extract the payload and
# compute the signature on it
Expand All @@ -202,7 +230,7 @@ def provide_nft_metadata(self,
bytes())
# skip APDU header & empty sig
sig = sign_data(Key.NFT, tmp[5:-1])
return self._send(self._cmd_builder.provide_nft_information(type_,
return self._exchange(self._cmd_builder.provide_nft_information(type_,
version,
collection,
addr,
Expand All @@ -215,29 +243,30 @@ def set_external_plugin(self,
plugin_name: str,
contract_address: bytes,
method_selelector: bytes,
sig: Optional[bytes] = None):
sig: Optional[bytes] = None) -> RAPDU:
if sig is None:
# Temporarily get a command with an empty signature to extract the payload and
# compute the signature on it
tmp = self._cmd_builder.set_external_plugin(plugin_name, contract_address, method_selelector, bytes())

# skip APDU header & empty sig
sig = sign_data(Key.CAL, tmp[5:])
return self._send(self._cmd_builder.set_external_plugin(plugin_name, contract_address, method_selelector, sig))
return self._exchange(self._cmd_builder.set_external_plugin(plugin_name, contract_address, method_selelector, sig))

@contextmanager
def personal_sign(self, path: str, msg: bytes):
chunks = self._cmd_builder.personal_sign(path, msg)
for chunk in chunks[:-1]:
with self._send(chunk):
pass
return self._send(chunks[-1])
self._exchange(chunk)
with self._exchange_async(chunks[-1]) as response:
yield response

def provide_token_metadata(self,
ticker: str,
addr: bytes,
decimals: int,
chain_id: int,
sig: Optional[bytes] = None):
sig: Optional[bytes] = None) -> RAPDU:
if sig is None:
# Temporarily get a command with an empty signature to extract the payload and
# compute the signature on it
Expand All @@ -248,7 +277,7 @@ def provide_token_metadata(self,
bytes())
# skip APDU header & empty sig
sig = sign_data(Key.CAL, tmp[6:])
return self._send(self._cmd_builder.provide_erc20_token_information(ticker,
return self._exchange(self._cmd_builder.provide_erc20_token_information(ticker,
addr,
decimals,
chain_id,
Expand Down
10 changes: 4 additions & 6 deletions tests/ragger/test_blind_sign.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
import pytest
from ragger.backend import BackendInterface
from ragger.firmware import Firmware
from ragger.navigator import Navigator, NavInsID
from ragger.error import ExceptionRAPDU
from ledger_app_clients.ethereum.client import EthAppClient
from ledger_app_clients.ethereum.client import EthAppClient, StatusWord
from web3 import Web3
from constants import ROOT_SNAPSHOT_PATH, ABIS_FOLDER

Expand Down Expand Up @@ -35,13 +36,10 @@ def test_blind_sign(firmware: Firmware,
"data": data,
"chainId": 1
}
try:
with pytest.raises(ExceptionRAPDU) as e:
with app_client.sign("m/44'/60'/0'/0/0", tx_params):
pass
except ExceptionRAPDU:
pass
else:
assert False
assert e.value.status == StatusWord.INVALID_DATA

moves = list()
if firmware.device.startswith("nano"):
Expand Down
Loading

0 comments on commit 9d4d306

Please sign in to comment.