有时,在集成流中,将消息拆分为多个独立处理的消息可能很有用。Splitter 将为分割并处理这些消息。
Splitter 在很多情况下都很有用,但是有两个基本用例可以使用 Splitter:
- 消息有效载荷,包含单个消息有效载荷相同类型的项的集合。例如,携带产品列表的消息可能被分成多个消息,每个消息的有效负载是一个产品。
- 信息有效载荷,携带的信息虽然相关,但可以分为两种或两种以上不同类型的信息。例如,购买订单可能包含交付、帐单和行项目信息。交付细节可能由一个子流程处理,账单由另一个子流程处理,每一项则由另一个子流程处理。在这个用例中,Splitter 后面通常跟着一个路由器,它根据有效负载类型路由消息,以确保正确的子流处理数据。
当将消息有效负载拆分为两个或多个不同类型的消息时,通常只需定义一个 POJO 即可,该 POJO 提取传入的有效负载的各个部分,并将它们作为集合的元素返回。
例如,假设希望将携带购买订单的消息拆分为两条消息:一条携带账单信息,另一条携带项目列表。下面的 OrderSplitter 将完成这项工作:
public class OrderSplitter {
public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
ArrayList<Object> parts = new ArrayList<>();
parts.add(po.getBillingInfo());
parts.add(po.getLineItems());
return parts;
}
}
然后,可以使用 @Splitter 注解将 OrderSplitter bean 声明为集成流的一部分,如下所示:
@Bean
@Splitter(inputChannel="poChannel", outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
return new OrderSplitter();
}
在这里,购买订单到达名为 poChannel 的通道,并被 OrderSplitter 分割。然后,将返回集合中的每个项作为集成流中的单独消息发布到名为 splitOrderChannel 的通道。在流的这一点上,可以声明一个 PayloadTypeRouter 来将账单信息和项目,并路由到它们自己的子流:
@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(
BillingInfo.class.getName(), "billingInfoChannel");
router.setChannelMapping(List.class.getName(), "lineItemsChannel");
return router;
}
顾名思义,PayloadTypeRouter 根据消息的有效负载类型将消息路由到不同的通道。按照这里的配置,将有效负载为类型为 BillingInfo 的消息路由到一个名为 billingInfoChannel 的通道进行进一步处理。至于项目信息,它们在 java.util.List 集合包中;因此,可以将 List 类型的有效负载映射到名为 lineItemsChannel 的通道中。
按照目前的情况,流分为两个子流:一个是 BillingInfo 对象流,另一个是 List<LineItem> 流。但是,如果想进一步分割它,而不是处理 LineItem 列表,而是分别处理每个 LineItem,该怎么办呢?要将列表拆分为多个消息(每个行项对应一条消息),只需编写一个方法(而不是 bean),该方法使用 @Splitter 进行注解,并返回 LineItems 集合,可能类似如下:
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
return lineItems;
}
当携带 List<LineItem> 的有效负载的消息到达名为 lineItemsChannel 的通道时,它将传递到 lineItemSplitter() 方法。根据 Splitter 的规则,该方法必须返回要 Splitter 的项的集合。在本例中,已经有了 LineItems 的集合,因此只需直接返回该集合。因此,集合中的每个 LineItem 都以其自己的消息形式发布到名为 lineItemChannel 的通道。
如果你想使用 Java DSL 来声明相同的 Splitter/Router 配置,你可以调用 split() 和 route():
return IntegrationFlows
...
.split(orderSplitter())
.<Object, String> route(p -> {
if (p.getClass().isAssignableFrom(BillingInfo.class)) {
return "BILLING_INFO";
} else {
return "LINE_ITEMS";
}
}, mapping ->
mapping.subFlowMapping("BILLING_INFO", sf ->
sf.<BillingInfo> handle((billingInfo, h) -> { ... }))
.subFlowMapping("LINE_ITEMS", sf ->
sf.split().<LineItem> handle((lineItem, h) -> { ... }))
)
.get();
流定义的 DSL 形式当然更简洁,如果不是更难于理解的话。它使用与 Java 配置示例相同的 OrderSplitter 来分割订单。在订单被分割之后,它被其类型路由到两个单独的子流。