An implementation of Spring’s Cloud Stream Binder for integrating with Solace PubSub+ message brokers. The Spring Cloud Stream Binder project provides a higher-level abstraction towards messaging that standardizes the development of distributed message-based systems.
The Solace implementation of the Spring Cloud Stream Binder maps the following concepts from Spring to Solace:
- Destinations to topics/subscriptions
  - Producer bindings always send messages to topics
- Consumer groups to durable queues
  - A consumer group’s queue is subscribed to its destination subscription (default)
  - Consumer bindings always receive messages from queues
- Anonymous consumer groups to temporary queues (when no group is specified; used for the SCS Publish-Subscribe Model)
In Solace, the above setup is called topic-to-queue mapping. So a typical message flow would then appear as follows:
- Producer bindings publish messages to their destination topics
- Each consumer group’s queue receives the messages published to its destination topic
- The PubSub+ broker distributes messages in a round-robin fashion to each consumer binding for a particular consumer group
ℹ️ Round-robin distribution only occurs if the consumer group’s queue is configured for non-exclusive access. If the queue has exclusive access, then only one consumer will receive messages.
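The difference between non-exclusive and exclusive access can be illustrated with a small, broker-free simulation. This is only a sketch in plain Java: the class QueueDistributionSketch and its distribute method are hypothetical names that are not part of the binder or the Solace API, and a real broker's delivery scheduling is more involved than the strict rotation assumed here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, broker-free model of how one consumer group's queue hands
// messages to its consumer bindings. Not part of the binder or Solace API.
class QueueDistributionSketch {

    /**
     * Distributes messages to consumers.
     * exclusive = false: strict rotation across all consumers (round-robin).
     * exclusive = true:  only the first consumer receives messages.
     */
    static Map<String, List<String>> distribute(List<String> messages,
                                                List<String> consumers,
                                                boolean exclusive) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            int target = exclusive ? 0 : i % consumers.size();
            received.get(consumers.get(target)).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3", "m4");
        List<String> consumers = List.of("binding-a", "binding-b");
        System.out.println("non-exclusive: " + distribute(messages, consumers, false));
        System.out.println("exclusive:     " + distribute(messages, consumers, true));
    }
}
```

In the non-exclusive case the two bindings share the messages; in the exclusive case the second binding stays idle, which matches the note above.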
❗ Since consumer bindings always consume from queues, Assured Delivery must be enabled on the Solace PubSub+ Message VPN being used (Assured Delivery is automatically enabled if using Solace Cloud). Additionally, the client username’s client profile must be allowed to send and receive guaranteed messages.
For the sake of brevity, it will be assumed that you have a basic understanding of the Spring Cloud Stream project. If not, then please refer to Spring’s documentation. This document will solely focus on discussing components unique to Solace.
This project extends the Spring Cloud Stream Binder project. If you are new to Spring Cloud Stream, check out their documentation.
The following is a brief excerpt from that document:
"Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions."
== Using it in your Application
=== Updating your build
The releases from this project are hosted in Maven Central.
The easiest way to get started is to include the spring-cloud-stream-binder-solace in your application.
Here is how to include the Spring Cloud Stream starter in your project using Gradle and Maven.
==== Using it with Gradle
// Solace Spring Cloud Stream Binder
compile("ch.sbb:spring-cloud-stream-binder-solace:4.2.1")
==== Using it with Maven
<!-- Solace Spring Cloud Stream Binder -->
<dependency>
    <groupId>ch.sbb</groupId>
    <artifactId>spring-cloud-stream-binder-solace</artifactId>
    <version>4.2.1</version>
</dependency>
=== Creating a Simple Solace Binding
Starting in Spring Cloud Stream version 3 the recommended way to define binding and binding names is to use the Functional approach, which uses Spring Cloud Functions. You can learn more in the Spring Cloud Function support and Functional Binding Names sections of the reference guide.
Given this example app:
import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SampleAppApplication {

    public static void main(String[] args) {
        SpringApplication.run(SampleAppApplication.class, args);
    }

    // Bound as uppercase-in-0 (input) and uppercase-out-0 (output)
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}
An applicable Solace configuration file may look like:
spring:
  cloud:
    function:
      definition: uppercase
    stream:
      bindings:
        uppercase-in-0:
          destination: queuename
          group: myconsumergroup
          binder: solace-broker
        uppercase-out-0:
          destination: uppercase/topic
          binder: solace-broker
      binders:
        solace-broker:
          type: solace
          environment:
            solace: # (1)
              java:
                host: tcp://localhost:55555
                msgVpn: default
                clientUsername: default
                clientPassword: default
                connectRetries: -1
                reconnectRetries: -1
#                apiProperties:
#                  ssl_trust_store: <path_to_trust_store>
#                  ssl_trust_store_password: <trust_store_password>
#                  ssl_validate_certificate: true
(1) The latter half of this configuration, where the Solace session is configured, originates from the JCSMP Spring Boot Auto-Configuration project. See [Solace Session Properties] for more info.
For more samples see Solace Spring Cloud Samples repository.
For step-by-step instructions, refer to the Solace Spring Cloud Stream tutorial and check out the blogs.
== Configuration Options
=== Solace Binder Configuration Options
Configuration of the Solace Spring Cloud Stream Binder is done through Spring Boot’s externalized configuration. This is where users can control the binder’s configuration options as well as the Solace Java API properties.
For general binder configuration options and properties, refer to the Spring Cloud Stream Reference Documentation.
==== Solace Session Properties
The binder’s Solace session is configurable using properties prefixed by solace.java or spring.cloud.stream.binders.<binder-name>.environment.solace.java.
❗This binder leverages the JCSMP Spring Boot Auto-Configuration project to configure its session. See the JCSMP Spring Boot Auto-Configuration documentation for more info on how to configure these properties. See [Creating a Simple Solace Binding] for a simple example of how to configure a session for this binder.
💡 Additional session properties not available under the usual solace.java prefix can be set using solace.java.apiProperties.<property>, where <property> is the name of a JCSMPProperties constant (e.g. ssl_trust_store).
See the JCSMP Spring Boot Auto-Configuration documentation for more info about solace.java.apiProperties.
==== Solace Consumer Properties
The following properties are available for Solace consumers only and must be prefixed with spring.cloud.stream.solace.bindings.<bindingName>.consumer. where bindingName looks something like functionName-in-0 as defined in Functional Binding Names.
See SolaceCommonProperties and SolaceConsumerProperties for the most updated list.
- provisionDurableQueue
Whether to provision durable queues for non-anonymous consumer groups. This should only be set to false if you have externally pre-provisioned the required queue on the message broker.
Default:
true
See: [Generated Queue Name Syntax]
- addDestinationAsSubscriptionToQueue
Whether to add the destination as a subscription to the queue during provisioning.
Default:
true
- selector
If specified, enables client applications to choose which messages they are interested in receiving, as determined by the messages’ header field and property values.
A selector has a conditional expression syntax that is a subset of SQL-92. Selectors can be used with a queue or a topic endpoint subscription.
Default:
null
See: https://docs.solace.com/API/Solace-JMS-API/Selectors.htm
- queueNameExpression
A SpEL expression for creating the consumer group’s queue name.
Default:
"'scst/' + (isAnonymous ? 'an/' : 'wk/') + (group?.trim() + '/') + 'plain/' + destination.trim().replaceAll('[*>]', '_')"
See: [Generated Queue Name Syntax]
⚠️ Modifying this can cause naming conflicts between the queue names of consumer groups.
⚠️ While the default SpEL expression will consistently return a value adhering to [Generated Queue Name Syntax], directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice.
- queueAccessType
Access type for the consumer group queue.
Default:
0 (ACCESSTYPE_NONEXCLUSIVE)
See: The ACCESSTYPE_ prefixed constants for other possible values
- queuePermission
Permissions for the consumer group queue.
Default:
2 (PERMISSION_CONSUME)
See: The PERMISSION_ prefixed constants for other possible values
- queueDiscardBehaviour
If specified, whether to notify the sender if a message fails to be enqueued to the consumer group queue.
Default:
null
- queueMaxMsgRedelivery
Sets the maximum message redelivery count on the consumer group queue. (Zero means retry forever).
Default:
null
- queueMaxMsgSize
Maximum message size for the consumer group queue.
Default:
null
- queueQuota
Message spool quota for the consumer group queue.
Default:
null
- queueRespectsMsgTtl
Whether the consumer group queue respects Message TTL.
Default:
null
- queueAdditionalSubscriptions
An array of additional topic subscriptions to be applied on the consumer group queue.
These subscriptions may also contain wildcards.
Default:
String[0]
See: Overview for more info on how this binder uses topic-to-queue mapping to implement Spring Cloud Stream consumer groups.
- autoBindErrorQueue
Whether to automatically create a durable error queue to which messages will be republished when message processing failures are encountered. Only applies once all internal retries have been exhausted.
Default:
false
💡 Your ACL Profile must allow for publishing to this queue if you decide to use autoBindErrorQueue.
- provisionErrorQueue
Whether to provision durable queues for error queues when autoBindErrorQueue is true. This should only be set to false if you have externally pre-provisioned the required queue on the message broker.
Default:
true
See: [Generated Error Queue Name Syntax]
- errorQueueNameExpression
A SpEL expression for creating the error queue’s name.
Default:
"'scst/error/' + (isAnonymous ? 'an/' : 'wk/') + (group?.trim() + '/') + 'plain/' + destination.trim().replaceAll('[*>]', '_')"
See: [Generated Error Queue Name Syntax]
⚠️ Modifying this can cause naming conflicts between the error queue names.
⚠️ While the default SpEL expression will consistently return a value adhering to [Generated Error Queue Name Syntax], directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice.
- errorQueueMaxDeliveryAttempts
Maximum number of attempts to send a failed message to the error queue. When all delivery attempts have been exhausted, the failed message will be requeued.
Default:
3
- errorQueueAccessType
Access type for the error queue.
Default:
0 (ACCESSTYPE_NONEXCLUSIVE)
See: The ACCESSTYPE_ prefixed constants for other possible values
- errorQueuePermission
Permissions for the error queue.
Default:
2 (PERMISSION_CONSUME)
See: The PERMISSION_ prefixed constants for other possible values
- errorQueueDiscardBehaviour
If specified, whether to notify sender if a message fails to be enqueued to the error queue.
Default:
null
- errorQueueMaxMsgRedelivery
Sets the maximum message redelivery count on the error queue. (Zero means retry forever).
Default:
null
- errorQueueMaxMsgSize
Maximum message size for the error queue.
Default:
null
- errorQueueQuota
Message spool quota for the error queue.
Default:
null
- errorQueueRespectsMsgTtl
Whether the error queue respects Message TTL.
Default:
null
- errorMsgDmqEligible
The eligibility for republished messages to be moved to a Dead Message Queue.
Default:
null
- errorMsgTtl
The number of milliseconds before republished messages are discarded or moved to a Dead Message Queue.
Default:
null
- headerExclusions
The list of headers to exclude when converting a consumed Solace message to a Spring message.
Default:
Empty List<String>
- qualityOfService
The QoS (Quality of Service) used to consume messages. Possible values:
  - AT_MOST_ONCE (QoS=0)
    Using topics: messages may be lost or discarded. This mode improves performance and reduces latency. When using AT_MOST_ONCE, make sure the publisher uses deliveryMode=DIRECT to avoid having the messages persisted on publish.
  - AT_LEAST_ONCE (QoS=1)
    Using a persistent queue: the message is guaranteed to arrive at least once. This mode persists messages on storage and is therefore slower.
Default:
AT_LEAST_ONCE
==== Solace Producer Properties
The following properties are available for Solace producers only and must be prefixed with spring.cloud.stream.solace.bindings.<bindingName>.producer. where bindingName looks something like functionName-out-0 as defined in Functional Binding Names.
See SolaceCommonProperties and SolaceProducerProperties for the most updated list.
- destinationType
Specifies whether the configured destination is a topic or a queue.
When set to topic, the destination name is a topic subscription added on a queue.
When set to queue, the producer binds to a queue matching the destination name. The queue can be auto-provisioned with provisionDurableQueue=true; however, none of the naming prefix and queue name generation options apply. A queue will be provisioned using the destination name explicitly.
Default:
topic
- headerExclusions
The list of headers to exclude from the published message. Excluding Solace message headers is not supported.
Default:
Empty List<String>
- nonserializableHeaderConvertToString
When set to true, irreversibly convert non-serializable headers to strings. An exception is thrown otherwise.
Default:
false
❗ Non-serializable headers should have a meaningful toString() implementation. Otherwise, enabling this feature may result in data loss.
- provisionDurableQueue
Whether to provision durable queues for non-anonymous consumer groups or queue destinations. This should only be set to false if you have externally pre-provisioned the required queue on the message broker.
Default:
true
See: [Generated Queue Name Syntax]
- addDestinationAsSubscriptionToQueue
Whether to add the destination as a subscription to the queue during provisioning.
Default:
true
ℹ️ Does not apply when destinationType=queue.
- queueNameExpression
A SpEL expression for creating the consumer group’s queue name.
Default:
"'scst/' + (isAnonymous ? 'an/' : 'wk/') + (group?.trim() + '/') + 'plain/' + destination.trim().replaceAll('[*>]', '_')"
See: [Generated Queue Name Syntax]
⚠️ Modifying this can cause naming conflicts between the queue names of consumer groups.
⚠️ While the default SpEL expression will consistently return a value adhering to [Generated Queue Name Syntax], directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice.
- queueNameExpressionsForRequiredGroups
A mapping of required consumer groups to queue name SpEL expressions.
By default, queueNameExpression will be used to generate a required group’s queue name if it isn’t specified within this configuration option.
Default:
Empty Map<String, String>
See: [Generated Queue Name Syntax]
⚠️ Modifying this can cause naming conflicts between the queue names of consumer groups.
⚠️ While the default SpEL expression will consistently return a value adhering to [Generated Queue Name Syntax], directly using the SpEL expression string is not supported. The default value for this config option is subject to change without notice.
- queueAccessType
Access type for binder provisioned queues.
Default:
0 (ACCESSTYPE_NONEXCLUSIVE)
See: The ACCESSTYPE_ prefixed constants for other possible values
- queuePermission
Permissions for binder provisioned queues.
Default:
2 (PERMISSION_CONSUME)
See: The PERMISSION_ prefixed constants for other possible values
- queueDiscardBehaviour
Queue discard behaviour for binder provisioned queues. Whether to notify sender if a message fails to be enqueued to the endpoint. A null value means use the appliance default.
Default:
null
- queueMaxMsgRedelivery
Sets the maximum message redelivery count for binder provisioned queues. (Zero means retry forever).
Default:
null
- queueMaxMsgSize
Maximum message size for binder provisioned queues.
Default:
null
- queueQuota
Message spool quota for binder provisioned queues.
Default:
null
- queueRespectsMsgTtl
Whether the binder provisioned queues respect Message TTL.
Default:
null
- queueAdditionalSubscriptions
A mapping of required consumer groups to arrays of additional topic subscriptions to be applied on each consumer group’s queue.
These subscriptions may also contain wildcards.
Default:
Empty Map<String,String[]>
See: Overview for more info on how this binder uses topic-to-queue mapping to implement Spring Cloud Stream consumer groups.
ℹ️ Does not apply when destinationType=queue.
- deliveryMode
The delivery mode the producer uses to send messages on the configured binder. See https://docs.solace.com/API/API-Developer-Guide/Message-Delivery-Modes.htm for documentation. Possible values:
  - PERSISTENT
  - DIRECT
If using qualityOfService=AT_MOST_ONCE to reduce latency, it is suggested to set the deliveryMode to DIRECT to avoid having the messages persisted on publish.
Default:
PERSISTENT
==== Solace Connection Health-Check Properties
These properties configure the Solace connection’s health indicator and are configurable under solace.health-check.connection.
- reconnectAttemptsUntilDown
The number of session reconnect attempts until the health goes DOWN. This will happen regardless of whether the underlying session is actually still reconnecting. Setting this to 0 will disable this feature.
This feature operates independently of the PubSub+ session reconnect feature, meaning that if PubSub+ session reconnect is configured to retry fewer times than the value given to this property, then this feature effectively does nothing.
Default:
0
=== Solace Message Headers
Solace-defined Spring headers to get/set Solace metadata from/to Spring Message headers.
⚠️ solace_ is a header space reserved for Solace-defined headers. Creating new solace_-prefixed headers is not supported. Doing so may cause unexpected side-effects in future versions of this binder.
🔥 Refer to each header’s documentation for their expected usage scenario. Using headers outside of their intended type and access-control is not supported.
ℹ️ Header inheritance applies to Solace message headers in processor message handlers:
"When the non-void handler method returns, if the return value is already a Message, that Message becomes the payload. However, when the return value is not a Message, the new Message is constructed with the return value as the payload while inheriting headers from the input Message minus the headers defined or filtered by SpringIntegrationProperties.messageHandlerNotPropagatedHeaders."
(Mechanics, Spring Cloud Stream Reference Documentation: https://docs.spring.io/spring-cloud-stream/docs/4.1.x/reference/html/spring-cloud-stream.html#_mechanics)
==== Solace Headers
These headers are to get/set Solace message properties.
💡 Use SolaceHeaders instead of hardcoding the header names. This class also contains the same documentation that you see here.
Header Name | Type | Access | Description |
---|---|---|---|
| | Read/Write | The message ID (a string for an application-specific message identifier). This is the |
| | Read/Write | The application message type. This is the |
| | Read/Write | The correlation ID. |
| | Read | The number of times the message has been delivered. Note that, while the Delivery Count feature is in controlled availability, |
| | Read | The destination this message was published to. |
| | Read | Whether one or more messages have been discarded prior to the current message. |
| | Read/Write | Whether the message is eligible to be moved to a Dead Message Queue. |
| | Read/Write | The UTC time (in milliseconds, from midnight, January 1, 1970 UTC) when the message is supposed to expire. |
| | Read/Write | The HTTP content encoding header value from interaction with an HTTP client. |
| | Read/Write | Indicates whether this message is a reply. |
| | Read/Write | Priority value in the range of 0–255, or -1 if it is not set. |
| | Read | The receive timestamp (in milliseconds, from midnight, January 1, 1970 UTC). |
| | Read | Indicates if the message has been delivered by the broker to the API before. |
| | Read | Specifies a Replication Group Message ID as a replay start location. |
| | Read/Write | The replyTo destination for the message. |
| | Read/Write | The Sender ID for the message. |
| | Read/Write | The send timestamp (in milliseconds, from midnight, January 1, 1970 UTC). |
| | Read/Write | The sequence number. |
| | Read/Write | The number of milliseconds before the message is discarded or moved to a Dead Message Queue. |
| | Read/Write | When an application sends a message, it can optionally attach application-specific data along with the message, such as user data. |
==== Solace Binder Headers
These headers are to get/set Solace Spring Cloud Stream Binder properties.
These can be used for:
- Getting/Setting Solace Binder metadata
- Directive actions for the binder when producing/consuming messages
💡 Use SolaceBinderHeaders instead of hardcoding the header names. This class also contains the same documentation that you see here.
Header Name | Type | Access | Default Value | Description |
---|---|---|---|---|
| | Write | | A CorrelationData instance for messaging confirmations. Only works with qualityOfService: AT_LEAST_ONCE (default). |
| | Read | | A static number set by the publisher to indicate the Spring Cloud Stream Solace message version. |
| | Read | | Present and true to indicate when the PubSub+ message payload was null. |
| | Write | | The partition key for PubSub+ partitioned queues. |
| | Internal Binder Use Only | | Is |
| | Internal Binder Use Only | | A JSON String array of header names where each entry indicates that that header’s value was serialized by a Solace Spring Cloud Stream binder before publishing it to a broker. |
| | Internal Binder Use Only | | The encoding algorithm used to encode the headers indicated by |
| | Write | | Only applicable when |
| | Write | | Set to 'true' to enable sending of large messages (only on producer side). Default is 'false'. If using groups, only partitioned queues are supported. Otherwise, the message chunks get delivered to the wrong consumer. |
Possible values for the dynamic destination type:
- topic: Specifies that the dynamic destination is a topic.
- queue: Specifies that the dynamic destination is a queue.
When absent, the binding’s configured destination-type is used.
== Native Payload Types
Below are the payload types natively supported by this binder (before/after Content Type Negotiation):
Payload Type | PubSub+ Message Type | Notes |
---|---|---|
| Binary Message | Basic PubSub+ payload type. |
| Text Message | Basic PubSub+ payload type. |
| Stream Message | Basic PubSub+ payload type. |
| Map Message | Basic PubSub+ payload type. |
| XML-Content Message | Basic PubSub+ payload type. Only available for consumption. |
| Bytes Message | This is not a basic payload type supported by the PubSub+ broker, but is one defined and coordinated by this binder. Publishing: When a Consuming: When the binder consumes a binary message which has the |
Typically, the Spring Cloud Stream framework will convert a published payload into a byte[] before giving it to the binder.
In which case, this binder will publish a binary message.
If this occurs, but you wish to publish other message types, then one option is to set useNativeEncoding=true on your producer (but read the caveats carefully before enabling this feature), and have your message handler return a payload of one of this binder’s supported native payload types; e.g. return Message<SDTStream> to publish a stream message.
See Content Type Negotiation for more info on how Spring Cloud Stream converts payloads and other options to control message conversion.
=== Empty Payload VS Null Payload
Spring messages can’t contain null payloads; however, message handlers can differentiate between null payloads and empty payloads by looking at the solace_scst_nullPayload header.
The binder adds the solace_scst_nullPayload header when a Solace message with a null payload is consumed from the wire.
When that is the case, the binder sets the Spring message’s payload to a null-equivalent payload.
Null-equivalent payloads are one of the following: empty byte[], empty String, empty SDTMap, or empty SDTStream.
ℹ️ Applications can’t differentiate between null payloads and empty payloads when consuming binary messages or XML-content messages from the wire. This is because Solace always converts empty payloads to null payloads when those message types are published.
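A handler-side check for this header could look like the following minimal sketch. The class and method names are hypothetical; only the header name comes from this document. It assumes the header value is a Boolean, and it takes a plain Map<String, Object> so that it runs without Spring on the classpath (Spring's MessageHeaders implements Map<String, Object>, so message.getHeaders() can be passed in directly).

```java
import java.util.Map;

// Hypothetical helper; only the header name is taken from the documentation.
class NullPayloadSketch {

    static final String NULL_PAYLOAD_HEADER = "solace_scst_nullPayload";

    /**
     * Returns true only when the header is present and set to Boolean.TRUE.
     * Assumption: the binder stores the flag as a Boolean value.
     */
    static boolean wasNullPayload(Map<String, Object> headers) {
        return Boolean.TRUE.equals(headers.get(NULL_PAYLOAD_HEADER));
    }

    public static void main(String[] args) {
        Map<String, Object> withFlag = Map.of(NULL_PAYLOAD_HEADER, Boolean.TRUE);
        Map<String, Object> withoutFlag = Map.of("contentType", "text/plain");
        System.out.println(wasNullPayload(withFlag));
        System.out.println(wasNullPayload(withoutFlag));
    }
}
```

A handler that receives an empty String or empty byte[] can call such a helper to decide whether the publisher sent an empty payload or no payload at all.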
== Generated Queue Name Syntax
By default, generated consumer group queue names have the following form:
<prefix>/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination>
- prefix
A static prefix scst.
- familiarity-modifier
Indicates the durability of the consumer group (wk for well-known or an for anonymous).
- group
The consumer group name.
- destination-encoding
Indicates the encoding scheme used to encode the destination in the queue name (currently only plain is supported).
- encoded-destination
The encoded destination as per <destination-encoding>.
The queueNameExpression property’s default SpEL expression conforms to the above format; however, users can provide any valid SpEL expression in order to generate custom queue names.
Valid expressions evaluate against the following context:
Context Variable | Description |
---|---|
| The binding’s destination name. |
| The binding’s consumer group name. |
| Indicates whether the consumer is an anonymous consumer group |
| The configured Solace binding properties. |
| The configured Spring binding properties. |
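To see what the default queueNameExpression produces, the following sketch mirrors it in plain Java, using the destination, group and isAnonymous inputs that appear in the default expression. The class and method names are hypothetical, and the binder itself evaluates SpEL rather than this code. One deliberate simplification: the SpEL expression uses the safe-navigation operator (group?.trim()), whereas this sketch assumes a non-null group.

```java
// Plain-Java mirror of the documented default queueNameExpression:
// 'scst/' + (isAnonymous ? 'an/' : 'wk/') + (group?.trim() + '/')
//   + 'plain/' + destination.trim().replaceAll('[*>]', '_')
// Hypothetical helper; assumes group and destination are non-null.
class QueueNameSketch {

    static String defaultQueueName(boolean isAnonymous, String group, String destination) {
        return "scst/"
                + (isAnonymous ? "an/" : "wk/")
                + group.trim() + "/"
                + "plain/"
                // Wildcard characters are not kept in the queue name.
                + destination.trim().replaceAll("[*>]", "_");
    }

    public static void main(String[] args) {
        // Values taken from the sample configuration in this document.
        System.out.println(defaultQueueName(false, "myconsumergroup", "uppercase/topic"));
        // A wildcard destination with a hypothetical group name.
        System.out.println(defaultQueueName(true, "some-group", "sensors/*/temperature/>"));
    }
}
```

For the sample binding, this yields scst/wk/myconsumergroup/plain/uppercase/topic, which follows the <prefix>/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination> form described above.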
=== Generated Error Queue Name Syntax
By default, generated error queue names have the following form:
<prefix>/error/<familiarity-modifier>/<group>/<destination-encoding>/<encoded-destination>
The definition of each segment of the error queue name matches that from [Generated Queue Name Syntax], with the following exceptions:
- group
The consumer group name.
The errorQueueNameExpression property’s default SpEL expression conforms to the above format.
Users can provide any valid SpEL expression in order to generate custom error queue names using the same evaluation context as described in [Generated Queue Name Syntax].
== Consumer Concurrency
Configure Spring Cloud Stream’s concurrency consumer property to enable concurrent message consumption for a particular consumer binding.
Note that there are a few limitations:
1. concurrency > 1 is not supported for exclusive queues.
2. concurrency > 1 is not supported for consumer bindings which are a part of anonymous consumer groups.
3. concurrency > 1 is ignored for polled consumers.
4. concurrency > 1 is not supported with auto-provisioned topic endpoints.
5. Setting provisionDurableQueue to false disables endpoint configuration validation, meaning that point 1 cannot be validated. In this scenario, it is the developer’s responsibility to ensure that point 1 is followed.
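When endpoint validation is disabled, an application may want its own pre-flight check. The sketch below encodes the first three limitations as a plain-Java helper. All names are hypothetical and this is not the binder's own validation logic, which is not shown in this document; the topic endpoint limitation is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check mirroring the documented limitations.
// Not the binder's own validation; names are illustrative only.
class ConcurrencyRulesSketch {

    static List<String> findProblems(int concurrency,
                                     boolean exclusiveQueue,
                                     boolean anonymousGroup,
                                     boolean polledConsumer) {
        List<String> problems = new ArrayList<>();
        if (concurrency <= 1) {
            return problems; // the limitations only concern concurrency > 1
        }
        if (exclusiveQueue) {
            problems.add("concurrency > 1 is not supported for exclusive queues");
        }
        if (anonymousGroup) {
            problems.add("concurrency > 1 is not supported for anonymous consumer groups");
        }
        if (polledConsumer) {
            problems.add("concurrency > 1 is ignored for polled consumers");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Example: four concurrent consumers on an exclusive queue.
        System.out.println(findProblems(4, true, false, false));
    }
}
```

An empty result means none of the checked limitations apply to the given combination.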
ℹ️ The Solace PubSub+ broker supports partitioning natively. This only works with qualityOfService: AT_LEAST_ONCE (the default). The partitioning abstraction as described in the Spring Cloud Stream documentation is not supported.
To publish messages that are intended for partitioned queues, you must provide a partition key by setting the solace_scst_partitionKey message header (accessible through the SolaceBinderHeaders.PARTITION_KEY constant).
For example:
public class MyMessageBuilder {
    public Message<String> buildMeAMessage() {
        return MessageBuilder.withPayload("payload")
                .setHeader(SolaceBinderHeaders.PARTITION_KEY, "partition-key")
                .build();
    }
}
As for consuming messages from partitioned queues, this is handled transparently by the PubSub+ broker. That is to say, consuming messages from a partitioned queue is no different from consuming messages from any other queue.
See Partitioned Queues for more.
Message handlers can disable auto-acknowledgement and manually invoke the acknowledgement callback as follows:
public void consume(Message<?> message) {
    AcknowledgmentCallback acknowledgmentCallback = StaticMessageHeaderAccessor.getAcknowledgmentCallback(message); // (1)
    acknowledgmentCallback.noAutoAck(); // (2)
    try {
        AckUtils.accept(acknowledgmentCallback); // (3)
    } catch (SolaceAcknowledgmentException e) { // (4)
        // Handle the failed acknowledgement here (e.g. log it)
    }
}
(1) Get the message’s acknowledgement callback header
(2) Disable auto-acknowledgement
(3) Acknowledge the message with the ACCEPT status
(4) Handle any acknowledgment exceptions
Refer to the AckUtils documentation and AcknowledgmentCallback documentation for more info on these objects.
💡 If manual acknowledgement is to be done outside of the message handler’s thread, then make sure auto-acknowledgement is disabled within the message handler’s thread and not an external one. Otherwise, the binder will auto-acknowledge the message when the message handler returns.
For each acknowledgement status, the binder will perform the following actions:
Status | Action |
---|---|
ACCEPT | Acknowledge the message. |
REJECT | If Refer to Failed Consumer Message Error Handling for more info. |
REQUEUE | For a consumer in either a defined consumer group or an anonymous group, signal the Solace broker to requeue/redeliver the message. The message will be redelivered until it is Refer to Message Redelivery for more info. |
❗ Acknowledgements may throw Example: A
ℹ️ Manual acknowledgements do not support any application-internal error handling strategies (i.e. retry template, error channel forwarding, etc). Also, throwing an exception in the message handler will always acknowledge the message in some way, regardless of whether auto-acknowledgment is disabled.
💡 If messages are acknowledged asynchronously but not in a timely manner, the message consumption rate is likely to stall due to the consumer queue’s configured "Maximum Delivered Unacknowledged Messages per Flow". This property can be configured for dynamically created queues by using queue templates. However, note that as per our documentation, anonymous consumer group queues (i.e. temporary queues) will not match a queue template’s name filter. Only the queue template defined in the client profile’s "Copy Settings From Queue Template" setting will apply to those.
Spring Cloud Stream has a reserved message header called scst_targetDestination (retrievable via BinderHeaders.TARGET_DESTINATION), which allows for messages to be redirected from their bindings' configured destination to the target destination specified by this header.
For this binder’s implementation of this header, the target destination defines the exact Solace topic or queue to which a message will be sent, i.e. no post-processing is done.
This binder also adds a reserved message header called solace_scst_targetDestinationType (retrievable via SolaceBinderHeaders.TARGET_DESTINATION_TYPE), which allows overriding the configured producer destination-type.
public class MyMessageBuilder {
    public Message<String> buildMeAMessage() {
        return MessageBuilder.withPayload("payload")
                .setHeader(BinderHeaders.TARGET_DESTINATION, "some-dynamic-destination") // (1)
                .setHeader(SolaceBinderHeaders.TARGET_DESTINATION_TYPE, "topic") // (2)
                .build();
    }
}
(1) This message will be sent to the some-dynamic-destination topic, ignoring the producer’s configured destination.
(2) Optionally, the configured producer destination-type can be overridden.
ℹ️ These two headers are cleared from the message before it is sent off to the message broker. If you need that information on the consumer side, attach it to your message payload.
ℹ️ Dynamic Producer Destinations with StreamBridge
This binder does not support the usage of StreamBridge’s dynamic destination feature, which automatically creates and caches unknown output bindings on-the-fly. Instead, set the target destination header on the message and send it to a pre-defined output binding:
public void sendMessage(StreamBridge streamBridge, String myDynamicDestination, Message<?> message) {
    Message<?> messageWithDestination = MessageBuilder.fromMessage(message)
            .setHeader(BinderHeaders.TARGET_DESTINATION, myDynamicDestination)
            .build();
    streamBridge.send("some-pre-defined-output-binding", messageWithDestination);
}
Then in your application’s configuration file, configure your predefined output binding:
spring.cloud.stream.output-bindings=some-pre-defined-output-binding
For more info, see Sending arbitrary data to an output (e.g. Foreign event-driven sources).
The Spring Cloud Stream framework already provides a number of application-internal reprocessing strategies for messages that fail during consumption. These are described in the Spring Cloud Stream documentation.
However, after all internal error handling strategies have been exhausted, the Solace implementation of the binder will either:
-
Redeliver the failed message (default)
-
Republish the message to another queue (an error queue) for an external application/binding to process
A simple error handling strategy in which failed messages are redelivered from the consumer group’s queue.
This is very similar to simply enabling the retry template (setting maxAttempts to a value greater than 1), but allows for the failed messages to be re-processed by the message broker.
❗
|
The internal implementation of redelivery changed as of Solace Binder v5.0.0. Previously, redelivery was initiated by rebinding consumer flows; however, as of v5.0.0 and later, the Solace API now leverages the Solace broker’s native NACK (Negative Acknowledgement) capabilities. Here is what happens under the hood when this is triggered:
Redelivery may result in duplicate messages, so the application should be designed to handle them. |
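One common way to tolerate redelivered duplicates is to make the consumer idempotent. The following is a minimal sketch in plain Java, not part of the binder: `RecentMessageIds` is a hypothetical helper that remembers a bounded window of message identifiers. It assumes the application can derive a stable identifier for each message (for example, an ID carried in the payload), which is an assumption of this sketch rather than something the binder provides here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not a binder class): remembers the most recently seen
 * message IDs so that a redelivered duplicate can be detected and skipped.
 */
final class RecentMessageIds {
    private final Map<String, Boolean> seen;

    RecentMessageIds(final int capacity) {
        // Access-ordered map that evicts its eldest entry once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is offered, false while it is still remembered. */
    synchronized boolean firstTimeSeen(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

A consumer function would call `firstTimeSeen(id)` before doing any side-effecting work and return early when it yields `false`. The window is in-memory and bounded, so it only guards against duplicates that arrive while the ID is still remembered by the same application instance.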
Error Queue Republishing
First, it must be noted that an Error Queue is different from a Dead Message Queue (DMQ).
In particular, a DMQ is used to capture re-routed failed messages as a consequence of Solace PubSub+ messaging features such as TTL expiration or exceeding a message’s max redelivery count. In contrast, the purpose of an Error Queue is to capture re-routed messages which have been successfully consumed from the message broker, yet cannot be processed by the application.
An Error Queue can be provisioned for a particular consumer group by setting the autoBindErrorQueue consumer config option to true.
This Error Queue is simply another durable queue which is named as per the [Generated Error Queue Name Syntax] section.
And like the queues used for consumer groups, its endpoint properties can be configured by means of any consumer properties whose names begin with "errorQueue".
ℹ️
|
Error Queues should not be used with anonymous consumer groups. Since the names of anonymous consumer groups, and in turn the name of their would-be Error Queues, are randomly generated at runtime, it would provide little value to create bindings to these Error Queues because of their unpredictable naming and temporary existence. Also, your environment will be polluted with orphaned Error Queues whenever these consumers rebind. |
When locally reprocessing failed messages with Spring’s Retry Template (i.e. when consumer maxAttempts > 1), mutations of nested objects within the Spring Message<?> may persist between retries.
Example of mutating an SDTMap payload and failing the message:
public Function<Message<SDTMap>, Message<SDTMap>> transform() {
    return message -> {
        if (!message.getPayload().containsKey("new-key")) { // (1)
            message.getPayload().putString("new-key", "value");
        }
        // failing message processing to trigger retry template
        throw new RuntimeException("Failed processing");
    };
}
(1) The body of this if-statement only runs if the SDTMap payload does not contain the key "new-key". If the consumer binding is configured with maxAttempts > 1, then on subsequent reprocessing attempts the payload will still contain the key "new-key" from the previous attempt.
If this behavior is undesirable, then you should configure your consumer’s maxAttempts to 1 and rely on Message Redelivery to handle reprocessing.
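The effect can be reproduced without the binder. The sketch below is plain Java and uses stand-ins throughout: a `HashMap` takes the place of the `SDTMap` payload, and a hand-written loop takes the place of Spring's retry template. All names in it are hypothetical. It contrasts retrying against the same payload object with retrying against a copy made per attempt.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustration only: HashMap and a simple loop stand in for SDTMap and the retry template. */
final class RetryMutationDemo {

    /** Mutates the payload it is given and then fails, like the handler above. */
    static void failingHandler(Map<String, String> payload) {
        if (!payload.containsKey("new-key")) {
            payload.put("new-key", "value");
        }
        throw new IllegalStateException("Failed processing");
    }

    /** Every attempt receives the same payload object, so mutations accumulate. */
    static void retrySharingPayload(Map<String, String> payload, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                failingHandler(payload);
                return;
            } catch (IllegalStateException e) {
                // Attempt failed; loop around and retry with the same object.
            }
        }
    }

    /** Every attempt receives a fresh shallow copy, so the original stays untouched. */
    static void retryCopyingPayload(Map<String, String> payload, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                failingHandler(new HashMap<>(payload));
                return;
            } catch (IllegalStateException e) {
                // Attempt failed; the mutated copy is discarded before the next attempt.
            }
        }
    }
}
```

Copying inside the handler is one alternative to setting maxAttempts to 1. Note that the copy here is shallow, so nested mutable values would still be shared between attempts and would need to be copied as well.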
The Solace binder supports pausing and resuming consumer bindings. See Spring Cloud Stream documentation to learn how to pause and resume consumer bindings.
ℹ️
|
There is no guarantee that the effect of pausing a binding will be instantaneous: messages already in-flight or being processed by the binder may still be delivered after the call to pause returns. |
By default, asynchronous producer errors aren’t handled by the framework.
Producer error channels can be enabled using the errorChannelEnabled producer config option.
Beyond that, this binder also supports using a Future to wait for publish confirmations.
See [Publisher Confirms] for more info.
For each message you can create a new CorrelationData instance and set it as the value of your message’s SolaceBinderHeaders.CONFIRM_CORRELATION header.
ℹ️
|
CorrelationData can be extended to add more correlation info.
The SolaceBinderHeaders.CONFIRM_CORRELATION header is not reflected in the actual message published to the broker.
|
Now using CorrelationData.getFuture().get(), you can wait for a publish acknowledgment from the broker.
If the publish failed, then this future will throw an exception.
For example:
@Autowired
private StreamBridge streamBridge;

public void send(String payload, long timeout, TimeUnit unit) {
    CorrelationData correlationData = new CorrelationData();
    // The payload is a String, so the message type must be Message<String>
    Message<String> message = MessageBuilder.withPayload(payload)
        .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, correlationData)
        .build();
    streamBridge.send("output-destination", message);
    try {
        // Block until the broker acknowledges the publish or the timeout elapses
        correlationData.getFuture().get(timeout, unit);
        // Do success logic
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        // Do failure logic
    }
}
== Solace Binder Health Indicator
Solace binders can report health statuses via the Spring Boot Actuator health endpoint.
To enable this feature, add Spring Boot Actuator to the classpath.
To manually disable this feature, set management.health.binders.enabled=false.
Health Status | Description |
---|---|
UP | Status indicating that the binder is functioning as expected. |
RECONNECTING | Status indicating that the binder is actively trying to reconnect to the message broker. This is a custom health status. It isn’t included in the health severity order list ( |
DOWN | Status indicating that the binder has suffered an unexpected failure. For instance, the binder may have exhausted all reconnection attempts. User intervention is likely required. |
== Solace Binder Metrics
Leveraging Spring Metrics, the Solace PubSub+ binder exposes the following metrics:
Name | Type | Tags | Description |
---|---|---|---|
|
Base Units: |
|
Message payload size. This is the payload size of the messages received (if |
|
Base Units: |
|
Total message size. This is the total size of the messages received (if |
== Micrometer Tracing
The binder supports Micrometer tracing. To enable it, ensure that the required beans, Tracer and Propagator, are available.
== Resources
For more information about Spring Cloud Stream, try these resources:
For more information about Solace technology in general please visit these resources:
-
The Solace Developer Portal website at: https://solace.dev
-
Ask the Solace community