From 619c6f89fb2deda0c6109a156cdd6598b83797dc Mon Sep 17 00:00:00 2001 From: Vasu Dasari Date: Fri, 27 Oct 2017 14:49:45 -0400 Subject: [PATCH 1/5] Issue 1: Add ability to specify protocol buffer messages as records Module which deals with records needs to be given as input to unary RPC call. This can be presented as a tuple to the Options list that is passed to grpc_client:unary(). {msgs_as_records,ModuleName}. And user is expeced to generate records based header file using gpb. This can be out of the scope of grpc_client API. For example for our route_guide example: Point = #'Point'{ latitude = 409146138, longitude = -746188906 }, Return = route_guide_client:'GetFeature'(Connection, Point, [{msgs_as_records,route_guide_pb}]). API Return will also include a record: {ok,#{grpc_status => 0, headers => #{<<":status">> => <<"200">>}, http_status => 200, result => {'Feature',<<"Berkshire Valley Management Area Trail, Jefferson, NJ, USA">>, {'Point',409146138,-746188906}}, status_message => <<>>, trailers => #{<<"grpc-status">> => <<"0">>}}} --- src/grpc_client_stream.erl | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/grpc_client_stream.erl b/src/grpc_client_stream.erl index f466301..6b0d720 100644 --- a/src/grpc_client_stream.erl +++ b/src/grpc_client_stream.erl @@ -226,6 +226,7 @@ new_stream(Connection, Service, Rpc, Encoder, Options) -> Compression = proplists:get_value(compression, Options, none), Metadata = proplists:get_value(metadata, Options, #{}), TransportOptions = proplists:get_value(http2_options, Options, []), + RecordsEncoder = proplists:get_value(msgs_as_records, Options, []), {ok, StreamId} = grpc_client_connection:new_stream(Connection, TransportOptions), Package = Encoder:get_package_name(), RpcDef = Encoder:find_rpc_def(Service, Rpc), @@ -240,6 +241,7 @@ new_stream(Connection, Service, Rpc, Encoder, Options) -> response_pending => false, state => idle, encoder => Encoder, + records_encoder => RecordsEncoder, connection => Connection, headers_sent => false, metadata => Metadata, @@ -325,17 +327,24 @@ info_response(Response, #{queue := Queue} = Stream) -> %% TODO: fix the error handling, currently it is very hard to understand the %% error that results from a bad message (Map). encode(#{encoder := Encoder, - input := MsgType, - compression := CompressionMethod}, Map) -> - %% RequestData = Encoder:encode_msg(Map, MsgType), - try Encoder:encode_msg(Map, MsgType) of - RequestData -> + records_encoder := RecordsEncoder, + input := MsgType, + compression := CompressionMethod}, Msg) -> + try + begin + RequestData = case is_map(Msg) of + true -> + Encoder:encode_msg(Msg, MsgType); + false when is_tuple(Msg) -> + RecordsEncoder:encode_msg(Msg) + end, maybe_compress(RequestData, CompressionMethod) + end catch error:function_clause -> - throw({error, {failed_to_encode, MsgType, Map}}); + throw({error, {failed_to_encode, MsgType, Msg}}); Error:Reason -> - throw({error, {Error, Reason}}) + throw({error, {Error, Reason}}) end. maybe_compress(Encoded, none) -> @@ -351,12 +360,18 @@ maybe_compress(_Encoded, Other) -> decode(Encoded, Binary, #{response_encoding := Method, encoder := Encoder, + records_encoder := RecordsEncoder, output := MsgType}) -> - Message = case Encoded of + Message = case Encoded of 1 -> decompress(Binary, Method); 0 -> Binary end, - Encoder:decode_msg(Message, MsgType). + case RecordsEncoder == [] of + true -> + Encoder:decode_msg(Message, MsgType); + _ -> + RecordsEncoder:decode_msg(Message, MsgType) + end. decompress(Compressed, <<"gzip">>) -> zlib:gunzip(Compressed); From 61ad2eaf257a81c5d567af354ebbf91e46f4de7c Mon Sep 17 00:00:00 2001 From: Vasu Dasari Date: Sun, 26 Nov 2017 09:24:47 -0500 Subject: [PATCH 2/5] Issue 3: Add support to deliver messages asynchronously Details of this request in "Issue 3". With this code, user can create a streaming RPC connection by specifying how messages have to be deliverd. {ok, Stream} = grpc_client:new_stream(Connection, 'RouteGuide', 'RouteChat', route_guide, [{async_notification, self()}]). And when a notification arrives on the stream, a message of the following form {notification,Response} will be delivered to the calling process. --- src/grpc_client.erl | 6 ++---- src/grpc_client_stream.erl | 5 +++++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/grpc_client.erl b/src/grpc_client.erl index 9a79ff5..f991a5f 100644 --- a/src/grpc_client.erl +++ b/src/grpc_client.erl @@ -180,16 +180,14 @@ new_stream(Connection, Service, Rpc, DecoderModule, Options) -> -spec send(Stream::client_stream(), Msg::map()) -> ok. %% @doc Send a message from the client to the server. -send(Stream, Msg) when is_pid(Stream), - is_map(Msg) -> +send(Stream, Msg) when is_pid(Stream) -> grpc_client_stream:send(Stream, Msg). -spec send_last(Stream::client_stream(), Msg::map()) -> ok. %% @doc Send a message to server and mark it as the last message %% on the stream. For simple RPC and client-streaming RPCs that %% will trigger the response from the server. -send_last(Stream, Msg) when is_pid(Stream), - is_map(Msg) -> +send_last(Stream, Msg) when is_pid(Stream) -> grpc_client_stream:send_last(Stream, Msg). -spec rcv(Stream::client_stream()) -> rcv_response(). diff --git a/src/grpc_client_stream.erl b/src/grpc_client_stream.erl index 6b0d720..fb3bc68 100644 --- a/src/grpc_client_stream.erl +++ b/src/grpc_client_stream.erl @@ -227,6 +227,7 @@ new_stream(Connection, Service, Rpc, Encoder, Options) -> Metadata = proplists:get_value(metadata, Options, #{}), TransportOptions = proplists:get_value(http2_options, Options, []), RecordsEncoder = proplists:get_value(msgs_as_records, Options, []), + ClientPid = proplists:get_value(async_notification, Options), {ok, StreamId} = grpc_client_connection:new_stream(Connection, TransportOptions), Package = Encoder:get_package_name(), RpcDef = Encoder:find_rpc_def(Service, Rpc), @@ -239,6 +240,7 @@ new_stream(Connection, Service, Rpc, Encoder, Options) -> rpc => Rpc, queue => queue:new(), response_pending => false, + async_notification => ClientPid, state => idle, encoder => Encoder, records_encoder => RecordsEncoder, @@ -316,6 +318,9 @@ add_metadata(Headers, Metadata) -> lists:keystore(K, 1, Acc, {K,V}) end, Headers, maps:to_list(Metadata)). +info_response(Response, #{async_notification := Client} = Stream) when is_pid(Client) -> + Client ! {notification,Response}, + {noreply, Stream}; info_response(Response, #{response_pending := true, client := Client} = Stream) -> gen_server:reply(Client, Response), From 49c653ebefa9b9cd246d575ee758c3fec7574f11 Mon Sep 17 00:00:00 2001 From: Vasu Dasari Date: Sat, 2 Dec 2017 20:58:27 -0500 Subject: [PATCH 3/5] Add application dependencies to build OTP releases --- src/grpc_client.app.src | 1 + 1 file changed, 1 insertion(+) diff --git a/src/grpc_client.app.src b/src/grpc_client.app.src index 5ea4b44..6ec2512 100644 --- a/src/grpc_client.app.src +++ b/src/grpc_client.app.src @@ -2,6 +2,7 @@ [{description,"gRPC client in Erlang"}, {vsn,"0.1.0"}, {modules,[]}, + {applications, [grpc_lib,http2_client]}, {registered, []}, {env, []}, {applications,[]}]}. From f7b89e03b2c02f3069241bf5c77f09ec7148eaf3 Mon Sep 17 00:00:00 2001 From: Vasu Dasari Date: Wed, 13 Jun 2018 09:56:12 -0400 Subject: [PATCH 4/5] Update function specs Specs to reflect message to be of type tuple (records) or map --- src/grpc_client.erl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/grpc_client.erl b/src/grpc_client.erl index f991a5f..917a02c 100644 --- a/src/grpc_client.erl +++ b/src/grpc_client.erl @@ -71,10 +71,12 @@ -type metadata() :: #{metadata_key() => metadata_value()}. -type compression_method() :: none | gzip. +-type msg_type() :: map() | tuple(). -type stream_option() :: {metadata, metadata()} | {compression, compression_method()} | - {http2_options, [term()]}. + {http2_options, [term()]} | + {msgs_as_records,module()}. -type client_stream() :: pid(). @@ -178,12 +180,12 @@ new_stream(Connection, Service, Rpc, DecoderModule) -> new_stream(Connection, Service, Rpc, DecoderModule, Options) -> grpc_client_stream:new(Connection, Service, Rpc, DecoderModule, Options). --spec send(Stream::client_stream(), Msg::map()) -> ok. +-spec send(Stream::client_stream(), Msg::msg_type()) -> ok. %% @doc Send a message from the client to the server. send(Stream, Msg) when is_pid(Stream) -> grpc_client_stream:send(Stream, Msg). --spec send_last(Stream::client_stream(), Msg::map()) -> ok. +-spec send_last(Stream::client_stream(), Msg::msg_type()) -> ok. %% @doc Send a message to server and mark it as the last message %% on the stream. For simple RPC and client-streaming RPCs that %% will trigger the response from the server. @@ -238,7 +240,7 @@ stop_connection(Connection) -> grpc_client_connection:stop(Connection). -spec unary(Connection::connection(), - Message::map(), Service::atom(), Rpc::atom(), + Message::msg_type(), Service::atom(), Rpc::atom(), Decoder::module(), Options::[stream_option() | {timeout, timeout()}]) -> unary_response(map()). From 930384100ddff5fb05ffb058f11b33df878c304f Mon Sep 17 00:00:00 2001 From: Vasu Dasari Date: Wed, 13 Jun 2018 09:57:28 -0400 Subject: [PATCH 5/5] Update grpc notification As per code review comment, update notification sent to registered clients as {grpc_notification,Response} --- src/grpc_client_stream.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/grpc_client_stream.erl b/src/grpc_client_stream.erl index fb3bc68..b763056 100644 --- a/src/grpc_client_stream.erl +++ b/src/grpc_client_stream.erl @@ -319,7 +319,7 @@ add_metadata(Headers, Metadata) -> end, Headers, maps:to_list(Metadata)). info_response(Response, #{async_notification := Client} = Stream) when is_pid(Client) -> - Client ! {notification,Response}, + Client ! {grpc_notification,Response}, {noreply, Stream}; info_response(Response, #{response_pending := true, client := Client} = Stream) ->