
Multithreading with Spring JDBC for bulk operations

Performing bulk database operations is a common requirement in many projects.
In this article, I will show how to perform bulk database operations such as update, insert and delete
efficiently, using batch processing and multithreading with the help of Spring's JdbcTemplate.
The example in this article performs a bulk update.

When to use?
– When you need to improve the performance of database operations
that run against tables containing millions of records.

Highly recommended for bigger databases, but optional for smaller datasets.

Tools and technologies used:

1) spring-core – 3.2.3.RELEASE
2) spring-context – 3.2.3.RELEASE
3) spring-jdbc – 3.2.3.RELEASE
4) Eclipse Luna (4.4.1)
5) Maven (POM model version 4.0.0)
6) MySQL Connector/J (mysql-connector-java) 5.1.26
7) JDK 1.7 or above (Java 8 recommended), since the code uses the Java 7 diamond operator (<>)

Prerequisites:
1) The MySQL database should be up and running.
2) A database schema named “test” should be created.

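Schema: the examples use a Customer table in the “test” schema with the columns ID, NAME, AGE, ADRESS and UPGRADED (the SQL in this article spells the address column ADRESS throughout).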

Steps to follow:

1) Create a Maven project in Eclipse.
2) Add the spring-core, spring-context, spring-jdbc, commons-lang3
and mysql-connector-java dependencies to the pom.xml file.
3) Write a simple Java program.
4) Run the demo.

Here are the Maven dependencies that need to be added to the pom.xml file:

<properties>
	<spring.version>3.2.3.RELEASE</spring.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-core</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.26</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-jdbc</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.commons</groupId>
		<artifactId>commons-lang3</artifactId>
		<version>3.0</version>
	</dependency>
</dependencies>

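Here is the Spring configuration file, applicationContext.xml.

This XML file holds all the Spring bean definitions:
[applicationContext.xml listing: spring-context]
At minimum, it must define a bean with the id springBulkUpdateExample of class
SpringJdbcBulkDbOperations whose jdbcTemplate property is set to a JdbcTemplate backed by
a DataSource for the “test” schema, because the main method looks that bean up by name.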

Write a simple Java program:

SpringJdbcBulkDbOperations.java
This is the main class that triggers the process:
1) Create multiple threads with a fixed thread pool.

// Start fixed thread pool size of 4
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(fixedThreadPool);

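2) Run the batching query. BatchConversion submits every full batch while the rows are being read;
the remaining ids, if any, are submitted afterwards as one last, smaller batch.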

// do batching
jdbcTemplate = getJdbcTemplate();
BatchConversion batchConversion = new BatchConversion(jdbcTemplate, completionService);
jdbcTemplate.query("select id from Customer where upgraded is null", batchConversion);
if(!batchConversion.getIds().isEmpty()){
	completionService.submit(new CustomerUpdateCallable(jdbcTemplate, batchConversion.getIds()));
}

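3) Collect the results of the batches. The batches are already running on the pool threads;
completionService.take() blocks until the next batch finishes and returns its Future, in completion order.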

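//collect one result per submitted batch: getBatchCount() counts only the full batches,
//so add one for the partial batch submitted above, if there was one
int submittedBatches = batchConversion.getBatchCount() + (batchConversion.getIds().isEmpty() ? 0 : 1);
long totalCustomersUpdated = 0;
for (int i = 0; i < submittedBatches; i++) {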
	try {
		Future<Integer> batchCount = completionService.take();
		if (batchCount != null) {
			int batchCountValue = batchCount.get();
			totalCustomersUpdated = totalCustomersUpdated + batchCountValue;
			System.out.println("Customer updated count: " + batchCountValue + ", total count: " + totalCustomersUpdated);
		}
	} catch (Exception e) {
		e.printStackTrace();
	}
}
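The loop above relies on two properties of CompletionService: take() returns futures in the order the batches finish, and it must be called exactly once per submitted batch (one call too many blocks forever, one too few leaves a batch result uncollected). The minimal, stdlib-only sketch below involves no database; BatchResultCollector, Summary and runAndCollect are hypothetical names, and the lambdas stand in for CustomerUpdateCallable. It also shows a slightly different error-handling choice from the article: a batch that throws inside call() surfaces as an ExecutionException from get(), so it can be counted and logged while the remaining batches are still collected, and InterruptedException is propagated instead of being printed and ignored. Java 8 lambdas are assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BatchResultCollector {

    /** Outcome of collecting all batches: rows updated and number of failed batches. */
    static final class Summary {
        final long updated;
        final int failedBatches;

        Summary(long updated, int failedBatches) {
            this.updated = updated;
            this.failedBatches = failedBatches;
        }
    }

    static Summary runAndCollect(List<Callable<Integer>> batches) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
            for (Callable<Integer> batch : batches) {
                cs.submit(batch);
            }
            long updated = 0;
            int failed = 0;
            // exactly one take() per submitted batch; futures arrive in completion order
            for (int i = 0; i < batches.size(); i++) {
                try {
                    updated += cs.take().get();
                } catch (ExecutionException e) {
                    // the exception thrown inside call() is available as e.getCause()
                    failed++;
                    System.err.println("Batch failed: " + e.getCause());
                }
            }
            return new Summary(updated, failed);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<Integer>> batches = new ArrayList<>();
        batches.add(() -> 3);
        batches.add(() -> { throw new IllegalStateException("simulated SQL error"); });
        batches.add(() -> 1);
        Summary s = runAndCollect(batches);
        System.out.println(s.updated + " updated, " + s.failedBatches + " failed"); // prints 4 updated, 1 failed
    }
}
```

Whether a failed batch should stop the whole run or only be logged depends on the job; the sketch only shows where that decision can be made.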

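The process flow (code flow):
[Figure: ProcessFlow]
1) SpringJdbcBulkDbOperations.main() runs the id query with BatchConversion as the row callback.
2) BatchConversion.processRow() submits a CustomerUpdateCallable for every 3 ids; main() submits the remainder.
3) Each CustomerUpdateCallable.call() loads its customers and calls UnitBatchUpdate.updateCustomers().
4) updateCustomers() runs the batch update and marks the rows as upgraded.
5) main() collects the per-batch counts through the CompletionService.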

Here is the complete code:

package com.devjavasource.jdbc.jdbctemplate;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ResultSetExtractor;

public class SpringJdbcBulkDbOperations {
    private static JdbcTemplate jdbcTemplate;
    public static void main(String[] args) throws SQLException {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");        
        SpringJdbcBulkDbOperations stmtQueryExample = (SpringJdbcBulkDbOperations) context.getBean("springBulkUpdateExample");             
        stmtQueryExample.queryEmployee();
        
        //do bulk update
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4); // Start fixed thread pool size of 4
        try{
        	CompletionService<Integer> completionService = new ExecutorCompletionService<>(fixedThreadPool);
        	
        	// do batching
        	jdbcTemplate = getJdbcTemplate();
        	BatchConversion batchConversion = new BatchConversion(jdbcTemplate, completionService);
        	jdbcTemplate.query("select id from Customer where upgraded is null", batchConversion);
        	if(!batchConversion.getIds().isEmpty()){
        		completionService.submit(new CustomerUpdateCallable(jdbcTemplate, batchConversion.getIds()));
        	}
        	
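        	//collect one result per submitted batch: getBatchCount() counts only the full batches,
        	//so add one for the partial batch submitted above, if there was one
        	int submittedBatches = batchConversion.getBatchCount() + (batchConversion.getIds().isEmpty() ? 0 : 1);
        	long totalCustomersUpdated = 0;
            for (int i = 0; i < submittedBatches; i++) {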
                try {
                    Future<Integer> batchCount = completionService.take();
                    if (batchCount != null) {
                        int batchCountValue = batchCount.get();
                        totalCustomersUpdated = totalCustomersUpdated + batchCountValue;
                        System.out.println("Customer updated count: " + batchCountValue + ", total count: " + totalCustomersUpdated);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        	
        }catch(Exception exp){
        	exp.printStackTrace();
        }finally {
            fixedThreadPool.shutdownNow();
        }
    }

    public void queryEmployee() throws SQLException { 
        String sql = "SELECT ID, NAME, AGE, ADRESS, UPGRADED FROM CUSTOMER";             
        
        List<Customer> empList = getJdbcTemplate().query(sql, new ResultSetExtractor<List<Customer>>(){

            public List<Customer> extractData(ResultSet rs) throws SQLException,
                    DataAccessException {
                List<Customer> empList = new ArrayList<Customer>();
                while(rs.next()) {
                	Customer emp = new Customer(rs.getInt("ID"), rs.getString("NAME"), rs.getInt("AGE"), rs.getString("ADRESS"), rs.getString("UPGRADED"));                    
                    empList.add(emp);
                }
                return empList;
            }});
        
        System.out.println(empList);        
    }

    public static JdbcTemplate getJdbcTemplate() {
        return jdbcTemplate;
    }

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        SpringJdbcBulkDbOperations.jdbcTemplate = jdbcTemplate; // static field, shared with main()
    }
}

CustomerUpdateCallable.java
This is a custom class that implements the java.util.concurrent.Callable interface.
Its call() method fetches the rows of the customers whose ids belong to its batch,
passes this list to the updateCustomers() method of the UnitBatchUpdate class,
and returns the number of ids in the batch.

package com.devjavasource.jdbc.jdbctemplate;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.apache.commons.lang3.StringUtils;
import org.springframework.jdbc.core.JdbcTemplate;

public class CustomerUpdateCallable implements Callable<Integer> {
	CustomerUpdateCallable(JdbcTemplate inJt, List<Serializable> inIds) {
		_jt = inJt;
		_Ids = inIds;
	}

	@Override
	public Integer call() {
		List<Map<String, Object>> results = _jt.queryForList(SELECT_CUST_SQL + StringUtils.join(_Ids, ',') + ')');
		UnitBatchUpdate.updateCustomers(_jt, results);
		return _Ids.size();
	}

	private List<Serializable> _Ids;
	private JdbcTemplate _jt;
	private static final String SELECT_CUST_SQL = "SELECT id, name, age, adress FROM Customer WHERE id in (";
}
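CustomerUpdateCallable concatenates the ids directly into the SQL text. That works here because the ids come from the database itself, but a variant that generates one ? placeholder per id lets the JDBC driver bind the values instead. The minimal sketch below assumes Java 8; InClauseBuilder and selectByIdsSql are hypothetical names, and only the SQL string is built, so no database is needed to run it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class InClauseBuilder {

    private static final String SELECT_CUST_SQL =
            "SELECT id, name, age, adress FROM Customer WHERE id in (";

    /** Returns the SELECT with one "?" placeholder per id, e.g. "... in (?,?,?)". */
    static String selectByIdsSql(List<?> ids) {
        if (ids.isEmpty()) {
            // "in ()" is not valid SQL, so an empty batch is rejected up front
            throw new IllegalArgumentException("IN list must not be empty");
        }
        // String.join (Java 8) plays the role that StringUtils.join plays in the article
        return SELECT_CUST_SQL + String.join(",", Collections.nCopies(ids.size(), "?")) + ")";
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(7, 8, 9);
        // prints: SELECT id, name, age, adress FROM Customer WHERE id in (?,?,?)
        System.out.println(selectByIdsSql(ids));
        // With Spring, the SQL and the ids could then be passed together, e.g.:
        // jdbcTemplate.queryForList(selectByIdsSql(ids), ids.toArray());
    }
}
```

JdbcTemplate's queryForList(String sql, Object... args) returns the same List<Map<String, Object>> shape that the callable already hands to UnitBatchUpdate, so the rest of the flow would not need to change.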

UnitBatchUpdate.java
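This class holds the actual update logic for the customers. updateCustomers() runs one JDBC batch update
for the rows it receives and then marks those rows as upgraded. In this demo, the UPDATE statement
simply writes back the name and address values that were just read.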

package com.devjavasource.jdbc.jdbctemplate;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;

import org.springframework.jdbc.core.JdbcTemplate;

public class UnitBatchUpdate {

	private UnitBatchUpdate() {	}

	public static void updateCustomers(JdbcTemplate inJt,
			List<Map<String, Object>> inPrimaryCustomerList) {
		if (inPrimaryCustomerList == null || inPrimaryCustomerList.isEmpty()) {
			return;
		}

		System.out.println("Start Updating the Customer Details..." + inPrimaryCustomerList );

		List<Object[]> customerArgsList = new ArrayList<>();
		List<Object[]> propertySourceArgsList = new ArrayList<>();
		List<Object> custIdList = new ArrayList<>();

		for (Map<String, Object> custRow : inPrimaryCustomerList) {
			Serializable custId = (Serializable) custRow.get("id");
			createBatchArgs(custRow, custId, customerArgsList, propertySourceArgsList, custIdList);
		}

		inJt.batchUpdate(UPDATE_CUSTOMER_SQL, customerArgsList);		

		System.out.println("Done !!!!!!!!");
		
		setUnitEquipmentBatchAsUpgraded(inJt, custIdList);
	}

	private static void createBatchArgs(Map<String, Object> inCustomerRow,
			Serializable inCustomerId, List<Object[]> inOutCustArgsList,
			List<Object[]> inOutPropertySourceArgsList,
			List<Object> inOutCustIdList) {

		Object id = inCustomerRow.get("id");
		inOutCustIdList.add(id);

		Object[] custArgs = new Object[3];
		custArgs[0] = inCustomerRow.get("name");
		custArgs[1] = inCustomerRow.get("adress");
		custArgs[2] = inCustomerId;

		inOutCustArgsList.add(custArgs);

		Object[] propertySourceArgs = new Object[2];
		propertySourceArgs[0] = inCustomerId;
		propertySourceArgs[1] = id;

		inOutPropertySourceArgsList.add(propertySourceArgs);
	}
	
	private static void setUnitEquipmentBatchAsUpgraded(JdbcTemplate inJt, List<Object> inCustIdList) {

		if (inCustIdList != null && !inCustIdList.isEmpty()) {
            try {
            	System.out.println("Mark 'upgraded' to true for the Customer rows that are upgraded");
                String updateAsUpgraded = String.format(UPDATE_CUSTOMER_CONVERTED_SQL, '1', StringUtils.join(inCustIdList, ','));
                System.out.println("Update Customer as upgraded SQL: " + updateAsUpgraded);
                inJt.execute(updateAsUpgraded);
                System.out.println("Customer rows marked as 'Upgraded'");
            } catch(Exception e) {
                e.printStackTrace();
            }
        } else {
            System.out.println(String.format("Null/empty customer id list passed to %s.setUnitEquipmentBatchAsUpgraded()", UnitBatchUpdate.class.getSimpleName()));
        }
	}

	private static final String UPDATE_CUSTOMER_SQL = "UPDATE Customer SET name=?, adress=? WHERE id = ?";
	private static final String UPDATE_CUSTOMER_CONVERTED_SQL = "UPDATE Customer SET upgraded = %s WHERE id in (%s)";
}
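The core of updateCustomers() is turning each row map into an Object[] whose elements line up with the ? placeholders of UPDATE_CUSTOMER_SQL, then handing the whole list to JdbcTemplate.batchUpdate(String, List<Object[]>). The stdlib-only sketch below isolates that step as a simplified createBatchArgs() without the unused propertySourceArgsList. BatchArgsDemo, toBatchArgs and row are hypothetical names, the rows are built by hand instead of being read with queryForList, and the Spring call is left as a comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BatchArgsDemo {

    static final String UPDATE_CUSTOMER_SQL = "UPDATE Customer SET name=?, adress=? WHERE id = ?";

    /** One Object[] per row; element order must match the ? order in UPDATE_CUSTOMER_SQL. */
    static List<Object[]> toBatchArgs(List<Map<String, Object>> rows) {
        List<Object[]> batchArgs = new ArrayList<>(rows.size());
        for (Map<String, Object> row : rows) {
            batchArgs.add(new Object[] {row.get("name"), row.get("adress"), row.get("id")});
        }
        return batchArgs;
    }

    /** Builds a row map shaped like one element of queryForList()'s result. */
    static Map<String, Object> row(int id, String name, String adress) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", id);
        row.put("name", name);
        row.put("adress", adress);
        return row;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = Arrays.asList(
                row(7, "Doom", "22/16, Street 2nd, Lane 2, NEW"),
                row(8, "Blash", "16/13, Street 4nd, Lane 1, LON"));
        for (Object[] a : toBatchArgs(rows)) {
            System.out.println(Arrays.toString(a));
        }
        // With Spring, the whole list would be sent as one JDBC batch:
        // int[] counts = jdbcTemplate.batchUpdate(UPDATE_CUSTOMER_SQL, toBatchArgs(rows));
    }
}
```

If the batch does not get noticeably faster against MySQL, it may be worth testing Connector/J's rewriteBatchedStatements=true connection property; without it, the driver sends the batched statements one at a time.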

BatchConversion.java
This is a custom class that implements the Spring JDBC interface RowCallbackHandler.
We override the processRow() method, which Spring calls once for every row returned by the query,
to submit a batch as soon as the number of collected ids reaches the batch size we defined.
In our case, the batch size is 3; it can be changed to suit your requirements.

To submit a batch, we call the submit() method of java.util.concurrent.CompletionService,
passing it an instance of CustomerUpdateCallable.

CompletionService is an interface; the ExecutorCompletionService implementation created in the main class
hands every submitted task to the fixed thread pool, so each batch runs in a separate thread.
With 10 customers and a batch size of 3, processRow() submits three full batches (ids 1-3, 4-6 and 7-9),
and the main class submits the remaining id (10) as a fourth batch. Each batch is executed by one thread from the pool.

package com.devjavasource.jdbc.jdbctemplate;

import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowCallbackHandler;

public class BatchConversion implements RowCallbackHandler{
	BatchConversion(JdbcTemplate inJt, CompletionService<Integer> inCompletionService) {
		_jt = inJt;
		_completionService = inCompletionService;
	}

	@Override
	public void processRow(ResultSet inRs) throws SQLException {
		_ids.add((Serializable) inRs.getObject(1));
		if (_ids.size() == _batchSize) {
			_completionService.submit(new CustomerUpdateCallable(_jt, new ArrayList<>(_ids)));
			_ids.clear();
			_batchCount++;
		}
	}

	public int getBatchCount() {
		return _batchCount;
	}

	private int _batchCount;

	public List<Serializable> getIds() {
		return _ids;
	}

	private List<Serializable> _ids = new ArrayList<>();
	private JdbcTemplate _jt;
	private CompletionService<Integer> _completionService;
	private static int _batchSize = 3;
}
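Stripped of Spring and the database, the batching pattern of BatchConversion can be sketched as below. IdBatcher, accept, flush and submittedBatches are hypothetical names: accept() plays the role of processRow(), flush() plays the role of the remainder submission in the main class, and each task simply returns its batch size, as CustomerUpdateCallable.call() does. One deliberate difference from the article's class: the sketch also counts the final partial batch, so the caller knows exactly how many results to take(). With 10 ids and a batch size of 3 it produces the same four batches described above. Java 8 lambdas are assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class IdBatcher {
    private final int batchSize;
    private final CompletionService<Integer> completionService;
    private final List<Integer> pending = new ArrayList<>();
    private final List<List<Integer>> submitted = new ArrayList<>(); // kept for inspection

    IdBatcher(int batchSize, CompletionService<Integer> completionService) {
        this.batchSize = batchSize;
        this.completionService = completionService;
    }

    /** Called once per row, like RowCallbackHandler.processRow(). */
    void accept(int id) {
        pending.add(id);
        if (pending.size() == batchSize) {
            submitPending();
        }
    }

    /** Submits the final, partial batch (if any); call after the last row. */
    void flush() {
        if (!pending.isEmpty()) {
            submitPending();
        }
    }

    private void submitPending() {
        final List<Integer> batch = new ArrayList<>(pending); // copy, because pending is reused
        pending.clear();
        submitted.add(batch);
        completionService.submit(() -> batch.size()); // stand-in for CustomerUpdateCallable
    }

    /** Counts full AND partial batches, i.e. how many take() calls the caller must make. */
    int submittedBatches() {
        return submitted.size();
    }

    List<List<Integer>> batches() {
        return submitted;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
            IdBatcher batcher = new IdBatcher(3, cs);
            for (int id = 1; id <= 10; id++) {
                batcher.accept(id);
            }
            batcher.flush();
            System.out.println(batcher.batches()); // prints [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
            long total = 0;
            for (int i = 0; i < batcher.submittedBatches(); i++) {
                total += cs.take().get();
            }
            System.out.println("total: " + total); // prints total: 10
        } finally {
            pool.shutdown();
        }
    }
}
```

Keeping the submission count next to the submission code avoids having the collecting loop in the main class work out separately whether a partial batch was sent.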

Customer.java
This is the entity class, with the properties id, name, age, address and upgraded, that holds the values of one Customer row.

package com.devjavasource.jdbc.jdbctemplate;

public class Customer {
    private int id;
    private String name;
    private int age;
    private String address;
    private String upgraded;

    public Customer(int id, String name, int age, String address, String upgraded) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.address = address;
        this.upgraded = upgraded;
    }

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }
    
    public String getAddress() {
		return address;
	}

	public String getUpgraded() {
		return upgraded;
	}

	public void setUpgraded(String upgraded) {
		this.upgraded = upgraded;
	}

	public String toString() {
        return "Customer: [" + name + ", ID: " + id + ", AGE " + age + ", ADDRESS: "+ address +", UPGRADED: "+ upgraded +"]";
    }
}

Demo:
Right-click SpringJdbcBulkDbOperations.java and select Run As -> Java Application.

Output:

[Customer: [Scott, ID: 1, AGE 20, ADDRESS: 22/16, Street 2nd, Lane 2, OAK, UPGRADED: null], Customer: [Ron, ID: 2, AGE 25, ADDRESS: 16/13, Street 4nd, Lane 1, OAK, UPGRADED: null], Customer: [Ben, ID: 3, AGE 30, ADDRESS: 11/16, Street 6nd, Lane 4, CA, UPGRADED: null], Customer: [Jhon, ID: 4, AGE 35, ADDRESS: 16/13, Street 4nd, Lane 1, CA, UPGRADED: null], Customer: [Williams, ID: 5, AGE 20, ADDRESS: 1/16, Street 2nd, Lane 2, AUS, UPGRADED: null], Customer: [Clenny, ID: 6, AGE 25, ADDRESS: 6/13, Street 7nd, Lane 1, OAK, UPGRADED: null], Customer: [Doom, ID: 7, AGE 40, ADDRESS: 22/16, Street 2nd, Lane 2, NEW, UPGRADED: null], Customer: [Blash, ID: 8, AGE 15, ADDRESS: 16/13, Street 4nd, Lane 1, LON, UPGRADED: null], Customer: [Kent, ID: 9, AGE 40, ADDRESS: 22/16, Street 2nd, Lane 2, SWD, UPGRADED: null], Customer: [Albert, ID: 10, AGE 45, ADDRESS: 16/13, Street 4nd, Lane 1, UK, UPGRADED: null]]
Start Updating the Customer Details...[{id=7, name=Doom, age=40, adress=22/16, Street 2nd, Lane 2, NEW}, {id=8, name=Blash, age=15, adress=16/13, Street 4nd, Lane 1, LON}, {id=9, name=Kent, age=40, adress=22/16, Street 2nd, Lane 2, SWD}]
Start Updating the Customer Details...[{id=4, name=Jhon, age=35, adress=16/13, Street 4nd, Lane 1, CA}, {id=5, name=Williams, age=20, adress=1/16, Street 2nd, Lane 2, AUS}, {id=6, name=Clenny, age=25, adress=6/13, Street 7nd, Lane 1, OAK}]
Start Updating the Customer Details...[{id=1, name=Scott, age=20, adress=22/16, Street 2nd, Lane 2, OAK}, {id=2, name=Ron, age=25, adress=16/13, Street 4nd, Lane 1, OAK}, {id=3, name=Ben, age=30, adress=11/16, Street 6nd, Lane 4, CA}]

-- Executed by thread 1
Start Updating the Customer Details...[{id=10, name=Albert, age=45, adress=16/13, Street 4nd, Lane 1, UK}]
Done !!!!!!!!
Mark 'upgraded' to true for the Customer rows that are upgraded
Update Customer as upgraded SQL: UPDATE Customer SET upgraded = 1 WHERE id in (10)
Done !!!!!!!!

-- Executed by thread 2
Mark 'upgraded' to true for the Customer rows that are upgraded
Update Customer as upgraded SQL: UPDATE Customer SET upgraded = 1 WHERE id in (7,8,9)
Done !!!!!!!!

-- Executed by thread 3
Mark 'upgraded' to true for the Customer rows that are upgraded
Update Customer as upgraded SQL: UPDATE Customer SET upgraded = 1 WHERE id in (1,2,3)
Done !!!!!!!!

-- Executed by thread 4
Mark 'upgraded' to true for the Customer rows that are upgraded
Update Customer as upgraded SQL: UPDATE Customer SET upgraded = 1 WHERE id in (4,5,6)

Customer rows marked as 'Upgraded'
Customer rows marked as 'Upgraded'
Customer updated count: 1, total count: 1
Customer updated count: 3, total count: 4
Customer rows marked as 'Upgraded'
Customer updated count: 3, total count: 7
Customer rows marked as 'Upgraded'

You can download the complete project here:

bulkDbOperations

*** Venkat – Happy learning ****