Spring Batch - Getting a DeadlockLoserDataAccessException when trying to read/write to the same table
I am working on a Spring Batch application that will Read unprocessed data from Table A, process the Data, Insert the processed Data to Table B, and then Update the row in Table A to PROCESSED. However, while Inserting the data into Table B works fine, I keep getting a DeadlockLoserDataAccessException every time I try to Updated Table A. I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?
I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.
java db2 spring-batch compositeitemwriter
add a comment |
I am working on a Spring Batch application that will Read unprocessed data from Table A, process the Data, Insert the processed Data to Table B, and then Update the row in Table A to PROCESSED. However, while Inserting the data into Table B works fine, I keep getting a DeadlockLoserDataAccessException every time I try to Updated Table A. I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?
I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.
java db2 spring-batch compositeitemwriter
add a comment |
I am working on a Spring Batch application that will Read unprocessed data from Table A, process the Data, Insert the processed Data to Table B, and then Update the row in Table A to PROCESSED. However, while Inserting the data into Table B works fine, I keep getting a DeadlockLoserDataAccessException every time I try to Updated Table A. I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?
I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.
java db2 spring-batch compositeitemwriter
I am working on a Spring Batch application that will Read unprocessed data from Table A, process the Data, Insert the processed Data to Table B, and then Update the row in Table A to PROCESSED. However, while Inserting the data into Table B works fine, I keep getting a DeadlockLoserDataAccessException every time I try to Updated Table A. I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?
I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.
java db2 spring-batch compositeitemwriter
java db2 spring-batch compositeitemwriter
asked Nov 20 at 20:42
Karson074
559
559
add a comment |
add a comment |
3 Answers
3
active
oldest
votes
I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?
This should not cause a problem if both the read, insert and update are within the same transaction (which is the case when you use a chunk-oriented step).
I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.
Here is a quick (self-contained) example with the same config as you mentioned:
import java.util.Arrays;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public JdbcCursorItemReader<Person> itemReader() {
return new JdbcCursorItemReaderBuilder<Person>()
.name("personItemReader")
.dataSource(dataSource())
.sql("select id, name from person where processed = false")
.beanRowMapper(Person.class)
.saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
.build();
}
@Bean
public ItemProcessor<Person, Person> itemProcessor() {
return item -> new Person(item.getId(), item.getName().toUpperCase());
}
@Bean
public CompositeItemWriter<Person> itemWriter() {
return new CompositeItemWriterBuilder<Person>()
.delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
.ignoreItemStream(true)
.build();
}
@Bean
public JdbcBatchItemWriter<Person> peopleItemWriter() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("insert into people (name) values (:name)")
.build();
}
@Bean
public JdbcBatchItemWriter<Person> personItemUpdater() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("update person set processed = true where id = :id")
.build();
}
@Bean
public Step step() {
return steps.get("step")
.<Person, Person>chunk(1)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step())
.build();
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("/org/springframework/batch/core/schema-drop-h2.sql")
.addScript("/org/springframework/batch/core/schema-h2.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
public static void main(String args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
}
public static class Person {
private int id;
private String name;
public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + ''' +
'}';
}
}
}
It reads persons from a Person
table (TableA in your case), uppercase their name and writes the result in a People
table (TableB in your case). Then it updates the processed
flag on the Person
table.
If you run the sample, you should see:
nbInsertedFoos in people table = 1
nbInsertedBars in people table = 1
nbUpdatedPersons in person table = 2
without any dead lock exception.
Hope this helps.
add a comment |
I'd suggest you redesign the transaction logic to "lock" necessary TABLEA's rows marking them as 'PROCESSED' at the very beginning of your transaction, and not update them one more time at the end of the transaction.
See the example below.
-- *** Example of queue processing in DB2 ***
-- The following registry variables must be set:
-- DB2_SKIPINSERTED=YES
-- DB2_SKIPDELETED=YES
-- DB2_EVALUNCOMMITTED=YES
-- Don't forget to db2stop/db2start after their setting to make the changes take an effect.
create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
-- 'exclude null keys' is avaiable starting from V10.5
create index test1 on test(not_processed) exclude null keys;
alter table test volatile; -- easy way to force ixscan discregarding the table statistics collected
insert into test (id) values 1,2;
-- Every session starts its transaction with locking its own set of rows (only one in the example),
-- which becomes invisible for the same statement issued by other concurrent transactions
-- due to setting registry variables above.
-- No lock waits expected on such an update.
update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
-- work with other tables comes below
-- ...
-- transaction end
add a comment |
The architecure is ETL like reading data from a source, processing it and writing it to a target. I try to avoid this update logic in my processes as it introduces a big overhead and the problems you described. So maybe you could re-think the architecture ...
If not I really recommend to have a appropriate index for the update - depending on the search condition you use. This will make the update not only cheaper but the SQL will only need to access the one row - avoiding additional table scans for the update.
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53401188%2fspring-batch-getting-a-deadlockloserdataaccessexception-when-trying-to-read-wr%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
3 Answers
3
active
oldest
votes
3 Answers
3
active
oldest
votes
active
oldest
votes
active
oldest
votes
I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?
This should not cause a problem if both the read, insert and update are within the same transaction (which is the case when you use a chunk-oriented step).
I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.
Here is a quick (self-contained) example with the same config as you mentioned:
import java.util.Arrays;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public JdbcCursorItemReader<Person> itemReader() {
return new JdbcCursorItemReaderBuilder<Person>()
.name("personItemReader")
.dataSource(dataSource())
.sql("select id, name from person where processed = false")
.beanRowMapper(Person.class)
.saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
.build();
}
@Bean
public ItemProcessor<Person, Person> itemProcessor() {
return item -> new Person(item.getId(), item.getName().toUpperCase());
}
@Bean
public CompositeItemWriter<Person> itemWriter() {
return new CompositeItemWriterBuilder<Person>()
.delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
.ignoreItemStream(true)
.build();
}
@Bean
public JdbcBatchItemWriter<Person> peopleItemWriter() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("insert into people (name) values (:name)")
.build();
}
@Bean
public JdbcBatchItemWriter<Person> personItemUpdater() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("update person set processed = true where id = :id")
.build();
}
@Bean
public Step step() {
return steps.get("step")
.<Person, Person>chunk(1)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step())
.build();
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("/org/springframework/batch/core/schema-drop-h2.sql")
.addScript("/org/springframework/batch/core/schema-h2.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
public static void main(String args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
}
public static class Person {
private int id;
private String name;
public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + ''' +
'}';
}
}
}
It reads persons from a Person
table (TableA in your case), uppercase their name and writes the result in a People
table (TableB in your case). Then it updates the processed
flag on the Person
table.
If you run the sample, you should see:
nbInsertedFoos in people table = 1
nbInsertedBars in people table = 1
nbUpdatedPersons in person table = 2
without any dead lock exception.
Hope this helps.
add a comment |
I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?
This should not cause a problem if both the read, insert and update are within the same transaction (which is the case when you use a chunk-oriented step).
I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.
Here is a quick (self-contained) example with the same config as you mentioned:
import java.util.Arrays;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public JdbcCursorItemReader<Person> itemReader() {
return new JdbcCursorItemReaderBuilder<Person>()
.name("personItemReader")
.dataSource(dataSource())
.sql("select id, name from person where processed = false")
.beanRowMapper(Person.class)
.saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
.build();
}
@Bean
public ItemProcessor<Person, Person> itemProcessor() {
return item -> new Person(item.getId(), item.getName().toUpperCase());
}
@Bean
public CompositeItemWriter<Person> itemWriter() {
return new CompositeItemWriterBuilder<Person>()
.delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
.ignoreItemStream(true)
.build();
}
@Bean
public JdbcBatchItemWriter<Person> peopleItemWriter() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("insert into people (name) values (:name)")
.build();
}
@Bean
public JdbcBatchItemWriter<Person> personItemUpdater() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("update person set processed = true where id = :id")
.build();
}
@Bean
public Step step() {
return steps.get("step")
.<Person, Person>chunk(1)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step())
.build();
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("/org/springframework/batch/core/schema-drop-h2.sql")
.addScript("/org/springframework/batch/core/schema-h2.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
public static void main(String args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
}
public static class Person {
private int id;
private String name;
public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + ''' +
'}';
}
}
}
It reads persons from a Person
table (TableA in your case), uppercase their name and writes the result in a People
table (TableB in your case). Then it updates the processed
flag on the Person
table.
If you run the sample, you should see:
nbInsertedFoos in people table = 1
nbInsertedBars in people table = 1
nbUpdatedPersons in person table = 2
without any dead lock exception.
Hope this helps.
add a comment |
I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?
This should not cause a problem if both the read, insert and update are within the same transaction (which is the case when you use a chunk-oriented step).
I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.
Here is a quick (self-contained) example with the same config as you mentioned:
import java.util.Arrays;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public JdbcCursorItemReader<Person> itemReader() {
return new JdbcCursorItemReaderBuilder<Person>()
.name("personItemReader")
.dataSource(dataSource())
.sql("select id, name from person where processed = false")
.beanRowMapper(Person.class)
.saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
.build();
}
@Bean
public ItemProcessor<Person, Person> itemProcessor() {
return item -> new Person(item.getId(), item.getName().toUpperCase());
}
@Bean
public CompositeItemWriter<Person> itemWriter() {
return new CompositeItemWriterBuilder<Person>()
.delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
.ignoreItemStream(true)
.build();
}
@Bean
public JdbcBatchItemWriter<Person> peopleItemWriter() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("insert into people (name) values (:name)")
.build();
}
@Bean
public JdbcBatchItemWriter<Person> personItemUpdater() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("update person set processed = true where id = :id")
.build();
}
@Bean
public Step step() {
return steps.get("step")
.<Person, Person>chunk(1)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step())
.build();
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("/org/springframework/batch/core/schema-drop-h2.sql")
.addScript("/org/springframework/batch/core/schema-h2.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
public static void main(String args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
}
public static class Person {
private int id;
private String name;
public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + ''' +
'}';
}
}
}
It reads persons from a Person
table (TableA in your case), uppercase their name and writes the result in a People
table (TableB in your case). Then it updates the processed
flag on the Person
table.
If you run the sample, you should see:
nbInsertedFoos in people table = 1
nbInsertedBars in people table = 1
nbUpdatedPersons in person table = 2
without any dead lock exception.
Hope this helps.
I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?
This should not cause a problem if both the read, insert and update are within the same transaction (which is the case when you use a chunk-oriented step).
I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.
Here is a quick (self-contained) example with the same config as you mentioned:
import java.util.Arrays;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public JdbcCursorItemReader<Person> itemReader() {
return new JdbcCursorItemReaderBuilder<Person>()
.name("personItemReader")
.dataSource(dataSource())
.sql("select id, name from person where processed = false")
.beanRowMapper(Person.class)
.saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
.build();
}
@Bean
public ItemProcessor<Person, Person> itemProcessor() {
return item -> new Person(item.getId(), item.getName().toUpperCase());
}
@Bean
public CompositeItemWriter<Person> itemWriter() {
return new CompositeItemWriterBuilder<Person>()
.delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
.ignoreItemStream(true)
.build();
}
@Bean
public JdbcBatchItemWriter<Person> peopleItemWriter() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("insert into people (name) values (:name)")
.build();
}
@Bean
public JdbcBatchItemWriter<Person> personItemUpdater() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("update person set processed = true where id = :id")
.build();
}
@Bean
public Step step() {
return steps.get("step")
.<Person, Person>chunk(1)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step())
.build();
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("/org/springframework/batch/core/schema-drop-h2.sql")
.addScript("/org/springframework/batch/core/schema-h2.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
public static void main(String args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
}
public static class Person {
private int id;
private String name;
public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + ''' +
'}';
}
}
}
It reads persons from a Person
table (TableA in your case), uppercase their name and writes the result in a People
table (TableB in your case). Then it updates the processed
flag on the Person
table.
If you run the sample, you should see:
nbInsertedFoos in people table = 1
nbInsertedBars in people table = 1
nbUpdatedPersons in person table = 2
without any dead lock exception.
Hope this helps.
answered Nov 21 at 10:09
Mahmoud Ben Hassine
3,6451714
3,6451714
add a comment |
add a comment |
I'd suggest you redesign the transaction logic to "lock" necessary TABLEA's rows marking them as 'PROCESSED' at the very beginning of your transaction, and not update them one more time at the end of the transaction.
See the example below.
-- *** Example of queue processing in DB2 ***
-- The following registry variables must be set:
-- DB2_SKIPINSERTED=YES
-- DB2_SKIPDELETED=YES
-- DB2_EVALUNCOMMITTED=YES
-- Don't forget to db2stop/db2start after their setting to make the changes take an effect.
create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
-- 'exclude null keys' is avaiable starting from V10.5
create index test1 on test(not_processed) exclude null keys;
alter table test volatile; -- easy way to force ixscan discregarding the table statistics collected
insert into test (id) values 1,2;
-- Every session starts its transaction with locking its own set of rows (only one in the example),
-- which becomes invisible for the same statement issued by other concurrent transactions
-- due to setting registry variables above.
-- No lock waits expected on such an update.
update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
-- work with other tables comes below
-- ...
-- transaction end
add a comment |
I'd suggest you redesign the transaction logic to "lock" necessary TABLEA's rows marking them as 'PROCESSED' at the very beginning of your transaction, and not update them one more time at the end of the transaction.
See the example below.
-- *** Example of queue processing in DB2 ***
-- The following registry variables must be set:
-- DB2_SKIPINSERTED=YES
-- DB2_SKIPDELETED=YES
-- DB2_EVALUNCOMMITTED=YES
-- Don't forget to db2stop/db2start after their setting to make the changes take an effect.
create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
-- 'exclude null keys' is avaiable starting from V10.5
create index test1 on test(not_processed) exclude null keys;
alter table test volatile; -- easy way to force ixscan discregarding the table statistics collected
insert into test (id) values 1,2;
-- Every session starts its transaction with locking its own set of rows (only one in the example),
-- which becomes invisible for the same statement issued by other concurrent transactions
-- due to setting registry variables above.
-- No lock waits expected on such an update.
update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
-- work with other tables comes below
-- ...
-- transaction end
add a comment |
I'd suggest you redesign the transaction logic to "lock" necessary TABLEA's rows marking them as 'PROCESSED' at the very beginning of your transaction, and not update them one more time at the end of the transaction.
See the example below.
-- *** Example of queue processing in DB2 ***
-- The following registry variables must be set:
-- DB2_SKIPINSERTED=YES
-- DB2_SKIPDELETED=YES
-- DB2_EVALUNCOMMITTED=YES
-- Don't forget to db2stop/db2start after their setting to make the changes take an effect.
create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
-- 'exclude null keys' is avaiable starting from V10.5
create index test1 on test(not_processed) exclude null keys;
alter table test volatile; -- easy way to force ixscan discregarding the table statistics collected
insert into test (id) values 1,2;
-- Every session starts its transaction with locking its own set of rows (only one in the example),
-- which becomes invisible for the same statement issued by other concurrent transactions
-- due to setting registry variables above.
-- No lock waits expected on such an update.
update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
-- work with other tables comes below
-- ...
-- transaction end
I'd suggest you redesign the transaction logic to "lock" necessary TABLEA's rows marking them as 'PROCESSED' at the very beginning of your transaction, and not update them one more time at the end of the transaction.
See the example below.
-- *** Example of queue processing in DB2 ***
-- The following registry variables must be set:
-- DB2_SKIPINSERTED=YES
-- DB2_SKIPDELETED=YES
-- DB2_EVALUNCOMMITTED=YES
-- Don't forget to db2stop/db2start after their setting to make the changes take an effect.
create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
-- 'exclude null keys' is avaiable starting from V10.5
create index test1 on test(not_processed) exclude null keys;
alter table test volatile; -- easy way to force ixscan discregarding the table statistics collected
insert into test (id) values 1,2;
-- Every session starts its transaction with locking its own set of rows (only one in the example),
-- which becomes invisible for the same statement issued by other concurrent transactions
-- due to setting registry variables above.
-- No lock waits expected on such an update.
update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
-- work with other tables comes below
-- ...
-- transaction end
edited Nov 21 at 14:33
answered Nov 21 at 7:58
Mark Barinstein
99414
99414
add a comment |
add a comment |
The architecure is ETL like reading data from a source, processing it and writing it to a target. I try to avoid this update logic in my processes as it introduces a big overhead and the problems you described. So maybe you could re-think the architecture ...
If not I really recommend to have a appropriate index for the update - depending on the search condition you use. This will make the update not only cheaper but the SQL will only need to access the one row - avoiding additional table scans for the update.
add a comment |
The architecure is ETL like reading data from a source, processing it and writing it to a target. I try to avoid this update logic in my processes as it introduces a big overhead and the problems you described. So maybe you could re-think the architecture ...
If not I really recommend to have a appropriate index for the update - depending on the search condition you use. This will make the update not only cheaper but the SQL will only need to access the one row - avoiding additional table scans for the update.
add a comment |
The architecure is ETL like reading data from a source, processing it and writing it to a target. I try to avoid this update logic in my processes as it introduces a big overhead and the problems you described. So maybe you could re-think the architecture ...
If not I really recommend to have a appropriate index for the update - depending on the search condition you use. This will make the update not only cheaper but the SQL will only need to access the one row - avoiding additional table scans for the update.
The architecure is ETL like reading data from a source, processing it and writing it to a target. I try to avoid this update logic in my processes as it introduces a big overhead and the problems you described. So maybe you could re-think the architecture ...
If not I really recommend to have a appropriate index for the update - depending on the search condition you use. This will make the update not only cheaper but the SQL will only need to access the one row - avoiding additional table scans for the update.
answered Nov 20 at 20:55
MichaelTiefenbacher
2,1392614
2,1392614
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53401188%2fspring-batch-getting-a-deadlockloserdataaccessexception-when-trying-to-read-wr%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown