Spring Batch - Getting a DeadlockLoserDataAccessException when trying to read/write to the same table












0














I am working on a Spring Batch application that reads unprocessed rows from Table A, processes the data, inserts the processed data into Table B, and then updates the row in Table A to PROCESSED. Inserting into Table B works fine, but I get a DeadlockLoserDataAccessException every time I try to update Table A. I believe the cursor that the JdbcCursorItemReader opened to read Table A is getting in the way of the update. How would I go about fixing this?



I use a JdbcCursorItemReader and a CompositeItemWriter in the step. The chunk size is 1.










      java db2 spring-batch compositeitemwriter
















      asked Nov 20 at 20:42









      Karson074

























          3 Answers


















          0















          I believe the cursor that the JdbcCursorItemReader opened to read Table A is getting in the way of the update. How would I go about fixing this?




          This should not cause a problem if the read, the insert, and the update all happen within the same transaction (which is the case when you use a chunk-oriented step).




          I use a JdbcCursorItemReader and a CompositeItemWriter in the step. The chunk size is 1.




          Here is a quick (self-contained) example with the same config as you mentioned:



          import java.util.Arrays;
          import javax.sql.DataSource;

          import org.springframework.batch.core.Job;
          import org.springframework.batch.core.JobParameters;
          import org.springframework.batch.core.Step;
          import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
          import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
          import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
          import org.springframework.batch.core.launch.JobLauncher;
          import org.springframework.batch.item.ItemProcessor;
          import org.springframework.batch.item.database.JdbcBatchItemWriter;
          import org.springframework.batch.item.database.JdbcCursorItemReader;
          import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
          import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
          import org.springframework.batch.item.support.CompositeItemWriter;
          import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.context.ApplicationContext;
          import org.springframework.context.annotation.AnnotationConfigApplicationContext;
          import org.springframework.context.annotation.Bean;
          import org.springframework.context.annotation.Configuration;
          import org.springframework.jdbc.core.JdbcTemplate;
          import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
          import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

          @Configuration
          @EnableBatchProcessing
          public class MyJob {

              @Autowired
              private JobBuilderFactory jobs;

              @Autowired
              private StepBuilderFactory steps;

              // Reads the unprocessed rows of the person table (Table A) through a JDBC cursor.
              @Bean
              public JdbcCursorItemReader<Person> itemReader() {
                  return new JdbcCursorItemReaderBuilder<Person>()
                          .name("personItemReader")
                          .dataSource(dataSource())
                          .sql("select id, name from person where processed = false")
                          .beanRowMapper(Person.class)
                          .saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
                          .build();
              }

              // Upper-cases the name; stands in for the real processing logic.
              @Bean
              public ItemProcessor<Person, Person> itemProcessor() {
                  return item -> new Person(item.getId(), item.getName().toUpperCase());
              }

              // Runs the insert into people first, then the update of person, for each chunk.
              @Bean
              public CompositeItemWriter<Person> itemWriter() {
                  return new CompositeItemWriterBuilder<Person>()
                          .delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
                          .ignoreItemStream(true)
                          .build();
              }

              // Inserts the processed item into the people table (Table B).
              @Bean
              public JdbcBatchItemWriter<Person> peopleItemWriter() {
                  return new JdbcBatchItemWriterBuilder<Person>()
                          .dataSource(dataSource())
                          .beanMapped()
                          .sql("insert into people (name) values (:name)")
                          .build();
              }

              // Flags the source row in the person table (Table A) as processed.
              @Bean
              public JdbcBatchItemWriter<Person> personItemUpdater() {
                  return new JdbcBatchItemWriterBuilder<Person>()
                          .dataSource(dataSource())
                          .beanMapped()
                          .sql("update person set processed = true where id = :id")
                          .build();
              }

              @Bean
              public Step step() {
                  return steps.get("step")
                          .<Person, Person>chunk(1)
                          .reader(itemReader())
                          .processor(itemProcessor())
                          .writer(itemWriter())
                          .build();
              }

              @Bean
              public Job job() {
                  return jobs.get("job")
                          .start(step())
                          .build();
              }

              @Bean
              public DataSource dataSource() {
                  return new EmbeddedDatabaseBuilder()
                          .setType(EmbeddedDatabaseType.H2)
                          .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                          .addScript("/org/springframework/batch/core/schema-h2.sql")
                          .build();
              }

              @Bean
              public JdbcTemplate jdbcTemplate(DataSource dataSource) {
                  return new JdbcTemplate(dataSource);
              }

              public static void main(String[] args) throws Exception {

                  ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
                  JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
                  jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
                  jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
                  jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
                  jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");

                  JobLauncher jobLauncher = context.getBean(JobLauncher.class);
                  Job job = context.getBean(Job.class);
                  jobLauncher.run(job, new JobParameters());

                  Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
                  Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
                  System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
                  System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
                  Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
                  System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
              }

              public static class Person {

                  private int id;

                  private String name;

                  public Person() {
                  }

                  public Person(int id, String name) {
                      this.id = id;
                      this.name = name;
                  }

                  public int getId() {
                      return id;
                  }

                  public void setId(int id) {
                      this.id = id;
                  }

                  public String getName() {
                      return name;
                  }

                  public void setName(String name) {
                      this.name = name;
                  }

                  @Override
                  public String toString() {
                      return "Person{" +
                              "id=" + id +
                              ", name='" + name + '\'' +
                              '}';
                  }
              }

          }


          It reads persons from a Person table (Table A in your case), uppercases their names, and writes the result to a People table (Table B in your case). Then it updates the processed flag in the Person table.



          If you run the sample, you should see:



          nbInsertedFoos in people table = 1
          nbInsertedBars in people table = 1
          nbUpdatedPersons in person table = 2


          without any deadlock exception.



          Hope this helps.



































            0














            I'd suggest redesigning the transaction logic so that it "locks" the required rows of Table A by marking them as PROCESSED at the very beginning of the transaction, instead of updating them again at the end of the transaction.
            See the example below.



            -- *** Example of queue processing in DB2 ***
            -- The following registry variables must be set:
            -- DB2_SKIPINSERTED=YES
            -- DB2_SKIPDELETED=YES
            -- DB2_EVALUNCOMMITTED=YES
            -- Don't forget to run db2stop/db2start after setting them so that the changes take effect.

            create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
            -- 'exclude null keys' is available starting from V10.5
            create index test1 on test(not_processed) exclude null keys;
            alter table test volatile; -- easy way to force an index scan regardless of the collected table statistics
            insert into test (id) values 1,2;

            -- Every session starts its transaction by locking its own set of rows (only one in this example),
            -- which become invisible to the same statement issued by other concurrent transactions
            -- because of the registry variables set above.
            -- No lock waits are expected on such an update.
            update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
            -- work with other tables goes here
            -- ...
            -- transaction end
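
To make the ordering concrete, here is a minimal in-memory sketch of the same claim-first idea in plain Java. It does not talk to DB2 or Spring Batch: the ClaimFirstQueue class, its claimNext and runDemo methods, and the NOT_PROCESSED/CLAIMED constants are hypothetical names invented for this illustration, and an atomic compare-and-set stands in for the initial update statement. Each worker claims a row before doing any work and never touches that row again, so no two workers contend for the same row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical in-memory stand-in for the TEST table; not a DB2 or Spring Batch API.
class ClaimFirstQueue {

    private static final int NOT_PROCESSED = 1;
    private static final int CLAIMED = 0;

    // One slot per row, playing the role of the not_processed column.
    private final AtomicIntegerArray notProcessed;

    ClaimFirstQueue(int rowCount) {
        notProcessed = new AtomicIntegerArray(rowCount);
        for (int i = 0; i < rowCount; i++) {
            notProcessed.set(i, NOT_PROCESSED);
        }
    }

    // Claims the first unclaimed row and returns its index, or -1 if none is left.
    // The compare-and-set plays the role of the update issued at the start of the transaction.
    int claimNext() {
        for (int i = 0; i < notProcessed.length(); i++) {
            if (notProcessed.compareAndSet(i, NOT_PROCESSED, CLAIMED)) {
                return i;
            }
        }
        return -1;
    }

    // Runs `workers` threads over `rows` rows and returns how often each row was processed.
    static int[] runDemo(int rows, int workers) {
        ClaimFirstQueue queue = new ClaimFirstQueue(rows);
        AtomicIntegerArray processedCount = new AtomicIntegerArray(rows);
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                int row;
                while ((row = queue.claimNext()) >= 0) {
                    // Work on other tables would happen here; the claimed row is not updated again.
                    processedCount.incrementAndGet(row);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        int[] counts = new int[rows];
        for (int i = 0; i < rows; i++) {
            counts[i] = processedCount.get(i);
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = runDemo(100, 4);
        boolean exactlyOnce = true;
        for (int c : counts) {
            exactlyOnce &= (c == 1);
        }
        System.out.println("every row processed exactly once: " + exactlyOnce);
    }
}
```

One thing the sketch leaves out: in the database, a transaction that fails rolls back its claim, so the row becomes available to other sessions again. The in-memory version has no equivalent of that rollback.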




































              -1














              The architecture is ETL-like: it reads data from a source, processes it, and writes it to a target. I try to avoid this kind of update logic in my processes because it introduces significant overhead and the problems you described. So maybe you could rethink the architecture.



              If not, I strongly recommend creating an appropriate index for the update, depending on the search condition you use. This makes the update cheaper, and the statement then only needs to access the one row, avoiding additional table scans for the update.






                3 Answers
                3






                active

                oldest

                votes








                3 Answers
                3






                active

                oldest

                votes









                active

                oldest

                votes






                active

                oldest

                votes









                0















                I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?




                This should not cause a problem if both the read, insert and update are within the same transaction (which is the case when you use a chunk-oriented step).




                I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.




                Here is a quick (self-contained) example with the same config as you mentioned:



                import java.util.Arrays;
                import javax.sql.DataSource;

                import org.springframework.batch.core.Job;
                import org.springframework.batch.core.JobParameters;
                import org.springframework.batch.core.Step;
                import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
                import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
                import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
                import org.springframework.batch.core.launch.JobLauncher;
                import org.springframework.batch.item.ItemProcessor;
                import org.springframework.batch.item.database.JdbcBatchItemWriter;
                import org.springframework.batch.item.database.JdbcCursorItemReader;
                import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
                import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
                import org.springframework.batch.item.support.CompositeItemWriter;
                import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.context.ApplicationContext;
                import org.springframework.context.annotation.AnnotationConfigApplicationContext;
                import org.springframework.context.annotation.Bean;
                import org.springframework.context.annotation.Configuration;
                import org.springframework.jdbc.core.JdbcTemplate;
                import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
                import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

                @Configuration
                @EnableBatchProcessing
                public class MyJob {

                @Autowired
                private JobBuilderFactory jobs;

                @Autowired
                private StepBuilderFactory steps;

                @Bean
                public JdbcCursorItemReader<Person> itemReader() {
                return new JdbcCursorItemReaderBuilder<Person>()
                .name("personItemReader")
                .dataSource(dataSource())
                .sql("select id, name from person where processed = false")
                .beanRowMapper(Person.class)
                .saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
                .build();
                }

                @Bean
                public ItemProcessor<Person, Person> itemProcessor() {
                return item -> new Person(item.getId(), item.getName().toUpperCase());
                }

                @Bean
                public CompositeItemWriter<Person> itemWriter() {
                return new CompositeItemWriterBuilder<Person>()
                .delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
                .ignoreItemStream(true)
                .build();
                }

                @Bean
                public JdbcBatchItemWriter<Person> peopleItemWriter() {
                return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource())
                .beanMapped()
                .sql("insert into people (name) values (:name)")
                .build();
                }

                @Bean
                public JdbcBatchItemWriter<Person> personItemUpdater() {
                return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource())
                .beanMapped()
                .sql("update person set processed = true where id = :id")
                .build();
                }

                @Bean
                public Step step() {
                return steps.get("step")
                .<Person, Person>chunk(1)
                .reader(itemReader())
                .processor(itemProcessor())
                .writer(itemWriter())
                .build();
                }

                @Bean
                public Job job() {
                return jobs.get("job")
                .start(step())
                .build();
                }

                @Bean
                public DataSource dataSource() {
                return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
                }

                @Bean
                public JdbcTemplate jdbcTemplate(DataSource dataSource) {
                return new JdbcTemplate(dataSource);
                }

                public static void main(String args) throws Exception {

                ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
                JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
                jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
                jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
                jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
                jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");

                JobLauncher jobLauncher = context.getBean(JobLauncher.class);
                Job job = context.getBean(Job.class);
                jobLauncher.run(job, new JobParameters());

                Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
                Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
                System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
                System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
                Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
                System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
                }

                public static class Person {

                private int id;

                private String name;

                public Person() {
                }

                public Person(int id, String name) {
                this.id = id;
                this.name = name;
                }

                public int getId() {
                return id;
                }

                public void setId(int id) {
                this.id = id;
                }

                public String getName() {
                return name;
                }

                public void setName(String name) {
                this.name = name;
                }

                @Override
                public String toString() {
                return "Person{" +
                "id=" + id +
                ", name='" + name + ''' +
                '}';
                }
                }

                }


                It reads persons from a Person table (TableA in your case), uppercase their name and writes the result in a People table (TableB in your case). Then it updates the processed flag on the Person table.



                If you run the sample, you should see:



                nbInsertedFoos in people table = 1
                nbInsertedBars in people table = 1
                nbUpdatedPersons in person table = 2


                without any dead lock exception.



                Hope this helps.






                share|improve this answer


























                  0















                  I believe this is due to the Curser from the JDBCCursorItemReader that was used to read Table A is getting in the way of Updating the Table. How would I go about fixing this?




                  This should not cause a problem if both the read, insert and update are within the same transaction (which is the case when you use a chunk-oriented step).




                  I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.




                  Here is a quick (self-contained) example with the same config as you mentioned:



                  import java.util.Arrays;
                  import javax.sql.DataSource;

                  import org.springframework.batch.core.Job;
                  import org.springframework.batch.core.JobParameters;
                  import org.springframework.batch.core.Step;
                  import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
                  import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
                  import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
                  import org.springframework.batch.core.launch.JobLauncher;
                  import org.springframework.batch.item.ItemProcessor;
                  import org.springframework.batch.item.database.JdbcBatchItemWriter;
                  import org.springframework.batch.item.database.JdbcCursorItemReader;
                  import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
                  import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
                  import org.springframework.batch.item.support.CompositeItemWriter;
                  import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
                  import org.springframework.beans.factory.annotation.Autowired;
                  import org.springframework.context.ApplicationContext;
                  import org.springframework.context.annotation.AnnotationConfigApplicationContext;
                  import org.springframework.context.annotation.Bean;
                  import org.springframework.context.annotation.Configuration;
                  import org.springframework.jdbc.core.JdbcTemplate;
                  import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
                  import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

                  @Configuration
                  @EnableBatchProcessing
                  public class MyJob {

                  @Autowired
                  private JobBuilderFactory jobs;

                  @Autowired
                  private StepBuilderFactory steps;

                  @Bean
                  public JdbcCursorItemReader<Person> itemReader() {
                  return new JdbcCursorItemReaderBuilder<Person>()
                  .name("personItemReader")
                  .dataSource(dataSource())
                  .sql("select id, name from person where processed = false")
                  .beanRowMapper(Person.class)
                  .saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
                  .build();
                  }

                  @Bean
                  public ItemProcessor<Person, Person> itemProcessor() {
                  return item -> new Person(item.getId(), item.getName().toUpperCase());
                  }

                  @Bean
                  public CompositeItemWriter<Person> itemWriter() {
                  return new CompositeItemWriterBuilder<Person>()
                  .delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
                  .ignoreItemStream(true)
                  .build();
                  }

                  @Bean
                  public JdbcBatchItemWriter<Person> peopleItemWriter() {
                  return new JdbcBatchItemWriterBuilder<Person>()
                  .dataSource(dataSource())
                  .beanMapped()
                  .sql("insert into people (name) values (:name)")
                  .build();
                  }

                  @Bean
                  public JdbcBatchItemWriter<Person> personItemUpdater() {
                  return new JdbcBatchItemWriterBuilder<Person>()
                  .dataSource(dataSource())
                  .beanMapped()
                  .sql("update person set processed = true where id = :id")
                  .build();
                  }

                  @Bean
                  public Step step() {
                  return steps.get("step")
                  .<Person, Person>chunk(1)
                  .reader(itemReader())
                  .processor(itemProcessor())
                  .writer(itemWriter())
                  .build();
                  }

                  @Bean
                  public Job job() {
                  return jobs.get("job")
                  .start(step())
                  .build();
                  }

                  @Bean
                  public DataSource dataSource() {
                  return new EmbeddedDatabaseBuilder()
                  .setType(EmbeddedDatabaseType.H2)
                  .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                  .addScript("/org/springframework/batch/core/schema-h2.sql")
                  .build();
                  }

                  @Bean
                  public JdbcTemplate jdbcTemplate(DataSource dataSource) {
                  return new JdbcTemplate(dataSource);
                  }

                  public static void main(String args) throws Exception {

                  ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
                  JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
                  jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
                  jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
                  jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
                  jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");

                  JobLauncher jobLauncher = context.getBean(JobLauncher.class);
                  Job job = context.getBean(Job.class);
                  jobLauncher.run(job, new JobParameters());

                  Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
                  Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
                  System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
                  System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
                  Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
                  System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
                  }

                  public static class Person {

                  private int id;

                  private String name;

                  public Person() {
                  }

                  public Person(int id, String name) {
                  this.id = id;
                  this.name = name;
                  }

                  public int getId() {
                  return id;
                  }

                  public void setId(int id) {
                  this.id = id;
                  }

                  public String getName() {
                  return name;
                  }

                  public void setName(String name) {
                  this.name = name;
                  }

                  @Override
                  public String toString() {
                  return "Person{" +
                  "id=" + id +
                  ", name='" + name + '\'' +
                  '}';
                  }
                  }

                  }


                  It reads persons from the Person table (Table A in your case), uppercases their names, and writes the result to the People table (Table B in your case). Then it sets the processed flag on the Person table.



                  If you run the sample, you should see:



                  nbInsertedFoos in people table = 1
                  nbInsertedBars in people table = 1
                  nbUpdatedPersons in person table = 2


                  without any deadlock exception.
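To make the flow of the sample easier to follow, here is a rough in-memory sketch of the same process-indicator idea, with no Spring Batch and no database involved. The class `ProcessIndicatorSketch`, the `PersonRow` type and the `runOnce` method are hypothetical names made up for this illustration; a plain list stands in for the person table, so nothing here says anything about locking or transactions in a real database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class ProcessIndicatorSketch {

    // Hypothetical in-memory stand-in for one row of the person table.
    static final class PersonRow {
        final int id;
        final String name;
        boolean processed;

        PersonRow(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // One run of the "job": returns the names that would be inserted into people.
    static List<String> runOnce(List<PersonRow> person) {
        List<String> people = new ArrayList<>();
        for (PersonRow row : person) {
            if (row.processed) {
                continue; // reader: select ... where processed = false
            }
            String upper = row.name.toUpperCase(Locale.ROOT); // processor
            people.add(upper);     // first delegate: insert into people
            row.processed = true;  // second delegate: update person set processed = true
        }
        return people;
    }

    public static void main(String[] args) {
        List<PersonRow> person = new ArrayList<>();
        person.add(new PersonRow(1, "foo"));
        person.add(new PersonRow(2, "bar"));

        System.out.println("first run inserted  = " + runOnce(person));
        System.out.println("second run inserted = " + runOnce(person));
    }
}
```

The second call finds nothing left to do, because the processed flag itself records progress. That is the reason the reader in the sample does not need to save its own state.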



                  Hope this helps.




































                    answered Nov 21 at 10:09









                    Mahmoud Ben Hassine


























                        0














                        I'd suggest you redesign the transaction logic to "lock" the necessary rows of TABLEA by marking them as 'PROCESSED' at the very beginning of your transaction, instead of updating them again at the end of the transaction.
                        See the example below.



                        -- *** Example of queue processing in DB2 ***
                        -- The following registry variables must be set:
                        -- DB2_SKIPINSERTED=YES
                        -- DB2_SKIPDELETED=YES
                        -- DB2_EVALUNCOMMITTED=YES
                        -- Don't forget to db2stop/db2start after setting them so that the changes take effect.

                        create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
                        -- 'exclude null keys' is available starting from V10.5
                        create index test1 on test(not_processed) exclude null keys;
                        alter table test volatile; -- easy way to force an index scan regardless of the collected table statistics
                        insert into test (id) values 1,2;

                        -- Every session starts its transaction by locking its own set of rows (only one in this example).
                        -- Those rows become invisible to the same statement issued by other concurrent transactions
                        -- because of the registry variables set above.
                        -- No lock waits are expected on such an update.
                        update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
                        -- work with other tables comes below
                        -- ...
                        -- transaction end
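The claim-first idea can also be pictured outside the database. The following is only an in-memory analogy in Java, not DB2 behaviour: the hypothetical `ClaimFirstSketch` class uses `AtomicBoolean.compareAndSet` in place of the `update ... set not_processed=null` statement, so that each worker thread claims a row before doing any work on it. All names are assumptions made for this sketch, and it does not model rollback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class ClaimFirstSketch {

    // Hypothetical stand-in for a row of the test table.
    static final class Row {
        final int id;
        final AtomicBoolean notProcessed = new AtomicBoolean(true);

        Row(int id) {
            this.id = id;
        }
    }

    // Claims the first row that is still not processed, or returns null if none is left.
    static Row claimNext(List<Row> table) {
        for (Row row : table) {
            // Only one thread can flip the flag, so a row is never claimed twice.
            if (row.notProcessed.compareAndSet(true, false)) {
                return row;
            }
        }
        return null;
    }

    // Lets several workers drain the table; returns the sorted ids that were handled.
    static List<Integer> drain(List<Row> table, int workers) {
        ConcurrentLinkedQueue<Integer> handled = new ConcurrentLinkedQueue<>();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                Row row;
                while ((row = claimNext(table)) != null) {
                    handled.add(row.id); // the work on other tables would go here
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<Integer> result = new ArrayList<>(handled);
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        for (int id = 1; id <= 5; id++) {
            table.add(new Row(id));
        }
        System.out.println("handled ids = " + drain(table, 3)); // prints [1, 2, 3, 4, 5]
    }
}
```

Each id appears exactly once in the result no matter how the threads interleave, because the claim happens first and nothing revisits the flag afterwards.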













                            edited Nov 21 at 14:33

























                            answered Nov 21 at 7:58









                            Mark Barinstein
























                                -1














                                The architecture is ETL-like: reading data from a source, processing it, and writing it to a target. I try to avoid this kind of update logic in my processes, as it introduces a big overhead and the problems you described. So maybe you could rethink the architecture.



                                If not, I really recommend having an appropriate index for the update, depending on the search condition you use. This not only makes the update cheaper, it also means the SQL only needs to access the one row, avoiding additional table scans for the update.
















                                    answered Nov 20 at 20:55









                                    MichaelTiefenbacher







































































































