Last active
November 10, 2015 12:59
-
-
Save mminella/fe4e2b4ec15fc995ab2a to your computer and use it in GitHub Desktop.
Incremental Imports via Spring Batch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright 2014 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.spring.batch; | |
import static org.junit.Assert.assertEquals; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.util.ArrayList; | |
import java.util.Calendar; | |
import java.util.Date; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
import javax.annotation.PostConstruct; | |
import javax.sql.DataSource; | |
import org.junit.Before; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.springframework.batch.core.BatchStatus; | |
import org.springframework.batch.core.Job; | |
import org.springframework.batch.core.JobExecution; | |
import org.springframework.batch.core.JobInstance; | |
import org.springframework.batch.core.JobParametersBuilder; | |
import org.springframework.batch.core.Step; | |
import org.springframework.batch.core.StepExecution; | |
import org.springframework.batch.core.annotation.BeforeStep; | |
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; | |
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; | |
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; | |
import org.springframework.batch.core.configuration.annotation.StepScope; | |
import org.springframework.batch.core.explore.JobExplorer; | |
import org.springframework.batch.core.launch.JobLauncher; | |
import org.springframework.batch.core.listener.ExecutionContextPromotionListener; | |
import org.springframework.batch.item.ItemWriter; | |
import org.springframework.batch.item.database.JdbcPagingItemReader; | |
import org.springframework.batch.item.database.Order; | |
import org.springframework.batch.item.database.support.HsqlPagingQueryProvider; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.core.io.ResourceLoader; | |
import org.springframework.jdbc.core.JdbcOperations; | |
import org.springframework.jdbc.core.JdbcTemplate; | |
import org.springframework.jdbc.core.RowMapper; | |
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseFactory; | |
import org.springframework.jdbc.datasource.init.DatabasePopulatorUtils; | |
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; | |
import org.springframework.test.context.ContextConfiguration; | |
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; | |
import org.springframework.util.ClassUtils; | |
/** | |
* @author Michael Minella | |
*/ | |
@RunWith(SpringJUnit4ClassRunner.class) | |
@ContextConfiguration(classes = {LastRunPersistenceTest.JobConfiguration.class}) | |
public class LastRunPersistenceTest { | |
@Autowired | |
private DataSource dataSource; | |
private Date lastDate = Calendar.getInstance().getTime(); | |
@Autowired | |
private Job job1; | |
@Autowired | |
private JobLauncher jobLauncher; | |
private int iteration = 0; | |
private boolean initalized = false; | |
@Before | |
public void setUp() { | |
if(!initalized) { | |
JdbcOperations jdbcTemplate = new JdbcTemplate(dataSource); | |
jdbcTemplate.execute("CREATE TABLE SOURCE_TABLE (FIELD1 CHAR(20) NOT NULL, " + | |
"FIELD2 VARCHAR(20) NOT NULL, " + | |
"INSERT_DATE TIMESTAMP NOT NULL)"); | |
initalized = true; | |
} | |
} | |
/** | |
* This test method will add 10 records to the source_table table then launch a job to process them. The previous | |
* records are not deleted (appending only). The metadata for where to restart is stored in the Spring Batch Job | |
* Repository via the {@link com.michaelminella.batch.LastRunPersistenceTest.IncrementingListener}. | |
* | |
* @throws Exception Theoretically things can go wrong. | |
*/ | |
@Test | |
public void test() throws Exception { | |
for(int i = 0; i < 3; i++) { | |
// Adds 10 records | |
addData(); | |
// Processes the most recent 10 records | |
assertEquals(BatchStatus.COMPLETED, jobLauncher.run(job1, | |
new JobParametersBuilder().addDate("randomDate", new Date()).toJobParameters()).getStatus()); | |
} | |
// Run the job without adding records to demonstrate that doesn't impact the incremental runs | |
assertEquals(BatchStatus.COMPLETED, jobLauncher.run(job1, | |
new JobParametersBuilder().addDate("randomDate", new Date()).toJobParameters()).getStatus()); | |
for(int i = 0; i < 3; i++) { | |
// Adds 10 records | |
addData(); | |
// Processes the most recent 10 records | |
assertEquals(BatchStatus.COMPLETED, jobLauncher.run(job1, | |
new JobParametersBuilder().addDate("randomDate", new Date()).toJobParameters()).getStatus()); | |
} | |
} | |
/** | |
* Append 10 records to the current db table. | |
*/ | |
private void addData() { | |
JdbcOperations jdbcTemplate = new JdbcTemplate(dataSource); | |
List<Object[]> params = new ArrayList<Object[]>(); | |
int max = iteration + 10; | |
for(; iteration < max; iteration++) { | |
Object [] curParams = new Object[3]; | |
curParams[0] = "field1 - " + iteration; | |
curParams[1] = "field2 - " + iteration; | |
lastDate = new Date(lastDate.getTime() + TimeUnit.HOURS.toMillis(1)); | |
curParams[2] = lastDate; | |
params.add(curParams); | |
} | |
jdbcTemplate.batchUpdate("INSERT INTO SOURCE_TABLE VALUES(?, ?, ?)", params); | |
} | |
@Configuration | |
@EnableBatchProcessing | |
public static class JobConfiguration { | |
@Autowired | |
private JobBuilderFactory jobBuilderFactory; | |
@Autowired | |
private StepBuilderFactory stepBuilderFactory; | |
@Autowired | |
private ResourceLoader resourceLoader; | |
@Bean | |
@StepScope | |
public JdbcPagingItemReader<Item> itemReader(@Value("#{stepExecutionContext[startTimestamp]}") Date startTimestamp, | |
@Value("#{stepExecutionContext[endTimestamp]}") Date endTimestamp) throws Exception { | |
JdbcPagingItemReader<Item> reader = new JdbcPagingItemReader<Item>(); | |
reader.setDataSource(dataSource()); | |
reader.setFetchSize(3); | |
reader.setRowMapper(new RowMapper<Item>() { | |
@Override | |
public Item mapRow(ResultSet rs, int rowNum) throws SQLException { | |
Item item = new Item(); | |
item.setField1(rs.getString(1)); | |
item.setField2(rs.getString(2)); | |
item.setInsertTime(rs.getDate(3)); | |
return item; | |
} | |
}); | |
HsqlPagingQueryProvider queryProvider = new HsqlPagingQueryProvider(); | |
queryProvider.setSelectClause("select field1, field2, insert_date"); | |
queryProvider.setFromClause("from source_table"); | |
queryProvider.setWhereClause("insert_date > :startTimestamp and insert_date <= :endTimestamp"); | |
Map<String, Order> sortKeys = new HashMap<String, Order>(); | |
sortKeys.put("insert_date", Order.ASCENDING); | |
queryProvider.setSortKeys(sortKeys); | |
reader.setQueryProvider(queryProvider); | |
Map<String, Object> parameters = new HashMap<String, Object>(); | |
parameters.put("startTimestamp", startTimestamp); | |
parameters.put("endTimestamp", endTimestamp); | |
reader.setParameterValues(parameters); | |
reader.afterPropertiesSet(); | |
return reader; | |
} | |
@Bean | |
public ItemWriter<Item> itemWriter() { | |
return new ItemWriter<Item>() { | |
@Override | |
public void write(List<? extends Item> items) throws Exception { | |
for (Item item : items) { | |
System.err.println(item); | |
} | |
} | |
}; | |
} | |
@Bean | |
public Step step1() throws Exception { | |
return stepBuilderFactory.get("step1") | |
.listener(incrementalListener()) | |
.listener(promotionListener()) | |
.<Item, Item>chunk(3) | |
.reader(itemReader(null, null)) | |
.writer(itemWriter()).build(); | |
} | |
@Bean | |
public Job job1() throws Exception { | |
return jobBuilderFactory.get("job1") | |
.start(step1()) | |
.build(); | |
} | |
@Bean | |
public ExecutionContextPromotionListener promotionListener() { | |
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener(); | |
listener.setKeys(new String [] {"startTimestamp", "endTimestamp"}); | |
return listener; | |
} | |
@Bean | |
public IncrementingListener incrementalListener() { | |
return new IncrementingListener(dataSource()); | |
} | |
@Bean | |
public DataSource dataSource() { | |
return new EmbeddedDatabaseFactory().getDatabase(); | |
} | |
@PostConstruct | |
protected void initialize() { | |
ResourceDatabasePopulator populator = new ResourceDatabasePopulator(); | |
populator.addScript( | |
resourceLoader.getResource(ClassUtils.addResourcePathToPackagePath(Step.class, "schema-hsqldb.sql"))); | |
populator.setContinueOnError(true); | |
DatabasePopulatorUtils.execute(populator, dataSource()); | |
} | |
} | |
public static class IncrementingListener { | |
private JdbcOperations template; | |
@Autowired | |
private JobExplorer jobExplorer; | |
public IncrementingListener(DataSource dataSource) { | |
template = new JdbcTemplate(dataSource); | |
} | |
@BeforeStep | |
public void beforeStep(StepExecution stepExecution) { | |
Date lastTimestamp = template.queryForObject("SELECT MAX(INSERT_DATE) FROM SOURCE_TABLE", Date.class); | |
// Get the last jobInstance...not the current one | |
List<JobInstance> jobInstances = jobExplorer.getJobInstances("job1", 0, 2); | |
Date startTimestamp = new Date(0); | |
if(jobInstances.size() > 1) { | |
JobInstance lastInstance = jobInstances.get(1); | |
if(lastInstance != null) { | |
List<JobExecution> executions = jobExplorer.getJobExecutions(lastInstance); | |
JobExecution lastExecution = executions.get(0); | |
for (JobExecution execution : executions) { | |
if(lastExecution.getEndTime().getTime() < execution.getEndTime().getTime()) { | |
lastExecution = execution; | |
} | |
} | |
if(lastExecution.getExecutionContext().containsKey("endTimestamp")) { | |
startTimestamp = (Date) lastExecution.getExecutionContext().get("endTimestamp"); | |
} | |
} | |
} | |
stepExecution.getExecutionContext().put("startTimestamp", startTimestamp); | |
stepExecution.getExecutionContext().put("endTimestamp", lastTimestamp); | |
} | |
} | |
public static class Item { | |
private String field1; | |
private String field2; | |
private Date insertTime; | |
public String getField1() { | |
return field1; | |
} | |
public void setField1(String field1) { | |
this.field1 = field1; | |
} | |
public String getField2() { | |
return field2; | |
} | |
public void setField2(String field2) { | |
this.field2 = field2; | |
} | |
public Date getInsertTime() { | |
return insertTime; | |
} | |
public void setInsertTime(Date insertTime) { | |
this.insertTime = insertTime; | |
} | |
@Override | |
public String toString() { | |
return String.format("%s | %s | %s", field1, field2, insertTime.toString()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment