[Bald Programmer] Spring Boot and the Spring Integration Java DSL

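Spring Integration Java DSL has been merged into Spring Integration Core 5.0, which is a smart and obvious move because:

  • Everyone starting a new Spring project based on Java Config uses it
  • The SI Java DSL lets you use powerful new Java 8 features such as lambdas
  • You can build flows with the builder pattern, based on IntegrationFlowBuilder

Let's see how to use it in examples based on ActiveMQ JMS.

Maven dependencies: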

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jms</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-kahadb-store</artifactId>
    </dependency>

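    <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
    <!-- Only needed with Spring Integration 4.x; from 5.0 on the DSL ships in spring-integration-core, so this dependency can be dropped. -->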
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-java-dsl</artifactId>
        <version>1.2.3.RELEASE</version>
    </dependency>
</dependencies>

Example 1: JMS inbound gateway

We have the following ServiceActivator:

@Service
public class ActiveMQEndpoint {
    @ServiceActivator(inputChannel = "inboundChannel")
    public void processMessage(final String inboundPayload) {
        System.out.println("Inbound message: "+inboundPayload);
    }
}

If you want to use the SI Java DSL to deliver the inboundPayload from a JMS queue to the activator, gateway style, use the DSL Jms factory:

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsInboundGateway dataEndpoint() {
    return Jms.inboundGateway(listenerContainer())
            .requestChannel(inboundChannel()).get();
}

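If the dataEndpoint bean returns the JmsInboundGatewaySpec, you can also send a reply to an SI channel or a JMS destination. See the documentation.

Example 2: JMS message-driven channel adapter

If you are looking for a replacement for the XML JMS configuration of a message-driven channel adapter, JmsMessageDrivenChannelAdapter is the way to go: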

@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
    return new DynamicDestinationResolver();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    return new ActiveMQConnectionFactory();
}

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
    defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
    return defaultMessageListenerContainer;
}

@Bean
public MessageChannel inboundChannel() {
    return MessageChannels.direct("inboundChannel").get();
}

@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setExpectReply(false);
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
            JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
            );

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

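As in the previous example, the inbound payload is delivered to the activator from example 1.

Example 3: JMS message-driven channel adapter with JAXB

In a typical scenario you want to accept XML as a text message over JMS, convert it into JAXB stubs and process it in a service activator. I will show you how to do this with the SI Java DSL, but first let's add two dependencies for XML processing: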

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-xml</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
    </dependency>

We will accept shiporders over JMS, so we start with an XSD named shiporder.xsd:

<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">

    <xs:element name="shiporder">
        <xs:complexType>
            <xs:sequence>
                <xs:element name="orderperson" type="xs:string"/>
                <xs:element name="shipto">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="name" type="xs:string"/>
                            <xs:element name="address" type="xs:string"/>
                            <xs:element name="city" type="xs:string"/>
                            <xs:element name="country" type="xs:string"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
                <xs:element name="item" maxOccurs="unbounded">
                    <xs:complexType>
                        <xs:sequence>
                            <xs:element name="title" type="xs:string"/>
                            <xs:element name="note" type="xs:string" minOccurs="0"/>
                            <xs:element name="quantity" type="xs:positiveInteger"/>
                            <xs:element name="price" type="xs:decimal"/>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
            </xs:sequence>
            <xs:attribute name="orderid" type="xs:string" use="required"/>
        </xs:complexType>
    </xs:element>

</xs:schema>

Add the JAXB Maven plugin to generate the JAXB stubs:

  <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>jaxb2-maven-plugin</artifactId>
            <version>2.3.1</version>
            <executions>
                <execution>
                    <id>xjc-schema1</id>
                    <goals>
                        <goal>xjc</goal>
                    </goals>
                    <configuration>
                        <!-- Use all XSDs under the west directory for sources here. -->
                        <sources>
                            <source>src/main/resources/xsds/shiporder.xsd</source>
                        </sources>

                        <!-- Package name of the generated sources. -->
                        <packageName>com.example.stubs</packageName>
                        <outputDirectory>src/main/java</outputDirectory>
                        <clearOutputDir>false</clearOutputDir>
                    </configuration>
                </execution>
            </executions>
        </plugin>

The stub classes and everything else are ready; now for the JMS message-driven adapter with some JAXB magic:

/**
 * Sample 3: Jms message driven adapter with JAXB
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setExpectReply(false);
    channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()));
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
            JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
    );

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

@Bean
public Jaxb2Marshaller shipOrdersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setContextPath("com.example.stubs");
    return marshaller;
}

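Compared with XML configuration, doing this in Java gives you a great deal of power and flexibility. To complete the example, the service activator for inboundChannel looks like this: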

/**
 * Sample 3
 * @param shiporder
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}

To test the flow, you can send the following XML to the JMS queue via JConsole:

<?xml version="1.0" encoding="UTF-8"?>
<shiporder orderid="889923"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:noNamespaceSchemaLocation="shiporder.xsd">
    <orderperson>John Smith</orderperson>
    <shipto>
        <name>Ola Nordmann</name>
        <address>Langgt 23</address>
        <city>4000 Stavanger</city>
        <country>Norway</country>
    </shipto>
    <item>
        <title>Empire Burlesque</title>
        <note>Special Edition</note>
        <quantity>1</quantity>
        <price>10.90</price>
    </item>
    <item>
        <title>Hide your heart</title>
        <quantity>1</quantity>
        <price>9.90</price>
    </item>
</shiporder>

For a quick overview of how to use ActiveMQ with JConsole, see this tutorial.
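
Before pasting a payload into JConsole, it can help to confirm that it is well-formed and carries the two fields the service activator prints. The following is a minimal sketch that uses only the JDK DOM parser, with no JAXB or Spring involved. The class name ShiporderXmlCheck, the summarize method and the trimmed single-item payload are assumptions made for illustration, not part of the original project.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

public class ShiporderXmlCheck {

    // Trimmed version of the test payload above (one item only); hypothetical constant.
    public static final String SAMPLE =
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
            + "<shiporder orderid=\"889923\">"
            + "<orderperson>John Smith</orderperson>"
            + "<shipto><name>Ola Nordmann</name><address>Langgt 23</address>"
            + "<city>4000 Stavanger</city><country>Norway</country></shipto>"
            + "<item><title>Empire Burlesque</title><quantity>1</quantity>"
            + "<price>10.90</price></item>"
            + "</shiporder>";

    /**
     * Parses the XML and returns "orderid/orderperson".
     * Throws IllegalArgumentException if the XML cannot be parsed.
     */
    public static String summarize(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            Document doc = builder.parse(new InputSource(new StringReader(xml)));
            Element root = doc.getDocumentElement();
            String orderPerson = root.getElementsByTagName("orderperson").item(0).getTextContent();
            return root.getAttribute("orderid") + "/" + orderPerson;
        } catch (Exception e) {
            throw new IllegalArgumentException("Payload is not well-formed XML", e);
        }
    }

    public static void main(String[] args) {
        // Prints the same two values the service activator would print.
        System.out.println(summarize(SAMPLE));
    }
}
```

This only checks well-formedness; it does not validate against shiporder.xsd, so a payload that passes here can still fail JAXB unmarshalling.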

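Example 4: JMS message-driven channel adapter with JAXB and payload-root routing

Another typical scenario is to accept XML as a JMS text message, convert it into JAXB stubs and route the payload to a particular service activator according to the payload root type. The SI Java DSL of course supports all kinds of routing; I will show you how to route by payload type.

First, add the following XSD to the same folder as shiporder.xsd and name it purchaseorder.xsd: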

<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
            xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"
            targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"
            elementFormDefault="qualified">
    <xsd:element name="PurchaseOrder">
        <xsd:complexType>
            <xsd:sequence>
                <xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/>
                <xsd:element name="BillTo" type="tns:USAddress"/>
            </xsd:sequence>
            <xsd:attribute name="OrderDate" type="xsd:date"/>
        </xsd:complexType>
    </xsd:element>

    <xsd:complexType name="USAddress">
        <xsd:sequence>
            <xsd:element name="name"   type="xsd:string"/>
            <xsd:element name="street" type="xsd:string"/>
            <xsd:element name="city"   type="xsd:string"/>
            <xsd:element name="state"  type="xsd:string"/>
            <xsd:element name="zip"    type="xsd:integer"/>
        </xsd:sequence>
        <xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/>
    </xsd:complexType>
</xsd:schema>

Then add it to the JAXB Maven plugin configuration:

 <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>jaxb2-maven-plugin</artifactId>
            <version>2.3.1</version>
            <executions>
                <execution>
                    <id>xjc-schema1</id>
                    <goals>
                        <goal>xjc</goal>
                    </goals>
                    <configuration>
                        <!-- Use all XSDs under the west directory for sources here. -->
                        <sources>
                            <source>src/main/resources/xsds/shiporder.xsd</source>
                            <source>src/main/resources/xsds/purchaseorder.xsd</source>
                        </sources>

                        <!-- Package name of the generated sources. -->
                        <packageName>com.example.stubs</packageName>
                        <outputDirectory>src/main/java</outputDirectory>
                        <clearOutputDir>false</clearOutputDir>
                    </configuration>
                </execution>
            </executions>
        </plugin>

Run mvn clean install to generate the JAXB stubs for the new XSD. Now for the promised payload root mapping:

@Bean
public Jaxb2Marshaller ordersMarshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setContextPath("com.example.stubs");
    return marshaller;
}

/**
 * Sample 4: Jms message driven adapter with Jaxb and Payload routing.
 * @return
 */
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
    final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
            new ChannelPublishingJmsMessageListener();
    channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(ordersMarshaller()));
    final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
            JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
    );

    messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
    return messageDrivenChannelAdapter;
}

@Bean
public IntegrationFlow payloadRootMapping() {
    return IntegrationFlows.from(inboundChannel()).<Object, Class<?>>route(Object::getClass, m->m
            .subFlowMapping(Shiporder.class, sf->sf.handle((MessageHandler) message -> {
                final Shiporder shiporder = (Shiporder) message.getPayload();
                System.out.println(shiporder.getOrderperson());
                System.out.println(shiporder.getOrderid());
            }))
            .subFlowMapping(PurchaseOrder.class, sf->sf.handle((MessageHandler) message -> {
                final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
                System.out.println(purchaseOrderType.getBillTo().getName());
            }))
    ).get();
}

Note the payloadRootMapping bean; let's go through the important parts:

  • <Object, Class<?>> route - the input from inboundChannel will be an Object, and routing will be performed
    on its Class<?>
  • subFlowMapping(Shiporder.class… - handles Shiporder payloads.
  • subFlowMapping(PurchaseOrder.class … - handles PurchaseOrder payloads.
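
The routing idea in these bullets can be illustrated without Spring: derive a key from the payload (its class) and look up the handler registered for that key. The following is a plain-Java sketch of that idea, not Spring Integration's implementation. PayloadTypeRouter, mapping and route are hypothetical names, and the exact-class lookup with an exception for unmapped types is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class PayloadTypeRouter {

    // One handler per payload class, analogous to one sub-flow per mapping.
    private final Map<Class<?>, Function<Object, String>> routes = new LinkedHashMap<>();

    /** Registers a handler for payloads whose class is exactly the given type. */
    public <T> PayloadTypeRouter mapping(Class<T> type, Function<T, String> handler) {
        routes.put(type, payload -> handler.apply(type.cast(payload)));
        return this;
    }

    /** Looks up the handler by the payload's runtime class and invokes it. */
    public String route(Object payload) {
        Function<Object, String> handler = routes.get(payload.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("No mapping for " + payload.getClass().getName());
        }
        return handler.apply(payload);
    }

    public static void main(String[] args) {
        PayloadTypeRouter router = new PayloadTypeRouter()
                .mapping(String.class, s -> "text:" + s)
                .mapping(Integer.class, i -> "number:" + i);
        System.out.println(router.route("hello"));
        System.out.println(router.route(42));
    }
}
```

In the article's flow, Shiporder and PurchaseOrder play the role that String and Integer play here.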

To test the Shiporder payload, use the XML from example 3; to test the PurchaseOrder payload, use the following XML:

<?xml version="1.0" encoding="utf-8"?>  
<PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd">  
  <ShipTo country="US">  
    <name>name1</name>  
    <street>street1</street>  
    <city>city1</city>  
    <state>state1</state>  
    <zip>1</zip>  
  </ShipTo>  
  <ShipTo country="US">  
    <name>name2</name>  
    <street>street2</street>  
    <city>city2</city>  
    <state>state2</state>  
    <zip>-79228162514264337593543950335</zip>  
  </ShipTo>  
  <BillTo country="US">  
    <name>name1</name>  
    <street>street1</street>  
    <city>city1</city>  
    <state>state1</state>  
    <zip>1</zip>  
  </BillTo>  
</PurchaseOrder>

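Both payloads should be routed according to the subflow mappings.

Example 5: IntegrationFlowAdapter

Besides the other implementations of Enterprise Integration Patterns (check them out), I need to mention IntegrationFlowAdapter. You extend this class and implement the buildFlow method, for example: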

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))
                .<String, String>transform(String::toLowerCase)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }
}

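This lets you wrap repeated bean declarations into a single component and give them the flow they need. Such a component can then be configured and handed to the calling code as a single class instance.

So let's make example 3 in this repo a bit shorter: define a base class for all JMS endpoints and put the repeated beans in it: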

public class JmsEndpoint extends IntegrationFlowAdapter {

    private String queueName;

    private String channelName;

    private String contextPath;

    /**
     * @param queueName
     * @param channelName
     * @param contextPath
     */
    public JmsEndpoint(String queueName, String channelName, String contextPath) {
        this.queueName = queueName;
        this.channelName = channelName;
        this.contextPath = contextPath;
    }

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(Jms.messageDrivenChannelAdapter(listenerContainer())
            .jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()))
        ).channel(channelName);
    }

    @Bean
    public Jaxb2Marshaller shipOrdersMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setContextPath(contextPath);
        return marshaller;
    }

    @Bean
    public DynamicDestinationResolver dynamicDestinationResolver() {
        return new DynamicDestinationResolver();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory();
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainer() {
        final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
        defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
        defaultMessageListenerContainer.setDestinationName(queueName);
        return defaultMessageListenerContainer;
    }

    @Bean
    public MessageChannel inboundChannel() {
        return MessageChannels.direct(channelName).get();
    }
}

Declaring a JMS endpoint for a specific queue is now easy:

@Bean
public JmsEndpoint jmsEndpoint() {
    return new JmsEndpoint("jms.activeMQ.Test", "inboundChannel", "com.example.stubs");
}

The service activator for inboundChannel:

/**
 * Sample 3, 5
 * @param shiporder
 */
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
    System.out.println(shiporder.getOrderid());
    System.out.println(shiporder.getOrderperson());
}

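You shouldn't miss out on using IntegrationFlowAdapter in your projects. I like the concept.

I recently started using the Spring Integration Java DSL in a new Spring Boot based project at Embedit. Even with a fair amount of configuration, I found it very useful.

  • It is easy to debug, without adding configuration such as a wiretap.
  • It is much easier to read. Yes, even the lambdas!
  • It is powerful. Java configuration now gives you a lot of options.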