Skip to content

Latest commit

 

History

History
84 lines (66 loc) · 4.79 KB

File metadata and controls

84 lines (66 loc) · 4.79 KB

9.2.5 分割器

有时,在集成流中,将消息拆分为多个独立处理的消息可能很有用。Splitter 将为分割并处理这些消息。

Splitter 在很多情况下都很有用,但是有两个基本用例可以使用 Splitter:

  • 消息有效载荷,包含单个消息有效载荷相同类型的项的集合。例如,携带产品列表的消息可能被分成多个消息,每个消息的有效负载是一个产品。
  • 信息有效载荷,携带的信息虽然相关,但可以分为两种或两种以上不同类型的信息。例如,购买订单可能包含交付、帐单和行项目信息。交付细节可能由一个子流程处理,账单由另一个子流程处理,每一项则由另一个子流程处理。在这个用例中,Splitter 后面通常跟着一个路由器,它根据有效负载类型路由消息,以确保正确的子流处理数据。

当将消息有效负载拆分为两个或多个不同类型的消息时,通常只需定义一个 POJO 即可,该 POJO 提取传入的有效负载的各个部分,并将它们作为集合的元素返回。

例如,假设希望将携带购买订单的消息拆分为两条消息:一条携带账单信息,另一条携带项目列表。下面的 OrderSplitter 将完成这项工作:

public class OrderSplitter {
    public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
        ArrayList<Object> parts = new ArrayList<>();
        parts.add(po.getBillingInfo());
        parts.add(po.getLineItems());
        return parts;
    }
}

然后,可以使用 @Splitter 注解将 OrderSplitter bean 声明为集成流的一部分,如下所示:

@Bean
@Splitter(inputChannel="poChannel", outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
    return new OrderSplitter();
}

在这里,购买订单到达名为 poChannel 的通道,并被 OrderSplitter 分割。然后,将返回集合中的每个项作为集成流中的单独消息发布到名为 splitOrderChannel 的通道。在流的这一点上,可以声明一个 PayloadTypeRouter 来将账单信息和项目,并路由到它们自己的子流:

@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(
        BillingInfo.class.getName(), "billingInfoChannel");
    router.setChannelMapping(List.class.getName(), "lineItemsChannel");
    return router;
}

顾名思义,PayloadTypeRouter 根据消息的有效负载类型将消息路由到不同的通道。按照这里的配置,将有效负载为类型为 BillingInfo 的消息路由到一个名为 billingInfoChannel 的通道进行进一步处理。至于项目信息,它们在 java.util.List 集合包中;因此,可以将 List 类型的有效负载映射到名为 lineItemsChannel 的通道中。

按照目前的情况,流分为两个子流:一个是 BillingInfo 对象流,另一个是 List<LineItem> 流。但是,如果想进一步分割它,而不是处理 LineItem 列表,而是分别处理每个 LineItem,该怎么办呢?要将列表拆分为多个消息(每个行项对应一条消息),只需编写一个方法(而不是 bean),该方法使用 @Splitter 进行注解,并返回 LineItems 集合,可能类似如下:

@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
    return lineItems;
}

当携带 List<LineItem> 的有效负载的消息到达名为 lineItemsChannel 的通道时,它将传递到 lineItemSplitter() 方法。根据 Splitter 的规则,该方法必须返回要 Splitter 的项的集合。在本例中,已经有了 LineItems 的集合,因此只需直接返回该集合。因此,集合中的每个 LineItem 都以其自己的消息形式发布到名为 lineItemChannel 的通道。

如果你想使用 Java DSL 来声明相同的 Splitter/Router 配置,你可以调用 split() 和 route():

return IntegrationFlows
    ...
    .split(orderSplitter())
    .<Object, String> route(p -> {
        if (p.getClass().isAssignableFrom(BillingInfo.class)) {
            return "BILLING_INFO";
        } else {
            return "LINE_ITEMS";
        }
    }, mapping ->
           mapping.subFlowMapping("BILLING_INFO", sf -> 
                      sf.<BillingInfo> handle((billingInfo, h) -> { ... }))
                  .subFlowMapping("LINE_ITEMS", sf -> 
                       sf.split().<LineItem> handle((lineItem, h) -> { ... }))
    )
    .get();

流定义的 DSL 形式当然更简洁,如果不是更难于理解的话。它使用与 Java 配置示例相同的 OrderSplitter 来分割订单。在订单被分割之后,它被其类型路由到两个单独的子流。