
Spring XD: Developing a Custom Job Module

A Job in Spring XD is simply a Spring Batch Job, so we start by writing a simple Job that follows Spring Batch conventions.

Project dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-infrastructure</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
</dependencies>

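Strictly speaking, the spring-batch dependencies do not need to be declared here, because they are already declared in the Spring XD parent POM, which a module project normally inherits from: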

<parent>
    <groupId>org.springframework.xd</groupId>
    <artifactId>spring-xd-module-parent</artifactId>
    <!-- 1.1.x or later -->
    <version>1.3.1.RELEASE</version>
</parent>

Developing the Job:

package cn.rongcapital.springxd.job;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author li.hzh
 * @date 2016-11-01 14:03
 */
@Configuration
public class HelloWorldJob {

    private static final Logger logger = LoggerFactory.getLogger(HelloWorldJob.class);

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean(name = "helloworldJob")
    public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
        return jobs.get("myJob").start(step1).next(step2).build();
    }

    @Bean
    protected Step step1() {
        return steps.get("step1").tasklet(new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                logger.info("Step One");
                return RepeatStatus.FINISHED;
            }
        }).build();
    }

    @Bean
    protected Step step2() {
        return steps.get("step2")
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                        logger.info("Step Two");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }
}

It could hardly be simpler: the Job has two steps, one that logs "Step One" and one that logs "Step Two". Run it locally first to confirm that it works.

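Configuring the Spring XD properties file

Even when the module is developed with JavaConfig, a properties file is still required to declare the base package(s) of the job configuration class, for example: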

base_packages=org.springframework.springxd.samples.batch

The reason can be seen in the Spring XD source code:

/**
      * Create a simple module based on the provided {@link ModuleDescriptor}, {@link ModuleOptions}, and {@link ModuleDeploymentProperties}.
      *
      * @param moduleDescriptor descriptor for the composed module
      * @param moduleOptions module options for the composed module
      * @param deploymentProperties deployment related properties for the composed module
      * @return new simple module instance
      */
     private Module createSimpleModule(ModuleDescriptor moduleDescriptor, ModuleOptions moduleOptions,
               ModuleDeploymentProperties deploymentProperties) {
          if (log.isInfoEnabled()) {
               log.info("creating simple module " + moduleDescriptor);
          }
          SimpleModuleDefinition definition = (SimpleModuleDefinition) moduleDescriptor.getModuleDefinition();
          ClassLoader moduleClassLoader = ModuleUtils.createModuleRuntimeClassLoader(definition, moduleOptions, this.parentClassLoader);

          Class<? extends SimpleModule> moduleClass = determineModuleClass((SimpleModuleDefinition) moduleDescriptor.getModuleDefinition(),
                    moduleOptions);
          Assert.notNull(moduleClass,
                    String.format("Required module artifacts are either missing or invalid. Unable to determine module type for module definition: '%s:%s'.",
                              moduleDescriptor.getType(), moduleDescriptor.getModuleName()));
          return SimpleModuleCreator
                    .createModule(moduleDescriptor, deploymentProperties, moduleClassLoader, moduleOptions, moduleClass);
     }

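createSimpleModule has to resolve a moduleClass. If it cannot (determineModuleClass returns null), Assert.notNull fails with the "Required module artifacts are either missing or invalid" message. The class is resolved like this: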

private Class<? extends SimpleModule> determineModuleClass(SimpleModuleDefinition moduleDefinition,
      ModuleOptions moduleOptions) {
   String name = (String) moduleOptions.asPropertySource().getProperty(MODULE_EXECUTION_FRAMEWORK_KEY);
   if ("spark".equals(name)) {
      return NonBindingResourceConfiguredModule.class;
   }
   else if (ModuleUtils.resourceBasedConfigurationFile(moduleDefinition) != null) {
      return ResourceConfiguredModule.class;
   }
   else if (JavaConfiguredModule.basePackages(moduleDefinition).length > 0) {
      return JavaConfiguredModule.class;
   }
   return null;
}
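The selection order in determineModuleClass can be modeled without any Spring XD classes. The following is only a simplified sketch: the class name ModuleKindSketch, the ModuleKind enum, and the boolean/array parameters are hypothetical stand-ins for the real ModuleOptions and SimpleModuleDefinition lookups.

```java
import java.util.Optional;

/** Standalone model of the selection order; not Spring XD code. */
final class ModuleKindSketch {

    /** Hypothetical stand-ins for the three module classes named in the source. */
    enum ModuleKind { NON_BINDING_RESOURCE_CONFIGURED, RESOURCE_CONFIGURED, JAVA_CONFIGURED }

    /**
     * Mirrors the order of the checks: execution framework first,
     * then a resource-based configuration file, then base packages.
     * An empty Optional corresponds to the "return null" case.
     */
    static Optional<ModuleKind> determineKind(String executionFramework,
                                              boolean hasResourceConfigFile,
                                              String[] basePackages) {
        if ("spark".equals(executionFramework)) {
            return Optional.of(ModuleKind.NON_BINDING_RESOURCE_CONFIGURED);
        }
        if (hasResourceConfigFile) {
            return Optional.of(ModuleKind.RESOURCE_CONFIGURED);
        }
        if (basePackages != null && basePackages.length > 0) {
            return Optional.of(ModuleKind.JAVA_CONFIGURED);
        }
        return Optional.empty();
    }
}
```

In this model, a JavaConfig module with no resource-based configuration file and no base packages yields an empty result, which corresponds to the case where createSimpleModule rejects the module.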

The last else-if branch of determineModuleClass shows that JavaConfiguredModule is chosen only when a basePackages property can be found.

public static String[] basePackages(SimpleModuleDefinition moduleDefinition) {

   Properties properties = ModuleUtils.loadModuleProperties(moduleDefinition);
   //Assert.notNull(propertiesFile, "required module properties not found.");
   if (properties == null) {
      return new String[0];
   }


   String basePackageNames = properties.getProperty(BASE_PACKAGES);
   return StringUtils.commaDelimitedListToStringArray(basePackageNames);
}
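To see what basePackages yields for a given properties file, its behavior can be approximated with the JDK alone. This is a sketch under assumptions: BasePackagesSketch is a hypothetical class, the properties content is passed in as a string instead of being loaded through ModuleUtils, and the splitting trims whitespace around commas, which may differ from Spring's StringUtils.commaDelimitedListToStringArray in edge cases.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** JDK-only approximation of the base_packages lookup; not Spring XD code. */
final class BasePackagesSketch {

    static final String BASE_PACKAGES = "base_packages";

    /** Returns the declared base packages, or an empty array if none are declared. */
    static String[] basePackages(String propertiesText) {
        if (propertiesText == null) {
            // Corresponds to "no properties file found" in the original method.
            return new String[0];
        }
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = properties.getProperty(BASE_PACKAGES);
        if (value == null || value.trim().isEmpty()) {
            return new String[0];
        }
        // Assumption: surrounding whitespace is not significant.
        return value.trim().split("\\s*,\\s*");
    }
}
```

For the Job in this article, a file containing base_packages=cn.rongcapital.springxd.job would produce a one-element array, which is enough to satisfy the length > 0 check in determineModuleClass.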

That property, in turn, is read from the module's properties file:

/**
 * Return a resource that can be used to load the module '.properties' file (containing <i>e.g.</i> information
 * about module options, or null if no such file exists.
 */
public static Resource modulePropertiesFile(SimpleModuleDefinition definition) {
   return ModuleUtils.locateModuleResource(definition, ".properties");
}
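Because a missing properties file only surfaces as an error at deployment time, it can be worth checking the packaged jar before uploading it. The sketch below is a hypothetical helper, not part of Spring XD. It assumes that the properties file lives under a config/ directory inside the jar, which the source excerpt above does not show, so adjust the prefix if your module layout differs.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Properties;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/** Pre-upload sanity check for a module jar; a hypothetical helper. */
final class ModuleJarCheckSketch {

    /**
     * Returns true if the jar contains a properties file under config/
     * (assumed location) that declares a non-empty base_packages value.
     */
    static boolean declaresBasePackages(String jarPath) {
        try (ZipFile jar = new ZipFile(jarPath)) {
            for (ZipEntry entry : Collections.list(jar.entries())) {
                String name = entry.getName();
                boolean candidate = !entry.isDirectory()
                        && name.startsWith("config/")
                        && name.endsWith(".properties");
                if (!candidate) {
                    continue;
                }
                Properties properties = new Properties();
                try (InputStream in = jar.getInputStream(entry)) {
                    properties.load(in);
                }
                String value = properties.getProperty("base_packages");
                if (value != null && !value.trim().isEmpty()) {
                    return true;
                }
            }
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling declaresBasePackages with the path of the jar produced by the build gives a quick yes/no answer before the module is uploaded.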

Next comes packaging the Spring XD module.

Packaging the Spring XD Module: POM configuration

<parent>
    <groupId>org.springframework.xd</groupId>
    <artifactId>spring-xd-module-parent</artifactId>
    <!-- 1.1.x or later -->
    <version>1.3.1.RELEASE</version>
</parent>

The Spring XD parent must be specified, because it configures the plugins required for the Maven build. Package the module with:

mvn package

Then start xd-shell and upload the jar to Spring XD:

bin/xd-shell
xd:>module upload --type job --name helloworld --file /data/dps-springxd-job-helloworld-1.0.0.BUILD-SNAPSHOT.jar
Successfully uploaded module 'job:helloworld'

Running the Job

xd:>job create --name helloworldJob --definition "helloworld" --deploy
Successfully created and deployed job 'helloworldJob'
xd:>job launch helloworldJob
Successfully submitted launch request for job 'helloworldJob'

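Note: in the author's distributed environment, the module was not automatically distributed to the container nodes after module upload. As a temporary workaround it was copied over manually. The cause has yet to be investigated.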