Skip to content

Commit

Permalink
fixes for discovery and tpicapi, test are passed
Browse files Browse the repository at this point in the history
  • Loading branch information
cleverfox committed Apr 8, 2023
1 parent df848be commit 555216f
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 76 deletions.
15 changes: 15 additions & 0 deletions apps/tpic2/src/tpic2_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,25 @@ connection_process(Parent, Host, Port, Opts) ->
{ok, Socket} = ssl:connect(TCPSocket, SSLOpts),
ssl:setopts(Socket, [{active, once}]),
{ok,PeerInfo}=ssl:connection_information(Socket),
PeerPK=case ssl:peercert(Socket) of
{ok, PC} ->
DCert=tpic2:extract_cert_info(public_key:pkix_decode_cert(PC,otp)),
case DCert of
#{pubkey:=Der} ->
Der;
_ ->
?LOG_NOTICE("Unknown cert ~p",[DCert]),
undefined
end;
{error, no_peercert} ->
undefined
end,

State=#{
ref=>maps:get(ref, Opts, undefined),
socket=>Socket,
peerinfo=>PeerInfo,
pubkey=>PeerPK,
timer=>undefined,
transport=>ranch_ssl,
parent=>Parent,
Expand Down
10 changes: 6 additions & 4 deletions apps/tpic2/src/tpic2_tls.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ conn_proto(Parent, Ref, Socket, Transport, Opts, <<"tpic2">>, PeerPK) ->
timer=>undefined,
transport=>Transport,
protocol => tpic2,
peerpk => PeerPK,
pubkey => PeerPK,
nodeid=> try
nodekey:get_pub()
catch _:_ -> atom_to_binary(node(),utf8)
Expand All @@ -96,7 +96,7 @@ conn_proto(_Parent, _Ref, Socket, Transport, _Opts, _, _) ->
Transport:close(Socket).

loop1(State=#{socket:=Socket,role:=Role,opts:=Opts,
transport:=Transport,peerpk:=Pubkey}) ->
transport:=Transport,pubkey:=Pubkey}) ->
%{ok,PC}=ssl:peercert(Socket),
%DCert=tpic2:extract_cert_info(public_key:pkix_decode_cert(PC,otp)),
%Pubkey=case DCert of
Expand Down Expand Up @@ -166,7 +166,7 @@ loop1(State=#{socket:=Socket,role:=Role,opts:=Opts,
if WhatToDo==shutdown ->
done;
true ->
?MODULE:loop(State#{pubkey=>Pubkey})
?MODULE:loop(State)
end
end.

Expand Down Expand Up @@ -318,7 +318,9 @@ handle_msg(#{null:=<<"hello">>,
OldSID
}
end,
{ok, PPID}=gen_server:call(tpic2_cmgr, {peer,PK, Reg}),
{_OkOrExists,PPID}=gen_server:call(tpic2_cmgr, {peer,PK, Reg}),
%logger:notice("RegRes ~p ~p: ~p",[PK, Reg, RegRes]),
%{ok, PPID}=RegRes,
lists:foreach(fun(Addr) ->
gen_server:call(PPID, {add, binary_to_list(Addr), Port})
end,
Expand Down
143 changes: 95 additions & 48 deletions apps/tpnode/src/discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,16 @@ handle_call({get_pid, Name}, _From, #{local_services:=Dict} = State) when is_bin
end,
{reply, Reply, State};

handle_call({lookup, Pred}, _From, State) when is_function(Pred) ->
{reply, query(Pred, State), State};

%% get list of ip and port for service with name Name (local and remote)
handle_call({lookup, Name}, _From, State) ->
{reply, query(Name, State), State};

handle_call({lookup, Name, Chain}, _From, State) ->
{reply, query(Name, Chain, State), State};

handle_call({lookup_raw, Name, Chain}, _From, State) ->
{reply, query_raw(Name, Chain, State), State};

handle_call({lookup_remote, Name}, _From, #{remote_services := RemoteDict} = State) ->
{reply, query_remote(Name, RemoteDict, blockchain:chain()), State};

Expand Down Expand Up @@ -331,7 +331,7 @@ get_local_addresses(State) ->


% --------------------------------------------------------
announce_one_service(Name, TranslatedAddress, Ttl, Scopes) ->
prepare_announce_one_service(Name, TranslatedAddress, Ttl, Scopes) ->
try
%% TranslatedAddress = add_hostname(translate_address(Address), Hostname),

Expand All @@ -350,16 +350,26 @@ announce_one_service(Name, TranslatedAddress, Ttl, Scopes) ->
scopes => Scopes,
chain => blockchain:chain()
},
AnnounceBin = pack(Announce),
send_service_announce(AnnounceBin)
{ok, pack(Announce)}
catch
Err:Reason ->
?LOG_ERROR(
"Announce with name ~p and address ~p and scopes ~p hasn't made because ~p ~p",
[Name, TranslatedAddress, Scopes, Err, Reason]
)
),
error
end.

%announce_one_service(Name, TranslatedAddress, Ttl, Scopes) ->
% case prepare_announce_one_service(Name, TranslatedAddress, Ttl, Scopes) of
% {ok, AnnounceBin} ->
% send_service_announce(AnnounceBin),
% ok;
% error ->
% error
% end.


% ------------------------------------------------------------
-spec is_local_service(Announce :: #{ 'nodeid' := _, _ := _ }) -> boolean().

Expand Down Expand Up @@ -418,49 +428,60 @@ get_local_names(Names) ->
% --------------------------------------------------------

% make announce of our local services with tpic scope
make_announce(#{names:=Names} = _Dict, State) ->
?LOG_DEBUG("Announcing our local services"),
prepare_announce(#{names:=Names} = _Dict, State) ->
Ttl = max(get_config(intrachain_ttl, 300, State), 30),
Hostname = application:get_env(tpnode, hostname, unknown),
%% ValidUntil = os:system_time(seconds) + get_config(intrachain_ttl, 120, State),
Addresses = get_config(addresses, get_default_addresses(), State),
AllScopesCfg = get_config(scope, ?DEFAULT_SCOPE_CONFIG, State),
MacroDict = get_config(macro_dict, #{}, State),
LocalNames = get_local_names(Names),
Announcer = fun(Name, Counter) ->
Counter + lists:foldl(
% #{address => local4, port => 53221, proto => tpic}
fun(#{proto := Proto} = Address, AddrCounter) ->
Scopes = get_scopes(Proto, AllScopesCfg),
IsAdvertisable = in_scope(Proto, tpic, AllScopesCfg),
IsRightProto = is_right_proto(Name, Proto),
%% ?LOG_DEBUG("ann dbg ~p ~p ~p ~p", [Name, IsAdvertisable, IsRightProto, Address]),

if
IsRightProto == true andalso IsAdvertisable == true ->
try
TranslatedAddress =
add_hostname(substitute_macro(Address, MacroDict), Hostname),
announce_one_service(Name, TranslatedAddress, Ttl, Scopes),
AddrCounter + 1
catch
pass ->
?LOG_DEBUG("skip address (can't substitute macro?): ~p", [Address]),
AddrCounter
end;
Announcer = fun(Name, ListAcc) ->
lists:foldl(
% #{address => local4, port => 53221, proto => tpic}
fun(#{proto := Proto} = Address, Acc) ->
Scopes = get_scopes(Proto, AllScopesCfg),
IsAdvertisable = in_scope(Proto, tpic, AllScopesCfg),
IsRightProto = is_right_proto(Name, Proto),
if IsRightProto == true andalso IsAdvertisable == true ->
try
TranslatedAddress =
add_hostname(substitute_macro(Address, MacroDict), Hostname),
case prepare_announce_one_service(
Name,
TranslatedAddress,
Ttl,
Scopes) of
{ok, Bin} ->
[Bin|Acc];
error ->
Acc
end
catch
pass ->
?LOG_DEBUG("skip address (can't substitute macro?): ~p", [Address]),
Acc
end;
true ->
Acc
end;
(Address, Acc) ->
?LOG_DEBUG("skip announce for invalid address ~p ~p", [Name, Address]),
Acc
end,
ListAcc,
Addresses)
end,
lists:foldl(Announcer, [], LocalNames).

true ->
%% ?LOG_DEBUG("skip announce for address ~p ~p", [Name, Address]),
AddrCounter
end;
(Address, AddrCounter) ->
?LOG_DEBUG("skip announce for invalid address ~p ~p", [Name, Address]),
AddrCounter
end,
0,
Addresses)

make_announce(Dict, State) ->
?LOG_DEBUG("Announcing our local services"),
List2Announce = prepare_announce(Dict, State),
Announcer = fun(Bin, Cnt) ->
send_service_announce(Bin),
Cnt+1
end,
ServicesCount = lists:foldl(Announcer, 0, LocalNames),
ServicesCount = lists:foldl(Announcer, 0, List2Announce),
?LOG_DEBUG("Announced ~p of our services", [ServicesCount]),
ok.

Expand Down Expand Up @@ -756,11 +777,42 @@ query_remote(Name, _Dict, Chain) ->

% --------------------------------------------------------

query_raw(Name0, Chain0, State) ->
Name = convert_to_binary(Name0),
LC=blockchain:chain(),
#{local_services := #{names:=LocalDict}, remote_services := RemoteDict} = State,
{Chain,Local} = if is_integer(Chain0) andalso Chain0>0 andalso Chain0=/=LC ->
{Chain0,[]};
true ->
{LC,
prepare_announce(
#{names=>
maps:with([Name0],LocalDict)
}, State)
}
end,
RQR=fun(RName0, Dict) ->
Name1 = add_chain_to_name(RName0, Chain),
Nodes = maps:get(Name1, Dict, #{}),
Announces = maps:values(Nodes),
lists:map(
fun(#{bin:=B}) ->
%maps:merge(Address, maps:with([nodeid,node_name,pubkey],A))
B
end, Announces
)
end,

lists:merge(Local, RQR(Name,RemoteDict)).


query(Name0, Chain, State) ->
Name = convert_to_binary(Name0),
LocalChain = blockchain:chain(),
#{local_services := LocalDict, remote_services := RemoteDict} = State,
Local = case Chain of
0 ->
query_local(Name, LocalDict, State);
LocalChain ->
query_local(Name, LocalDict, State);
_ ->
Expand All @@ -774,11 +826,6 @@ query(Name0, Chain, State) ->

% --------------------------------------------------------

query(Pred, _State) when is_function(Pred) ->
?LOG_ERROR("Not inmplemented"),
not_implemented;

% find service by name
query(Name, State) ->
query(Name, blockchain:chain(), State).

Expand Down Expand Up @@ -870,7 +917,7 @@ process_announce(
try
Key = address2key(Address),
Name = add_chain_to_name(Name0, Chain),
Announce = add_valid_until(Announce0, MaxTtl),
Announce = add_valid_until(Announce0#{bin=>AnnounceBin}, MaxTtl),
Nodes = maps:get(Name, Dict, #{}),
PrevAnnounce = maps:get(Key, Nodes, #{created => 0, ttl=> 0, sent_xchain => 0}),
SentXchain = relay_announce(PrevAnnounce, Announce, AnnounceBin, XChainThrottle),
Expand Down
76 changes: 70 additions & 6 deletions apps/tpnode/src/tpnode_tpicapi.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ init(Req0, _) ->
throw:{return,RCode,RBody} ->
{ok, cowboy_req:reply(RCode, #{}, RBody, Req0), #{} }
end.


%% -- [ pick_block ] --

handle(<<"GET">>, [<<"pick_block">>,Hash], _Req) ->
handle(<<"GET">>, [<<"pick_block">>,Hash,<<"self">>], _Req);

Expand All @@ -56,13 +58,36 @@ handle(<<"GET">>, [<<"pick_block">>,THash,TRel], _Req) ->
{404, <<"not found">>}
end;

%% -- [ sync ] --

handle(<<"GET">>,[<<"sync">>,<<"request">>], _Req) ->
{200,
#{<<"content-type">> =><<"binary/msgpack">>},
#{<<"content-type">> =><<"application/msgpack">>},
msgpack:pack(gen_server:call(blockchain_reader,sync_req))
};

handle(<<"GET">>,[<<"txpool">>,TxID], _Req) ->
%% -- [ discovery ] --

handle(<<"GET">>,[<<"discovery">>,<<"tpicpeer">>,_BChain]=Path,Req) ->
case Req of
#{cert:=Cert} when is_binary(Cert) ->
handle(get, Path ,Req);
_ ->
{403, <<"unauth">>}
end;

handle(<<"GET">>,[<<"discovery">>,Service,BChain], Req) ->
handle(get,[<<"discovery">>,Service,BChain], Req);

handle(get,[<<"discovery">>,Service,BChain], _Req) ->
Chain=binary_to_integer(BChain),
List=gen_server:call(discovery,{lookup_raw,Service,Chain}),
Body=msgpack:pack(List),
{200, #{ <<"content-type">> => <<"application/msgpack">> }, Body};

%% -- [ txstorage ] --

handle(<<"GET">>,[<<"txstorage">>,TxID], _Req) ->
case tpnode_txstorage:get_txm(TxID) of
{ok, #{body:=Body,
origin:=Origin,
Expand All @@ -79,7 +104,7 @@ handle(<<"GET">>,[<<"txpool">>,TxID], _Req) ->
{404, <<"not found">>}
end;

handle(<<"PATCH">>,[<<"txpool">>,TxID], #{cert:=Cert}=_Req) when is_binary(Cert) ->
handle(<<"PATCH">>,[<<"txstorage">>,TxID], #{cert:=Cert}=_Req) when is_binary(Cert) ->
#{pubkey:=PubKey}=tpic2:extract_cert_info(public_key:pkix_decode_cert(Cert,otp)),
case tpnode_txstorage:get_txm(TxID) of
{ok, #{origin:=Origin, valid:=_Valid}} when Origin==PubKey ->
Expand All @@ -100,8 +125,7 @@ handle(<<"PATCH">>,[<<"txpool">>,TxID], #{cert:=Cert}=_Req) when is_binary(Cert)
{404, <<"not found">>}
end;


handle(<<"PUT">>,[<<"txpool">>, TxID], #{cert:=Cert,has_body:=true}=Req) when is_binary(Cert) ->
handle(<<"PUT">>,[<<"txstorage">>, TxID], #{cert:=Cert,has_body:=true}=Req) when is_binary(Cert) ->
{ok, Body, Req1} = cowboy_req:read_body(Req),
#{pubkey:=PubKey}=tpic2:extract_cert_info(public_key:pkix_decode_cert(Cert,otp)),
case gen_server:call(txstorage,
Expand All @@ -117,6 +141,46 @@ handle(<<"PUT">>,[<<"txpool">>, TxID], #{cert:=Cert,has_body:=true}=Req) when is
{{400, <<"body_mismatch">>},Req1}
end;

% -- [ tx ] --

handle(<<"PUT">>, [<<"tx">>,<<"multi">>], #{has_body := true} = Req) ->
{ok, Body, Req1} = cowboy_req:read_body(Req),
{ case maps:get(<<"content-type">>,Req1,undefined) of
<<"application/msgpack">> ->
{ok,Lst} = msgpack:unpack(Body),
if(is_list(Lst)) -> ok;
true -> throw({return,400,<<"nolist">>})
end,
Res=lists:map(
fun(Bin) when is_binary(Bin) ->
case txpool:new_tx(Bin) of
{ok, TxID} ->
TxID;
{error, Err} ->
[<<"error">>,
iolist_to_binary(io_lib:format("~p", [Err]))
]
end
end, Lst),
{200, #{<<"content-type">> =><<"application/msgpack">>}, msgpack:pack(Res) };
_ ->
{400, <<"unexpected content-type">>}
end, Req};

%
handle(<<"PUT">>, [<<"tx">>], #{has_body := true} = Req) ->
{ok, Body, Req1} = cowboy_req:read_body(Req),
{ case txpool:new_tx(Body) of
{ok, TxID} ->
{200,TxID};
{error, Err} ->
{500,
iolist_to_binary(io_lib:format("~p", [Err]))
}
end, Req1};

% -- [ status ] --

handle(<<"GET">>,[<<"status">>], #{cert:=Cert}) when is_binary(Cert) ->
#{pubkey:=PubKey}=tpic2:extract_cert_info(public_key:pkix_decode_cert(Cert,otp)),
{200,#{
Expand Down
Loading

0 comments on commit 555216f

Please sign in to comment.