Spring Integration + Spring Batch

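Spring Integration's strength is event-driven processing, but once an event has been captured, all it does is invoke a method on a class. That is not enough for files with large data volumes, for example reading 10,000 records and inserting them into a database. This is exactly where Spring Batch excels, so it makes sense to combine the two frameworks.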

Scenario: watch a folder, and as soon as a file appears (it may be very large), launch a batch job to process it.


File poller: it monitors the folder, and as soon as a new file appears it wraps the file in a Message and sends it to the next channel:
<file:inbound-channel-adapter id="filepoller"
                              channel="filesarecoming"
                              directory="file:${input.directory}"
                              filename-pattern="test*" />
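
To make the poller's role concrete, here is a minimal plain-Java sketch of what one poll cycle amounts to: list the directory, keep the files whose names match the pattern, and hand each previously unseen file to a handler. It is only an illustration and not how Spring Integration implements the adapter; `SimpleFilePoller`, `poll`, and the prefix-based matching are assumptions made for this example.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical stand-in for the file poller; not a Spring Integration class. */
final class SimpleFilePoller {

    private final File directory;
    private final String prefix;               // plays the role of filename-pattern="test*"
    private final Set<String> seen = new HashSet<>();

    SimpleFilePoller(File directory, String prefix) {
        this.directory = directory;
        this.prefix = prefix;
    }

    /** Runs one poll cycle and returns the files that were passed to the handler. */
    List<File> poll(Consumer<File> handler) {
        List<File> dispatched = new ArrayList<>();
        File[] candidates = directory.listFiles(
                (dir, name) -> name.startsWith(prefix));
        if (candidates == null) {
            return dispatched;                 // directory missing or unreadable
        }
        Arrays.sort(candidates);               // deterministic order for the sketch
        for (File file : candidates) {
            // Set.add returns false when this path was already dispatched earlier.
            if (file.isFile() && seen.add(file.getAbsolutePath())) {
                handler.accept(file);
                dispatched.add(file);
            }
        }
        return dispatched;
    }
}
```

Calling `poll` repeatedly on a schedule gives the same effect as the adapter's poller: each matching file is dispatched once, and later cycles only pick up files that arrived in the meantime.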


Service activator on the filesarecoming channel:
public JobLaunchRequest adapt(File file) throws NoSuchJobException {

    // Pass the file's absolute path to the job as the "input.file" parameter.
    JobParameters jobParameters = new JobParametersBuilder().addString(
            "input.file", file.getAbsolutePath()).toJobParameters();

    return new JobLaunchRequest(job, jobParameters);
}


Service activator on the joblauncher channel:
<service-activator input-channel="joblauncher">
    <beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
        <beans:constructor-arg ref="joblauncher" />
    </beans:bean>
</service-activator>


The value of "input.file" is resolved at run time, which is why the reader is step-scoped:
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="resource" value="#{jobParameters['input.file']}" />
    <!-- line mapper and other props -->
</bean>


Reference Spring XML configuration:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
        http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">

    <channel id="filechannel"/>
    <channel id="joblaunchrequestchannel"/>
    <channel id="jobexecutionchannel"/>
    <logging-channel-adapter channel="jobexecutionchannel" />

    <!-- Poll the folder every second and emit at most one file per poll -->
    <file:inbound-channel-adapter directory="/users/paul/documents/paul/download/software/develop/spring batch/spring-batch-2.1.9.release/samples/spring-batch-simple-cli/file"
        channel="filechannel" filename-pattern="t*.xml" comparator="filecreationtimecomparator">
        <poller max-messages-per-poll="1" cron="0/1 * * * * *" />
    </file:inbound-channel-adapter>

    <!-- Turn each File into a JobLaunchRequest -->
    <service-activator input-channel="filechannel"
                       output-channel="joblaunchrequestchannel"
                       ref="filetojoblaunchrequestadapter"
                       method="adapt"/>

    <!-- Launch the job and publish the resulting JobExecution -->
    <service-activator input-channel="joblaunchrequestchannel" output-channel="jobexecutionchannel">
        <beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
            <beans:constructor-arg ref="joblauncher" />
        </beans:bean>
    </service-activator>

    <beans:bean id="filetojoblaunchrequestadapter" class="example.FileToJobLaunchRequestAdapter">
        <beans:property name="job" ref="helloworldjob"/>
    </beans:bean>

    <beans:bean id="filecreationtimecomparator" class="com.paul.integration.file.filters.comparator.FileCreationTimeComparator">
    </beans:bean>

</beans:beans>

Spring Batch job configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

    <batch:job id="helloworldjob">
        <batch:step id="step1" next="xmlfilereadandwriterstep">
            <batch:tasklet ref="helloworldtasklet"></batch:tasklet>
        </batch:step>
        <batch:step id="xmlfilereadandwriterstep">
            <batch:tasklet>
                <batch:chunk reader="xmlreader" writer="xmlwriter" processor="xmlprocessor"
                    commit-interval="10">
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="helloworldtasklet" class="example.HelloWorldTasklet"></bean>

    <!-- Step-scoped so that the input path is taken from the job parameters at run time -->
    <bean id="xmlreader"
        class="org.springframework.batch.item.xml.StaxEventItemReader" scope="step">
        <property name="fragmentRootElementName" value="trade" />
        <property name="unmarshaller" ref="trademarshaller" />
        <property name="resource" value="#{jobParameters['input.file.path']}" />
    </bean>

    <bean id="xmlprocessor" class="com.paul.batch.XMLProcessor" />

    <bean id="xmlwriter" class="org.springframework.batch.item.xml.StaxEventItemWriter"
        scope="step">
        <property name="rootTagName" value="wanggc" />
        <property name="marshaller" ref="trademarshaller" />
        <property name="resource" value="#{jobParameters['output.file.path']}" />
    </bean>

    <bean id="trademarshaller" class="org.springframework.oxm.xstream.XStreamMarshaller">
        <property name="aliases">
            <util:map id="aliases">
                <entry key="trade" value="com.paul.domain.Trade" />
                <entry key="price" value="java.math.BigDecimal" />
                <entry key="name" value="java.lang.String" />
            </util:map>
        </property>
    </bean>

</beans>


File processor:
package com.paul.batch;

import org.springframework.batch.item.ItemProcessor;

import com.paul.domain.Trade;

public class XMLProcessor implements ItemProcessor<Trade, Trade> {

    /**
     * Processes the content of the XML file. Each Trade is passed through unchanged.
     */
    @Override
    public Trade process(Trade trade) throws Exception {
        return trade;
    }
}


The Trade domain object:
/*
 * Copyright 2006-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.paul.domain;

import java.io.Serializable;
import java.math.BigDecimal;


/**
 * @author Rob Harrop
 * @author Dave Syer
 */
public class Trade implements Serializable {
    private String isin = "";
    private long quantity = 0;
    private BigDecimal price = new BigDecimal(0);
    private String customer = "";
    private Long id;
    private long version = 0;

    public Trade() {
    }

    public Trade(String isin, long quantity, BigDecimal price, String customer) {
        this.isin = isin;
        this.quantity = quantity;
        this.price = price;
        this.customer = customer;
    }

    /**
     * @param id
     */
    public Trade(long id) {
        this.id = id;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public long getVersion() {
        return version;
    }

    public void setVersion(long version) {
        this.version = version;
    }

    public void setCustomer(String customer) {
        this.customer = customer;
    }

    public void setIsin(String isin) {
        this.isin = isin;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public void setQuantity(long quantity) {
        this.quantity = quantity;
    }

    public String getIsin() {
        return isin;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public long getQuantity() {
        return quantity;
    }

    public String getCustomer() {
        return customer;
    }

    public String toString() {
        return "trade: [isin=" + this.isin + ",quantity=" + this.quantity + ",price="
            + this.price + ",customer=" + this.customer + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((customer == null) ? 0 : customer.hashCode());
        result = prime * result + ((isin == null) ? 0 : isin.hashCode());
        result = prime * result + ((price == null) ? 0 : price.hashCode());
        result = prime * result + (int) (quantity ^ (quantity >>> 32));
        result = prime * result + (int) (version ^ (version >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Trade other = (Trade) obj;
        if (customer == null) {
            if (other.customer != null)
                return false;
        }
        else if (!customer.equals(other.customer))
            return false;
        if (isin == null) {
            if (other.isin != null)
                return false;
        }
        else if (!isin.equals(other.isin))
            return false;
        if (price == null) {
            if (other.price != null)
                return false;
        }
        else if (!price.equals(other.price))
            return false;
        if (quantity != other.quantity)
            return false;
        if (version != other.version)
            return false;
        return true;
    }

}


Comparator that sorts the file list taken from the folder by last-modified time:
package com.paul.integration.file.filters.comparator;

import java.io.File;
import java.util.Comparator;

public class FileCreationTimeComparator implements Comparator<File> {

    @Override
    public int compare(File file1, File file2) {
        // file2 is compared against file1, so the most recently modified file sorts first.
        return Long.valueOf(file2.lastModified()).compareTo(
                Long.valueOf(file1.lastModified()));
//        return file1.getName().compareToIgnoreCase(file2.getName());
    }

}
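
As a quick check of the ordering, the sketch below sorts a few files with an equivalent comparator built from `Comparator.comparingLong`. Because the second file is compared against the first, the most recently modified file comes first. The class name `NewestFirstDemo` and the temp-file helper are assumptions made only for this illustration.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class NewestFirstDemo {

    /** Same ordering as the comparator above: larger lastModified() sorts first. */
    static final Comparator<File> NEWEST_FIRST =
            Comparator.comparingLong(File::lastModified).reversed();

    /** Creates a file in dir and stamps it with the given last-modified time. */
    static File fileWithTimestamp(File dir, String name, long millis) throws IOException {
        File file = new File(dir, name);
        Files.createFile(file.toPath());
        if (!file.setLastModified(millis)) {
            throw new IOException("could not set timestamp on " + file);
        }
        return file;
    }

    /** Returns a sorted copy and leaves the input list untouched. */
    static List<File> sortedNewestFirst(List<File> files) {
        List<File> copy = new ArrayList<>(files);
        copy.sort(NEWEST_FIRST);
        return copy;
    }
}
```

Combined with `max-messages-per-poll="1"`, this ordering means the newest matching file is the one handed to the job on each poll.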


Holder class that wraps the Job and the JobParameters:

package example;

import java.io.File;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.util.Assert;

/**
 * Adapt a {@link File} to a {@link JobLaunchRequest} with a job parameter
 * input.file.path equal to the path of the file.
 *
 * @author Dave Syer
 */
@MessageEndpoint
public class FileToJobLaunchRequestAdapter implements InitializingBean {

    private Job job;

    public void setJob(Job job) {
        this.job = job;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(job, "a job must be provided");
    }

    @ServiceActivator
    public JobLaunchRequest adapt(File file) throws NoSuchJobException {

        String filename = file.getAbsolutePath();

        if (!filename.startsWith("/")) {
            filename = "/" + filename;
        }

        filename = "file://" + filename;

        String outputFilePath = "file:/users/paul/documents/paul/download/software/develop/spring batch/"
                + "spring-batch-2.1.9.release/samples/spring-batch-simple-cli/file/output/out.xml";

        // The time stamp makes every set of parameters unique, so each file starts a new job instance.
        JobParameters jobParameters = new JobParametersBuilder().
                addString("input.file.path", filename).
                addString("output.file.path", outputFilePath).
                addLong("time.stamp", System.currentTimeMillis()).
                toJobParameters();

        if (job.getJobParametersIncrementer() != null) {
            jobParameters = job.getJobParametersIncrementer().getNext(jobParameters);
        }

        return new JobLaunchRequest(job, jobParameters);

    }

}
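
The adapter builds the `file://` URL by string concatenation, which leaves characters such as the space in `spring batch` unescaped. One possible alternative, shown here only as a sketch, is to let `java.io.File.toURI()` produce the URL. `FileUrlDemo` and `toFileUrl` are hypothetical names, and whether the escaped form suits a given reader's `resource` property should be checked against the Spring version in use.

```java
import java.io.File;

final class FileUrlDemo {

    /** Converts a file to a file: URL string, percent-encoding unsafe characters. */
    static String toFileUrl(File file) {
        return file.getAbsoluteFile().toURI().toString();
    }

    public static void main(String[] args) {
        // The space in the directory name is emitted as %20.
        System.out.println(toFileUrl(new File("/tmp/spring batch/input.xml")));
    }
}
```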


Trade test data file:
<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>xyz0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>customer1</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>xyz0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>customer2c</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>xyz0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>customer3</customer>
    </trade>
</records>

Jar dependencies in Maven:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-integration</artifactId>
    <version>1.2.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>2.0.3.RELEASE</version>
</dependency>

At this point all the channels are connected end to end.


posted on 2012-10-16 00:11 paulwong  Views (5352)  Comments (7)  Categories: Spring Integration, Spring Batch

feedback

# re: Spring Integration + Spring Batch 2012-11-16 16:17

Hello, I have a problem and would appreciate some help.
Requirement: the data comes from files in several directories, and the file formats differ. How should this be handled? Any answer would be much appreciated, thank you.

Reading files from multiple directories is already solved: I configured several input channels and one output channel, that is, each directory is monitored separately. I only did it this way because I saw no alternative, since I need to monitor multiple directories.

The remaining difficulty is that the incoming files have different formats. How can a different splitter be called to split each kind of file, or, put differently, how can a different reader be invoked?

Below is my applicationContext-integration.xml configuration:

directory="${config.1000000022.sourcepath}" auto-create-directory="true"
auto-startup="true" channel="videoplayerfilechannel" filename-pattern="${config.1000000022.sourcetype}"
comparator="filecomparator">

output-channel="videoplayerjoblaunchrequestchannel" ref="launcher"
method="adapt" />

output-channel="videoplayerjobexecutionchannel">
class="org.springframework.batch.integration.launch.joblaunchingmessagehandler">

class="com.zjcy.cbs.pretreatment.batch.filecomparator">

directory="${config.2000000001.sourcepath}" auto-create-directory="true" auto-startup="true"
channel="filechannelmessage" filename-pattern="*${config.2000000001.sourcetype}" comparator="filecomparator">

output-channel="videoplayerjoblaunchrequestchannel" ref="launcher"
method="adapt" />

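# re: Spring Integration + Spring Batch 2012-11-17 22:39 paulwong

@rojeff
You can read the file extension once the file arrives on videoplayerfilechannel and add something like a router that dispatches it to a different channel depending on the extension.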
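
One way to picture this suggestion is a small routing method that maps a file's extension to a channel name. The sketch below is plain Java; the class name `ExtensionRouter` and all three channel names are hypothetical, and in a real configuration a method like this could be referenced from a Spring Integration router so that each format reaches its own job launcher.

```java
import java.io.File;
import java.util.Locale;

final class ExtensionRouter {

    /** Returns the name of the channel that should receive the given file. */
    static String route(File file) {
        String name = file.getName();
        int dot = name.lastIndexOf('.');
        // No dot, or a trailing dot, means there is no usable extension.
        String extension = (dot < 0 || dot == name.length() - 1)
                ? ""
                : name.substring(dot + 1).toLowerCase(Locale.ROOT);
        switch (extension) {
            case "xml":
                return "xmlFileChannel";       // hypothetical channel name
            case "csv":
                return "csvFileChannel";       // hypothetical channel name
            default:
                return "unknownFormatChannel"; // hypothetical fallback channel
        }
    }
}
```

Each target channel can then have its own adapter and job, so every file format gets a reader that matches it.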

# re: Spring Integration + Spring Batch [not logged in] 2012-11-18 09:44

@paulwong
Thank you very much for your help, paulwong.

# re: Spring Integration + Spring Batch 2012-11-19 17:25

@paulwong
A question:
Requirement: can file:inbound-channel-adapter be configured with several directories, or can it include subdirectories? If so, how should it be configured?

channel="filechannel" filename-pattern="t*.xml" comparator="filecreationtimecomparator">

# re: Spring Integration + Spring Batch 2012-11-19 22:17 paulwong

@rojeff

id="inputchannel" scanner="scanner" directory="/some/folder">
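
The `scanner` attribute lets the adapter delegate directory listing to a scanner bean, which is how subdirectories can be covered. As a rough plain-Java illustration of what a recursive scan returns (not the Spring Integration scanner itself), the sketch below walks a directory tree and collects regular files only; `RecursiveScanDemo` is a hypothetical name.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class RecursiveScanDemo {

    /** Lists every regular file under root, including files in subdirectories. */
    static List<File> listFilesRecursively(Path root) throws IOException {
        // Files.walk holds directory handles open, so the stream must be closed.
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                        .map(Path::toFile)
                        .sorted()
                        .collect(Collectors.toList());
        }
    }
}
```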

# re: Spring Integration + Spring Batch 2012-11-19 22:47

@paulwong
A true expert indeed. That solved the problem, thank you very much.


