-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
330 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
#!/usr/bin/env python3 | ||
import json | ||
import sys | ||
import urllib.parse | ||
import requests | ||
from typing import Optional | ||
import cryptography.hazmat.primitives.asymmetric.utils as crypto | ||
from cryptography.hazmat.primitives.asymmetric import ec | ||
from cryptography.hazmat.primitives import hashes | ||
import hashlib | ||
|
||
ENDPOINT = "https://themis.mainnet.desmos.network/instagram" | ||
HEADERS = {"Content-Type": "application/json"} | ||
|
||
|
||
class CallData: | ||
""" | ||
Contains the data that has been used to call the script | ||
""" | ||
|
||
def __init__(self, username: str): | ||
self.username = username | ||
|
||
|
||
class VerificationData: | ||
""" | ||
Contains the data needed to verify the proof submitted by the user. | ||
""" | ||
|
||
def __init__(self, address: str, pub_key: str, value: str, signature: str): | ||
self.address = address | ||
self.pub_key = pub_key | ||
self.signature = signature | ||
self.value = value | ||
|
||
|
||
def get_user_data(data: CallData) -> Optional[VerificationData]: | ||
""" | ||
Tries getting the verification data for the user having the given Instagram username. | ||
:param data: Data used to get the VerificationData | ||
:return: An OptionalData object if the call was successful, or None if it errored somehow. | ||
""" | ||
try: | ||
url_encoded_username = urllib.parse.quote(data.username) | ||
result = requests.request("GET", f"{ENDPOINT}/{url_encoded_username}", headers=HEADERS).json() | ||
if validate_json(result): | ||
return VerificationData( | ||
result['address'], | ||
result['pub_key'], | ||
result['value'], | ||
result['signature'], | ||
) | ||
else: | ||
return None | ||
except ValueError: | ||
return None | ||
|
||
|
||
def validate_json(json: dict) -> bool: | ||
""" | ||
Tells whether or not the given JSON is a valid signature JSON object. | ||
:param json: JSON object to be checked. | ||
:return: True if the provided JSON has a valid signature schema, or False otherwise. | ||
""" | ||
return all(key in json for key in ['value', 'pub_key', 'signature', 'address']) | ||
|
||
|
||
def verify_signature(data: VerificationData) -> bool: | ||
""" | ||
Verifies the signature using the given pubkey and value. | ||
:param data: Data used to verify the signature. | ||
:return True if the signature is valid, False otherwise | ||
""" | ||
if len(data.signature) != 128: | ||
return False | ||
|
||
try: | ||
# Create signature for dss signature | ||
(r, s) = int(data.signature[:64], 16), int(data.signature[64:], 16) | ||
sig = crypto.encode_dss_signature(r, s) | ||
|
||
# Create public key instance | ||
public_key = ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256K1(), bytes.fromhex(data.pub_key)) | ||
|
||
# Verify the signature | ||
public_key.verify(sig, bytes.fromhex(data.value), ec.ECDSA(hashes.SHA256())) | ||
return True | ||
except Exception: | ||
return False | ||
|
||
|
||
def verify_address(data: VerificationData) -> bool: | ||
""" | ||
Verifies that the given address is the one associated with the provided HEX encoded compact public key. | ||
:param data: Data used to verify the address | ||
""" | ||
s = hashlib.new("sha256", bytes.fromhex(data.pub_key)).digest() | ||
r = hashlib.new("ripemd160", s).digest() | ||
return data.address.upper() == r.hex().upper() | ||
|
||
|
||
def check_values(values: dict) -> CallData: | ||
""" | ||
Checks the validity of the given dictionary making sure it contains the proper data. | ||
:param values: Dictionary that should be checked. | ||
:return: A CallData instance. | ||
""" | ||
if "username" not in values: | ||
raise Exception("Missing 'username' value") | ||
|
||
return CallData(values["username"]) | ||
|
||
|
||
def main(args: str): | ||
""" | ||
Gets the signature data from Instagram, after the user has provided it through the Hephaestus bot. | ||
:param args Hex encoded JSON object containing the arguments to be used during the execution. | ||
In order to be valid, the encoded JSON object must contain one field named "username" that represents the Instagram | ||
username of the account to be connected. | ||
Example argument value: | ||
7B22757365726E616D65223A22526963636172646F204D6F6E7461676E696E2335343134227D | ||
This is the hex encoded representation of the following JSON object: | ||
```json | ||
{ | ||
"username":"test_user" | ||
} | ||
``` | ||
:param args: JSON encoded parameters used during the execution. | ||
:return The signed value and the signature as a single comma separated string. | ||
:raise Exception if anything is wrong during the process. This can happen if: | ||
1. The Instagram user has not started the connection using the Hephaestus bot | ||
2. The provided signature is not valid | ||
3. The provided address is not linked to the provided public key | ||
""" | ||
|
||
decoded = bytes.fromhex(args) | ||
json_obj = json.loads(decoded) | ||
call_data = check_values(json_obj) | ||
|
||
result = get_user_data(call_data) | ||
if result is None: | ||
raise Exception(f"No valid signature data found for user with username {call_data.username}") | ||
|
||
# Verify the signature | ||
signature_valid = verify_signature(result) | ||
if not signature_valid: | ||
raise Exception("Invalid signature") | ||
|
||
# Verify the address | ||
address_valid = verify_address(result) | ||
if not address_valid: | ||
raise Exception("Invalid address") | ||
|
||
return f"{result.value},{result.signature},{call_data.username}" | ||
|
||
|
||
if __name__ == "__main__": | ||
try: | ||
print(main(*sys.argv[1:])) | ||
except Exception as e: | ||
print(str(e), file=sys.stderr) | ||
sys.exit(1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
import unittest | ||
import instagram | ||
import httpretty | ||
|
||
|
||
class instagramTest(unittest.TestCase): | ||
|
||
@httpretty.activate(verbose=True, allow_net_connect=False) | ||
def test_get_user_data(self): | ||
# Register fake HTTP call | ||
httpretty.register_uri( | ||
httpretty.GET, | ||
"https://themis.mainnet.desmos.network/instagram/test_user_desmos", | ||
status=200, | ||
body='{"address":"71b0310267b49279116835ed35791c24c110012f","pub_key":"0203233fabd69a1b7a90bb968a0ab66e3af61989f65cf0bc1f8e9518740a302f1f","value":"746573745f757365725f6465736d6f73","signature":"c12605456b8652df655bb43d0166586dfc0c5d758b03f127ca6b027d0ec140ca29b9569a20c9b78b72e13d15c1a7fa0b142dc0e624f3f51ef76bd94e55345d2a"}', | ||
) | ||
|
||
# Valid signature | ||
data = instagram.get_user_data(instagram.CallData('test_user_desmos')) | ||
self.assertIsNotNone(data) | ||
|
||
# Invalid signature | ||
httpretty.register_uri( | ||
httpretty.GET, | ||
"https://themis.mainnet.desmos.network/instagram/test_user_desmos", | ||
status=404, | ||
) | ||
data = instagram.get_user_data(instagram.CallData('test_user_desmos')) | ||
self.assertIsNone(data) | ||
|
||
def test_validate_json(self): | ||
jsons = [ | ||
{ | ||
'name': 'Valid JSON', | ||
'json': { | ||
'address': '8902A4822B87C1ADED60AE947044E614BD4CAEE2', | ||
'pub_key': '033024e9e0ad4f93045ef5a60bb92171e6418cd13b082e7a7bc3ed05312a0b417d', | ||
'signature': 'a00a7d5bd45e42615645fcaeb4d800af22704e54937ab235e5e50bebd38e88b765fdb696c22712c0cab1176756b6346cbc11481c544d1f7828cb233620c06173', | ||
'value': 'ricmontagnin' | ||
}, | ||
'valid': True | ||
}, | ||
{ | ||
'name': 'Missing address', | ||
'json': { | ||
'pub_key': '033024e9e0ad4f93045ef5a60bb92171e6418cd13b082e7a7bc3ed05312a0b417d', | ||
'signature': 'a00a7d5bd45e42615645fcaeb4d800af22704e54937ab235e5e50bebd38e88b765fdb696c22712c0cab1176756b6346cbc11481c544d1f7828cb233620c06173', | ||
'value': 'ricmontagnin' | ||
}, | ||
'valid': False | ||
}, | ||
{ | ||
'name': 'Missing pub_key', | ||
'json': { | ||
'address': '8902A4822B87C1ADED60AE947044E614BD4CAEE2', | ||
'signature': 'a00a7d5bd45e42615645fcaeb4d800af22704e54937ab235e5e50bebd38e88b765fdb696c22712c0cab1176756b6346cbc11481c544d1f7828cb233620c06173', | ||
'value': 'ricmontagnin' | ||
}, | ||
'valid': False | ||
}, | ||
{ | ||
'name': 'Missing signature', | ||
'json': { | ||
'address': '8902A4822B87C1ADED60AE947044E614BD4CAEE2', | ||
'pub_key': '033024e9e0ad4f93045ef5a60bb92171e6418cd13b082e7a7bc3ed05312a0b417d', | ||
'value': 'ricmontagnin' | ||
}, | ||
'valid': False | ||
}, | ||
{ | ||
'name': 'Missing value', | ||
'json': { | ||
'address': '8902A4822B87C1ADED60AE947044E614BD4CAEE2', | ||
'pub_key': '033024e9e0ad4f93045ef5a60bb92171e6418cd13b082e7a7bc3ed05312a0b417d', | ||
'signature': 'a00a7d5bd45e42615645fcaeb4d800af22704e54937ab235e5e50bebd38e88b765fdb696c22712c0cab1176756b6346cbc11481c544d1f7828cb233620c06173', | ||
}, | ||
'valid': False | ||
}, | ||
] | ||
|
||
for json in jsons: | ||
result = instagram.validate_json(json['json']) | ||
self.assertEqual(json['valid'], result, json['name']) | ||
|
||
def test_verify_signature(self): | ||
tests = [ | ||
{ | ||
'name': 'Valid data', | ||
'valid': True, | ||
'data': instagram.VerificationData( | ||
'', | ||
'033024e9e0ad4f93045ef5a60bb92171e6418cd13b082e7a7bc3ed05312a0b417d', | ||
'7269636d6f6e7461676e696e', | ||
'a00a7d5bd45e42615645fcaeb4d800af22704e54937ab235e5e50bebd38e88b765fdb696c22712c0cab1176756b6346cbc11481c544d1f7828cb233620c06173', | ||
), | ||
}, | ||
{ | ||
'name': 'Invalid value', | ||
'valid': False, | ||
'data': instagram.VerificationData( | ||
'', | ||
'033024e9e0ad4f93045ef5a60bb92171e6418cd13b082e7a7bc3ed05312a0b417d', | ||
'a00a7d5bd45e42615645fcaeb4d800af22704e54937ab235e5e50bebd38e88b765fdb696c22712c0cab1176756b6346cbc11481c544d1f7828cb233620c06173', | ||
'ricmontagni', | ||
), | ||
}, | ||
{ | ||
'name': 'Invalid signature', | ||
'valid': False, | ||
'data': instagram.VerificationData( | ||
'', | ||
'033024e9e0ad4f93045ef5a60bb92171e6418cd13b082e7a7bc3ed05312a0b417d', | ||
'a00a7d5bd45e42615645fcaeb4d800af2704e54937ab235e5e50bebd38e88b765fdb696c22712c0cab1176756b6346cbc11481c544d1f7828cb233620c06173', | ||
'7269636d6f6e7461676e696e', | ||
), | ||
}, | ||
{ | ||
'name': 'Invalid pub key', | ||
'valid': False, | ||
'data': instagram.VerificationData( | ||
'', | ||
'033024e9e0ad4f9305ef5a60bb92171e6418cd13b082e7a7bc3ed05312a0b417d', | ||
'a00a7d5bd45e42615645fcaeb4d800af22704e54937ab235e5e50bebd38e88b765fdb696c22712c0cab1176756b6346cbc11481c544d1f7828cb233620c06173', | ||
'7269636d6f6e7461676e696e', | ||
), | ||
}, | ||
] | ||
|
||
for test in tests: | ||
result = instagram.verify_signature(test['data']) | ||
self.assertEqual(test['valid'], result, test['name']) | ||
|
||
def test_verify_address(self): | ||
tests = [ | ||
{ | ||
'name': 'Valid address', | ||
'valid': True, | ||
'data': instagram.VerificationData( | ||
'8902A4822B87C1ADED60AE947044E614BD4CAEE2', | ||
'033024e9e0ad4f93045ef5a60bb92171e6418cd13b082e7a7bc3ed05312a0b417d', | ||
'7269636d6f6e7461676e696e', | ||
'a00a7d5bd45e42615645fcaeb4d800af22704e54937ab235e5e50bebd38e88b765fdb696c22712c0cab1176756b6346cbc11481c544d1f7828cb233620c06173' | ||
), | ||
}, | ||
{ | ||
'name': 'Invalid address', | ||
'valid': False, | ||
'data': instagram.VerificationData( | ||
'8902A4822B87C1ADED60AE947044E614BD4CAEE2', | ||
'033024e9e0ad4f93045ef5a60bb92171e6418cd13b082e7a7bc3ed05312a0b41', | ||
'7269636d6f6e7461676e696e', | ||
'a00a7d5bd45e42615645fcaeb4d800af22704e54937ab235e5e50bebd38e88b765fdb696c22712c0cab1176756b6346cbc11481c544d1f7828cb233620c06173' | ||
), | ||
}, | ||
] | ||
|
||
for test in tests: | ||
result = instagram.verify_address(test['data']) | ||
self.assertEqual(test['valid'], result, test['name']) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |