@@ -52,7 +52,8 @@ groups() ->
52
52
{mock , [], [
53
53
insufficient_credit ,
54
54
incoming_heartbeat ,
55
- multi_transfer_without_delivery_id
55
+ multi_transfer_without_delivery_id ,
56
+ set_sender_capabilities
56
57
]}
57
58
].
58
59
@@ -68,7 +69,7 @@ shared() ->
68
69
subscribe_with_auto_flow_unsettled ,
69
70
outgoing_heartbeat ,
70
71
roundtrip_large_messages ,
71
- transfer_id_vs_delivery_id
72
+ transfer_id_vs_delivery_id
72
73
].
73
74
74
75
% % -------------------------------------------------------------------
@@ -687,6 +688,7 @@ subscribe_with_auto_flow_unsettled(Config) ->
687
688
ok = amqp10_client :end_session (Session ),
688
689
ok = amqp10_client :close_connection (Connection ).
689
690
691
+
690
692
insufficient_credit (Config ) ->
691
693
Hostname = ? config (mock_host , Config ),
692
694
Port = ? config (mock_port , Config ),
@@ -790,6 +792,76 @@ multi_transfer_without_delivery_id(Config) ->
790
792
ok = amqp10_client :close_connection (Connection ),
791
793
ok .
792
794
795
+ set_sender_capabilities (Config ) ->
796
+ Hostname = ? config (mock_host , Config ),
797
+ Port = ? config (mock_port , Config ),
798
+
799
+ OpenStep = fun ({0 = Ch , # 'v1_0.open' {}, _Pay }) ->
800
+ {Ch , [# 'v1_0.open' {container_id = {utf8 , <<" mock" >>}}]}
801
+ end ,
802
+ BeginStep = fun ({1 = Ch , # 'v1_0.begin' {}, _Pay }) ->
803
+ {Ch , [# 'v1_0.begin' {remote_channel = {ushort , 1 },
804
+ next_outgoing_id = {uint , 1 },
805
+ incoming_window = {uint , 1000 },
806
+ outgoing_window = {uint , 1000 }}
807
+ ]}
808
+ end ,
809
+ AttachStep = fun ({1 = Ch , # 'v1_0.attach' {role = true ,
810
+ name = Name ,
811
+ source = # 'v1_0.source' {
812
+ capabilities = {utf8 , <<" capability-1" >>}}}, <<>>}) ->
813
+ {Ch , [# 'v1_0.attach' {name = Name ,
814
+ handle = {uint , 99 },
815
+ initial_delivery_count = {uint , 1 },
816
+ role = false }
817
+ ]}
818
+ end ,
819
+
820
+ LinkCreditStep = fun ({1 = Ch , # 'v1_0.flow' {}, <<>>}) ->
821
+ {Ch , {multi , [[# 'v1_0.transfer' {handle = {uint , 99 },
822
+ delivery_id = {uint , 12 },
823
+ more = true },
824
+ # 'v1_0.data' {content = <<" hello " >>}],
825
+ [# 'v1_0.transfer' {handle = {uint , 99 },
826
+ % delivery_id can be omitted
827
+ % for continuation frames
828
+ delivery_id = undefined ,
829
+ settled = undefined ,
830
+ more = false },
831
+ # 'v1_0.data' {content = <<" world" >>}]
832
+ ]}}
833
+ end ,
834
+ Steps = [fun mock_server :recv_amqp_header_step /1 ,
835
+ fun mock_server :send_amqp_header_step /1 ,
836
+ mock_server :amqp_step (OpenStep ),
837
+ mock_server :amqp_step (BeginStep ),
838
+ mock_server :amqp_step (AttachStep ),
839
+ mock_server :amqp_step (LinkCreditStep )
840
+ ],
841
+
842
+ ok = mock_server :set_steps (? config (mock_server , Config ), Steps ),
843
+
844
+ Cfg = #{address => Hostname , port => Port , sasl => none , notify => self ()},
845
+ {ok , Connection } = amqp10_client :open_connection (Cfg ),
846
+ {ok , Session } = amqp10_client :begin_session_sync (Connection ),
847
+ {ok , Receiver } = amqp10_client :attach_receiver_link (Session , <<" mock1-received" >>,
848
+ <<" test" >>,
849
+ setlled , none ,
850
+ #{}, % filter
851
+ #{}, % prorperties
852
+ <<" capability-1" >>),
853
+ amqp10_client :flow_link_credit (Receiver , 100 , 50 ),
854
+ receive
855
+ {amqp10_msg , Receiver , _InMsg } ->
856
+ ok
857
+ after 2000 ->
858
+ exit (delivery_timeout )
859
+ end ,
860
+
861
+ ok = amqp10_client :end_session (Session ),
862
+ ok = amqp10_client :close_connection (Connection ),
863
+ ok .
864
+
793
865
outgoing_heartbeat (Config ) ->
794
866
Hostname = ? config (rmq_hostname , Config ),
795
867
Port = rabbit_ct_broker_helpers :get_node_config (Config , 0 , tcp_port_amqp ),
0 commit comments