spring integration的强项是事件驱动,但捕获之后,要做的事是触发一个类的方法,对于要处理大数据量的文件,就没有办法了,如读取1w条记录,然后插入数据库。而这个正是spring batch的强项所在,因此有必要将此两个框架整合起来用。
场景:盯着一个文件夹,如果一有文件,此文件可能非常大的,则启动一个batch job来处理。
文件拉取器,监控文件夹一有新文件,则将此文件包装成message,丢到下一个通道中:
<file:inbound-channel-adapter id="filepoller"
channel="filesarecoming"
directory="file:${input.directory}"
filename-pattern="test*" />
filesarecoming通道的serviceactivator
public joblaunchrequest adapt(file file) throws nosuchjobexception {
jobparameters jobparameters = new jobparametersbuilder().addstring(
"input.file", file.getabsolutepath()).tojobparameters();
return new joblaunchrequest(job, jobparameters);
}
joblauncher通道的serviceactivator
<service-activator input-channel="joblauncher">
<beans:bean class="org.springframework.batch.integration.launch.joblaunchingmessagehandler">
<beans:constructor-arg ref="joblauncher" />
beans:bean>
service-activator>
"file.input"依赖于执行期的取值
<bean id="reader" class="org.springframework.batch.item.file.flatfileitemreader" scope="step">
<property name="resource" value="#{jobparameters[input.file]}" />
line mapper and other props
bean>
参考的spring xml
xml version="1.0" encoding="utf-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
xmlns:beans="http://www.springframework.org/schema/beans" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:file="http://www.springframework.org/schema/integration/file"
xsi:schemalocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">
<channel id="filechannel"/>
<channel id="joblaunchrequestchannel"/>
<channel id="jobexecutionchannel"/>
<logging-channel-adapter channel="jobexecutionchannel" />
<file:inbound-channel-adapter directory="/users/paul/documents/paul/download/software/develop/spring batch/spring-batch-2.1.9.release/samples/spring-batch-simple-cli/file"
channel="filechannel" filename-pattern="t*.xml" comparator="filecreationtimecomparator">
<poller max-messages-per-poll="1" cron="0/1 * * * * *" />
file:inbound-channel-adapter>
<service-activator input-channel="filechannel"
output-channel="joblaunchrequestchannel"
ref="filetojoblaunchrequestadapter"
method="adapt"/>
<service-activator input-channel="joblaunchrequestchannel" output-channel="jobexecutionchannel">
<beans:bean class="org.springframework.batch.integration.launch.joblaunchingmessagehandler">
<beans:constructor-arg ref="joblauncher" />
beans:bean>
service-activator>
<beans:bean id="filetojoblaunchrequestadapter" class="example.filetojoblaunchrequestadapter">
<beans:property name="job" ref="helloworldjob"/>
beans:bean>
<beans:bean id="filecreationtimecomparator" class="com.paul.integration.file.filters.comparator.filecreationtimecomparator">
beans:bean>
beans:beans>
spring batch job的配置文件
xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">
<batch:job id="helloworldjob">
<batch:step id="step1" next="xmlfilereadandwriterstep">
<batch:tasklet ref="helloworldtasklet">batch:tasklet>
batch:step>
<batch:step id="xmlfilereadandwriterstep">
<batch:tasklet>
<batch:chunk reader="xmlreader" writer="xmlwriter" processor="xmlprocessor"
commit-interval="10">
batch:chunk>
batch:tasklet>
batch:step>
batch:job>
<bean id="helloworldtasklet" class="example.helloworldtasklet">bean>
<bean id="xmlreader"
class="org.springframework.batch.item.xml.staxeventitemreader" scope="step">
<property name="fragmentrootelementname" value="trade" />
<property name="unmarshaller" ref="trademarshaller" />
<property name="resource" value="#{jobparameters['input.file.path']}" />
bean>
<bean id="xmlprocessor" class="com.paul.batch.xmlprocessor" />
<bean id="xmlwriter" class="org.springframework.batch.item.xml.staxeventitemwriter"
scope="step">
<property name="roottagname" value="wanggc" />
<property name="marshaller" ref="trademarshaller" />
<property name="resource" value="#{jobparameters['output.file.path']}" />
bean>
<bean id="trademarshaller" class="org.springframework.oxm.xstream.xstreammarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="trade" value="com.paul.domain.trade" />
<entry key="price" value="java.math.bigdecimal" />
<entry key="name" value="java.lang.string" />
util:map>
property>
bean>
beans>
文件处理器
package com.paul.batch;
import org.springframework.batch.item.itemprocessor;
import com.paul.domain.trade;
public class xmlprocessor implements itemprocessor {
/**
* xml文件内容处理。
*
*/
@override
public trade process(trade trade) throws exception {
return trade;
}
}
domain trade对象
/*
* 凯发天生赢家一触即发官网 copyright 2006-2007 the original author or authors.
*
* licensed under the apache license, version 2.0 (the "license");
* you may not use this file except in compliance with the license.
* you may obtain a copy of the license at
*
* http://www.apache.org/licenses/license-2.0
*
* unless required by applicable law or agreed to in writing, software
* distributed under the license is distributed on an "as is" basis,
* without warranties or conditions of any kind, either express or implied.
* see the license for the specific language governing permissions and
* limitations under the license.
*/
package com.paul.domain;
import java.io.serializable;
import java.math.bigdecimal;
/**
* @author rob harrop
* @author dave syer
*/
public class trade implements serializable {
private string isin = "";
private long quantity = 0;
private bigdecimal price = new bigdecimal(0);
private string customer = "";
private long id;
private long version = 0;
public trade() {
}
public trade(string isin, long quantity, bigdecimal price, string customer){
this.isin = isin;
this.quantity = quantity;
this.price = price;
this.customer = customer;
}
/**
* @param id
*/
public trade(long id) {
this.id = id;
}
public long getid() {
return id;
}
public void setid(long id) {
this.id = id;
}
public long getversion() {
return version;
}
public void setversion(long version) {
this.version = version;
}
public void setcustomer(string customer) {
this.customer = customer;
}
public void setisin(string isin) {
this.isin = isin;
}
public void setprice(bigdecimal price) {
this.price = price;
}
public void setquantity(long quantity) {
this.quantity = quantity;
}
public string getisin() {
return isin;
}
public bigdecimal getprice() {
return price;
}
public long getquantity() {
return quantity;
}
public string getcustomer() {
return customer;
}
public string tostring() {
return "trade: [isin=" this.isin ",quantity=" this.quantity ",price="
this.price ",customer=" this.customer "]";
}
@override
public int hashcode() {
final int prime = 31;
int result = 1;
result = prime * result ((customer == null) ? 0 : customer.hashcode());
result = prime * result ((isin == null) ? 0 : isin.hashcode());
result = prime * result ((price == null) ? 0 : price.hashcode());
result = prime * result (int) (quantity ^ (quantity >>> 32));
result = prime * result (int) (version ^ (version >>> 32));
return result;
}
@override
public boolean equals(object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getclass() != obj.getclass())
return false;
trade other = (trade) obj;
if (customer == null) {
if (other.customer != null)
return false;
}
else if (!customer.equals(other.customer))
return false;
if (isin == null) {
if (other.isin != null)
return false;
}
else if (!isin.equals(other.isin))
return false;
if (price == null) {
if (other.price != null)
return false;
}
else if (!price.equals(other.price))
return false;
if (quantity != other.quantity)
return false;
if (version != other.version)
return false;
return true;
}
}
从文件夹取出文件列表后,进行按修改时间排序的排序器
package com.paul.integration.file.filters.comparator;
import java.io.file;
import java.util.comparator;
public class filecreationtimecomparator implements comparator{
@override
public int compare(file file1, file file2) {
return long.valueof(file2.lastmodified()).compareto(
long.valueof(file1.lastmodified()));
// return file1.getname().comparetoignorecase(file2.getname());
}
}
封装了job和jobparameters的holder类
package example;
import java.io.file;
import org.springframework.batch.core.job;
import org.springframework.batch.core.jobparameters;
import org.springframework.batch.core.jobparametersbuilder;
import org.springframework.batch.core.launch.nosuchjobexception;
import org.springframework.batch.integration.launch.joblaunchrequest;
import org.springframework.beans.factory.initializingbean;
import org.springframework.integration.annotation.messageendpoint;
import org.springframework.integration.annotation.serviceactivator;
import org.springframework.util.assert;
/**
* adapt a {@link file} to a {@link joblaunchrequest} with a job parameter
* input.file
equal to the path of the file.
*
* @author dave syer
*
*/
@messageendpoint
public class filetojoblaunchrequestadapter implements initializingbean {
private job job;
public void setjob(job job) {
this.job = job;
}
public void afterpropertiesset() throws exception {
assert.notnull(job, "a job must be provided");
}
@serviceactivator
public joblaunchrequest adapt(file file) throws nosuchjobexception {
string filename = file.getabsolutepath();
if (!filename.startswith("/")) {
filename = "/" filename;
}
filename = "file://" filename;
string outputfilepath = "file:/users/paul/documents/paul/download/software/develop/spring batch/"
"spring-batch-2.1.9.release/samples/spring-batch-simple-cli/file/output/out.xml";
jobparameters jobparameters = new jobparametersbuilder().
addstring("input.file.path", filename).
addstring("output.file.path", outputfilepath).
addlong("time.stamp", system.currenttimemillis()).
tojobparameters();
if (job.getjobparametersincrementer() != null) {
jobparameters = job.getjobparametersincrementer().getnext(jobparameters);
}
return new joblaunchrequest(job, jobparameters);
}
}
trade测试数据文件
xml version="1.0" encoding="utf-8"?>
<records>
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
<isin>xyz0001isin>
<quantity>5quantity>
<price>11.39price>
<customer>customer1customer>
trade>
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
<isin>xyz0002isin>
<quantity>2quantity>
<price>72.99price>
<customer>customer2ccustomer>
trade>
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
<isin>xyz0003isin>
<quantity>9quantity>
<price>99.99price>
<customer>customer3customer>
trade>
records>
maven中的jar dependency:
<dependency>
<groupid>org.springframework.integrationgroupid>
<artifactid>spring-integration-coreartifactid>
<version>2.0.3.releaseversion>
dependency>
<dependency>
<groupid>org.springframework.batchgroupid>
<artifactid>spring-batch-integrationartifactid>
<version>1.2.1.releaseversion>
dependency>
<dependency>
<groupid>org.springframework.integrationgroupid>
<artifactid>spring-integration-fileartifactid>
<version>2.0.3.releaseversion>
dependency>
至此全部通道已打通。
参考: