Skip to content

Commit

Permalink
Added 'new_keypair' to PysuiConfiguration'
Browse files Browse the repository at this point in the history
  • Loading branch information
FrankC01 committed Jul 15, 2024
1 parent ed81ccc commit e893f2e
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 85 deletions.
74 changes: 67 additions & 7 deletions pysui/sui/sui_pgql/config/confgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

"""Sui Configuration Group."""

import base64
import hashlib
import dataclasses
from typing import Optional, Union
import dataclasses_json
from pysui.sui.sui_crypto import keypair_from_keystring, SuiKeyPair
import pysui.sui.sui_crypto as crypto


@dataclasses.dataclass
Expand Down Expand Up @@ -64,6 +66,13 @@ def _address_exists(self, *, address: str) -> Union[str, bool]:
"""Check if address is valid."""
return next(filter(lambda addy: addy == address, self.address_list), False)

def _key_exists(self, *, key_string: str) -> Union[ProfileKey, bool]:
"""Check if key string exists."""
return next(
filter(lambda pkey: pkey.private_key_base64 == key_string, self.key_list),
False,
)

@property
def active_address(self) -> str:
"""Return the active address."""
Expand Down Expand Up @@ -93,30 +102,42 @@ def active_alias(self, change_to: str) -> str:
return _res.alias
raise ValueError(f"Alias {change_to} not found in group")

def address_for_alias(self, alias: str) -> str:
def address_for_alias(self, *, alias: str) -> str:
"""Get address associated with alias."""
_res = self._alias_exists(alias_name=alias)
if _res:
aliindx = self.alias_list.index(_res)
return self.address_list[aliindx]
raise ValueError(f"Alias {alias} not found in group")

def alias_for_address(self, address: str) -> ProfileAlias:
def alias_for_address(self, *, address: str) -> ProfileAlias:
"""Get alias associated with address."""
_res = self._address_exists(address=address)
if _res:
adindex = self.address_list.index(_res)
return self.alias_list[adindex]
raise ValueError(f"Address {address} not found in group")

def alias_name_for_address(self, address: str) -> str:
def alias_name_for_address(self, *, address: str) -> str:
"""Get alias associated with address."""
_res = self._address_exists(address=address)
if _res:
adindex = self.address_list.index(_res)
return self.alias_list[adindex].alias
raise ValueError(f"Address {address} not found in group")

def replace_alias_name(self, *, from_alias: str, to_alias: str) -> str:
"""Replace alias name and return associated address."""
_res = self._alias_exists(alias_name=from_alias)
if _res:
_rese = self._alias_exists(alias_name=to_alias)
if not _rese:
aliindx = self.alias_list.index(_res)
_res.alias = to_alias
return self.address_list[aliindx]
raise ValueError(f"Alias {to_alias} already exists")
raise ValueError(f"Alias {from_alias} not found in group")

@property
def active_profile(self) -> Profile:
"""Gets the active profile."""
Expand All @@ -135,14 +156,53 @@ def active_profile(self, change_to: str) -> Profile:
return _res
raise ValueError(f"{change_to} profile does not exist")

def address_keypair(self, address: str) -> SuiKeyPair:
"""."""
def address_keypair(self, *, address: str) -> crypto.SuiKeyPair:
"""Fetch an addresses KeyPair."""
_res = self._address_exists(address=address)
if _res:
return keypair_from_keystring(
return crypto.keypair_from_keystring(
self.key_list[self.address_list.index(_res)].private_key_base64
)

def add_keypair_and_parts(
self,
*,
new_alias: str,
new_keypair: crypto.SuiKeyPair,
make_active: Optional[bool] = False,
) -> str:
"""Add a new keypair with associated address and alias."""
_new_keystr = new_keypair.serialize()
if not self._key_exists(key_string=_new_keystr):
if self._alias_exists(alias_name=new_alias):
raise ValueError(
f"Alias {new_alias} already exist attempting new key and address."
)
# ProfileKey Entry
_prvk_entry = ProfileKey(_new_keystr)
# Generate artifacts
_pkey_bytes = new_keypair.to_bytes()
_digest = _pkey_bytes[0:33] if _pkey_bytes[0] == 0 else _pkey_bytes[0:34]
# ProfileAlias Entry
_alias_entry = ProfileAlias(
new_alias,
base64.b64encode(new_keypair.public_key.scheme_and_key()).decode(),
)
_new_addy = format(
f"0x{hashlib.blake2b(_digest, digest_size=32).hexdigest()}"
)
# Populate group
self.address_list.append(_new_addy)
self.key_list.append(_prvk_entry)
self.alias_list.append(_alias_entry)

if make_active:
self.using_address = _new_addy
return _new_addy
raise ValueError(
f"Private keystring {_new_keystr} already exists attempting new key and address.."
)

def add_profile(self, *, new_prf: Profile, make_active: bool = False):
"""Add profile to list after validating name"""
_res = self._profile_exists(profile_name=new_prf.profile_name)
Expand Down
2 changes: 1 addition & 1 deletion pysui/sui/sui_pgql/config/confmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def remove_group(self, *, group_name: str) -> None:
self.group_active = self.groups[0].group_name

def get_group(self, *, group_name: str) -> prfgrp.ProfileGroup:
"""Test for group existence."""
"""Get a group or throw exception if doesn't exist."""
_res = self._group_exists(group_name=group_name)
if not _res:
raise ValueError(f"{group_name} does not exist.")
Expand Down
163 changes: 155 additions & 8 deletions pysui/sui/sui_pgql/config/pysui_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
from pathlib import Path
from typing import Optional

import pysui.sui.sui_crypto as crypto
import pysui.sui.sui_utils as utils

from pysui.abstracts.client_keypair import SignatureScheme
from pysui.sui.sui_constants import SUI_MAX_ALIAS_LEN, SUI_MIN_ALIAS_LEN

from pysui.sui.sui_pgql.config.confgroup import ProfileGroup
from pysui.sui.sui_pgql.config.confmodel import PysuiConfigModel


Expand Down Expand Up @@ -53,6 +60,10 @@ def __init__(
_bcfg = Path("~/.cargo/bin/sui").expanduser()
# Initialize from sui config if found
self.refresh_legacy(replace=False)
# Set if not exist, don't overwrite
self.model.add_group(
group=ProfileGroup("user", "", "", [], [], [], []), make_active=False
)
if self._model.initialize_gql_rpc(
sui_binary=_bcfg,
gql_rpc_group_name=self.SUI_GQL_RPC_GROUP,
Expand Down Expand Up @@ -105,42 +116,72 @@ def model(self) -> PysuiConfigModel:
"""."""
return self._model

@property
def active_group(self) -> ProfileGroup:
"""Return the active group."""
return self._model.active_group

@property
def active_address(self) -> str:
"""Returns the active groups active address."""
return self._model.active_group.using_address
return self.active_group.using_address

@property
def active_address_alias(self) -> str:
"""Returns the active groups active address."""
return self._model.active_group.active_alias
return self.active_group.active_alias

@property
def active_env(self) -> str:
"""Returns the active groups active profile name."""
return self._model.active_group.using_profile
return self.active_group.using_profile

@property
def url(self) -> str:
"""Returns the active groups active profile url."""
return self._model.active_group.active_profile.url
return self.active_group.active_profile.url

@property
def faucet_url(self) -> str:
"""Returns the active groups active profile faucet url."""
return self._model.active_group.active_profile.faucet_urls
return self.active_group.active_profile.faucet_urls

@property
def faucet_status_url(self) -> str:
"""Returns the active groups active profile faucet status url."""
return self._model.active_group.active_profile.faucet_status_url
return self.active_group.active_profile.faucet_status_url

@property
def config_path(self) -> str:
"""Return configuration breadcrump path."""
_group = self._model.active_group
_group = self.active_group
return f"{_group.group_name}.{_group.active_profile.profile_name}"

def address_for_alias(self, *, alias_name: str) -> str:
"""Return the address for the alias name in current group."""
return self.active_group.address_for_alias(alias=alias_name)

def alias_for_address(self, *, address: str) -> str:
"""Return the alias for the address in current group."""
return self.active_group.alias_for_address(address=address)

def rename_alias(
self,
*,
existing_alias: str,
new_alias: str,
in_group: Optional[str] = None,
persist: Optional[bool] = False,
) -> str:
"""Rename an alias in a group, default to active_group."""
_group = self.active_group
if in_group and in_group != _group.group_name:
_group = self._model.get_group(group_name=in_group)

_res = _group.replace_alias_name(from_alias=existing_alias, to_alias=new_alias)
if _res and persist:
self._write_model()

def make_active(
self,
*,
Expand All @@ -163,7 +204,7 @@ def make_active(
else:
raise ValueError(f"{group_name} does not exist")
else:
_group = self._model.active_group
_group = self.active_group

# Change profile, checks if it exists
if profile_name and _group.using_profile != profile_name:
Expand All @@ -181,3 +222,109 @@ def make_active(

if _changes and persist:
self._write_model()

def _alias_check_or_gen(
self,
*,
aliases: Optional[list[str]] = None,
word_counts: Optional[int] = 12,
alias: Optional[str] = None,
current_iter: Optional[int] = 0,
) -> str:
"""_alias_check_or_gen If alias is provided, checks if unique otherwise creates one or more.
:param aliases: List of existing aliases, defaults to None
:type aliases: list[str], optional
:param word_counts: Words count used for mnemonic phrase, defaults to 12
:type word_counts: Optional[int], optional
:param alias: An inbound alias, defaults to None
:type alias: Optional[str], optional
:param current_iter: Internal recursion count, defaults to 0
:type current_iter: Optional[int], optional
:return: An aliases
:rtype: str
"""
if not alias:
parts = list(
utils.partition(
crypto.gen_mnemonic_phrase(word_counts).split(" "),
int(word_counts / 2),
)
)
alias_names = [k + "-" + v for k, v in zip(*parts)]

# alias_names = self._alias_gen_batch(word_counts=word_counts)
# Find a unique part if just_one
if not aliases:
alias = alias_names[0]
else:
for alias_name in alias_names:
if alias_name not in aliases:
# Found one
alias = alias_name
break
# If all match (unlikely), try unless threshold
if not alias:
if current_iter > 2:
raise ValueError("Unable to find unique alias")
else:
alias = self._alias_check_or_gen(
aliases=aliases,
word_counts=word_counts,
current_iter=current_iter + 1,
)
else:
if alias in aliases:
raise ValueError(f"Alias {alias} already exists.")
if not (SUI_MIN_ALIAS_LEN <= len(alias) <= SUI_MAX_ALIAS_LEN):
raise ValueError(
f"Invalid alias string length, must be betwee {SUI_MIN_ALIAS_LEN} and {SUI_MAX_ALIAS_LEN} characters."
)
return alias

def new_keypair(
self,
*,
of_keytype: SignatureScheme,
in_group: Optional[str] = None,
word_counts: Optional[int] = 12,
derivation_path: Optional[str] = None,
make_active: Optional[bool] = False,
alias: Optional[str] = None,
persist: Optional[bool] = True,
) -> tuple[str, str]:
"""Creates a new keypair returning generated passphrase and associated address."""
# Resolve group
_group = self.active_group
if in_group and in_group != _group.group_name:
_group = self._model.get_group(group_name=in_group)

match of_keytype:
case (
SignatureScheme.ED25519
| SignatureScheme.SECP256K1
| SignatureScheme.SECP256R1
):
# First, early check alias given or create new one
_aliases = [x.alias for x in _group.alias_list]
alias = self._alias_check_or_gen(
aliases=_aliases, alias=alias, word_counts=word_counts
)
# Generate the new key and address
mnem, keypair = crypto.create_new_keypair(
scheme=of_keytype,
word_counts=word_counts,
derv_path=derivation_path,
)
new_addy = _group.add_keypair_and_parts(
new_alias=alias, new_keypair=keypair, make_active=make_active
)
if make_active and _group.group_name != self.active_group.group_name:
self._model.active_group = _group.group_name
if persist:
self._write_model()
return mnem, new_addy
case _:
raise NotImplementedError(
f"{of_keytype}: Not recognized as valid keypair scheme."
)
2 changes: 1 addition & 1 deletion pysui/sui/sui_pgql/pgql_txb_signing.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def get_signatures(self, *, config: PysuiConfiguration, tx_bytes: str) -> list[s
if isinstance(signer, str):

sig_list.append(
config.model.active_group.address_keypair(signer).new_sign_secure(
config.active_group.address_keypair(address=signer).new_sign_secure(
tx_bytes
)
)
Expand Down
3 changes: 1 addition & 2 deletions pysui/sui/sui_pgql/pgql_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ def check_owner(
inlen = len(owner)
assert isinstance(owner, str), "Owner should be str"
try:
return config.model.active_group.address_for_alias(owner)
# return config.addr4al(owner).address
return config.address_for_alias(alias_name=owner)
except ValueError:
pass
if inlen < 3 or inlen > SUI_HEX_ADDRESS_STRING_LEN:
Expand Down
Loading

0 comments on commit e893f2e

Please sign in to comment.