Skip to content

Commit

Permalink
Merge pull request #458 from hairyhum/kpro-connection-timeout
Browse files Browse the repository at this point in the history
Propagate connect_timeout argument to kpro API functions
  • Loading branch information
zmstone authored Feb 5, 2022
2 parents 6410360 + 3e4f590 commit a9c193c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 13 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
* 3.16.2
* Propagate `connect_timeout` config to `kpro` API functions as `timeout` arg
affected APIs: connect_group_coordinator, create_topics, delete_topics,
resolve_offset, fetch, fold, fetch_committed_offsets
* 3.16.1
* Fix `brod` script in `brod-cli` in release.
* Support `rebalance_timeout` consumer group option
Expand Down
6 changes: 4 additions & 2 deletions src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,8 @@ fetch(Hosts, Topic, Partition, Offset,
-spec connect_leader([endpoint()], topic(), partition(),
conn_config()) -> {ok, pid()}.
connect_leader(Hosts, Topic, Partition, ConnConfig) ->
kpro:connect_partition_leader(Hosts, ConnConfig, Topic, Partition).
KproOptions = brod_utils:kpro_connection_options(ConnConfig),
kpro:connect_partition_leader(Hosts, ConnConfig, Topic, Partition, KproOptions).

%% @doc List ALL consumer groups in the given kafka cluster.
%% NOTE: Exception if failed to connect any of the coordinator brokers.
Expand Down Expand Up @@ -975,7 +976,8 @@ describe_groups(CoordinatorEndpoint, ConnCfg, IDs) ->
-spec connect_group_coordinator([endpoint()], conn_config(), group_id()) ->
{ok, pid()} | {error, any()}.
connect_group_coordinator(BootstrapEndpoints, ConnCfg, GroupId) ->
Args = #{type => group, id => GroupId},
KproOptions = brod_utils:kpro_connection_options(ConnCfg),
Args = maps:merge(KproOptions, #{type => group, id => GroupId}),
kpro:connect_coordinator(BootstrapEndpoints, ConnCfg, Args).

%% @doc Fetch committed offsets for ALL topics in the given consumer group.
Expand Down
36 changes: 25 additions & 11 deletions src/brod_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
, resolve_offset/4
, resolve_offset/5
, resolve_offset/6
, kpro_connection_options/1
]).

-include("brod_int.hrl").
Expand Down Expand Up @@ -93,7 +94,8 @@ create_topics(Hosts, TopicConfigs, RequestConfigs) ->
validate_only => boolean()}, conn_config()) ->
{ok, kpro:struct()} | {error, any()} | ok.
create_topics(Hosts, TopicConfigs, RequestConfigs, ConnCfg) ->
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg)),
KproOpts = kpro_connection_options(ConnCfg),
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg), KproOpts),
fun(Pid) ->
Request = brod_kafka_request:create_topics(
Pid, TopicConfigs, RequestConfigs),
Expand All @@ -110,7 +112,8 @@ delete_topics(Hosts, Topics, Timeout) ->
-spec delete_topics([endpoint()], [topic()], pos_integer(), conn_config()) ->
{ok, kpro:struct()} | {error, any()}.
delete_topics(Hosts, Topics, Timeout, ConnCfg) ->
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg)),
KproOpts = kpro_connection_options(ConnCfg),
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg), KproOpts),
fun(Pid) ->
Request = brod_kafka_request:delete_topics(
Pid, Topics, Timeout),
Expand Down Expand Up @@ -148,20 +151,18 @@ get_metadata(Hosts, Topics, ConnCfg) ->
offset_time(), conn_config()) ->
{ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg) ->
Timeout =
proplists:get_value(connect_timeout, ConnCfg, ?BROD_DEFAULT_TIMEOUT),
Opts = #{timeout => Timeout},
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts).
KproOpts = kpro_connection_options(ConnCfg),
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, KproOpts).

%% @doc Resolve timestamp to real offset.
-spec resolve_offset([endpoint()], topic(), partition(),
offset_time(), conn_config(),
#{timeout => kpro:int32()}) ->
{ok, offset()} | {error, any()}.
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts) ->
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, KproOpts) ->
with_conn(
kpro:connect_partition_leader(Hosts, nolink(ConnCfg),
Topic, Partition, Opts),
Topic, Partition, KproOpts),
fun(Pid) -> resolve_offset(Pid, Topic, Partition, Time) end).

%% @doc Resolve timestamp or semantic offset to real offset.
Expand Down Expand Up @@ -284,8 +285,9 @@ flatten_batches(BeginOffset, Header, Batches0) ->
fetch(Hosts, Topic, Partition, Offset, Opts) when is_list(Hosts) ->
fetch({Hosts, []}, Topic, Partition, Offset, Opts);
fetch({Hosts, ConnCfg}, Topic, Partition, Offset, Opts) ->
KproOpts = kpro_connection_options(ConnCfg),
with_conn(
kpro:connect_partition_leader(Hosts, nolink(ConnCfg), Topic, Partition),
kpro:connect_partition_leader(Hosts, nolink(ConnCfg), Topic, Partition, KproOpts),
fun(Conn) -> fetch(Conn, Topic, Partition, Offset, Opts) end);
fetch(Client, Topic, Partition, Offset, Opts) when is_atom(Client) ->
case brod_client:get_leader_connection(Client, Topic, Partition) of
Expand All @@ -306,8 +308,9 @@ fold(Hosts, Topic, Partition, Offset, Opts,
Acc, Fun, Limits) when is_list(Hosts) ->
fold({Hosts, []}, Topic, Partition, Offset, Opts, Acc, Fun, Limits);
fold({Hosts, ConnCfg}, Topic, Partition, Offset, Opts, Acc, Fun, Limits) ->
KproOpts = kpro_connection_options(ConnCfg),
case with_conn(
kpro:connect_partition_leader(Hosts, nolink(ConnCfg), Topic, Partition),
kpro:connect_partition_leader(Hosts, nolink(ConnCfg), Topic, Partition, KproOpts),
fun(Conn) -> fold(Conn, Topic, Partition, Offset, Opts,
Acc, Fun, Limits) end) of
{error, Reason} ->
Expand Down Expand Up @@ -376,7 +379,8 @@ init_sasl_opt(Config) ->
group_id(), [topic()]) ->
{ok, [kpro:struct()]} | {error, any()}.
fetch_committed_offsets(BootstrapEndpoints, ConnCfg, GroupId, Topics) ->
Args = #{type => group, id => GroupId},
KproOpts = kpro_connection_options(ConnCfg),
Args = maps:merge(KproOpts, #{type => group, id => GroupId}),
with_conn(
kpro:connect_coordinator(BootstrapEndpoints, nolink(ConnCfg), Args),
fun(Pid) -> do_fetch_committed_offsets(Pid, GroupId, Topics) end).
Expand Down Expand Up @@ -595,6 +599,16 @@ get_stable_offset(Header) ->
%% handle the case when high_watermark < last_stable_offset
min(StableOffset, HighWmOffset).

%% @doc get kpro connection options from brod connection config
kpro_connection_options(ConnCfg) ->
Timeout = case ConnCfg of
List when is_list(List) ->
proplists:get_value(connect_timeout, List, ?BROD_DEFAULT_TIMEOUT);
Map when is_map(Map) ->
maps:get(connect_timeout, Map, ?BROD_DEFAULT_TIMEOUT)
end,
#{timeout => Timeout}.

%%%_* Internal functions =======================================================

do_fold(Spawn, {Pid, Mref}, Offset, Acc, Fun, End, Count) ->
Expand Down

0 comments on commit a9c193c

Please sign in to comment.