返回顶部

收藏

使用spring动态路由切换主从库

更多

最近对动态路由进行了学习,现总结如下:

通过集成org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource类,自定义动态数据源。

配置如下: datasource-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans  
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
http://www.springframework.org/schema/context  
 http://www.springframework.org/schema/context/spring-context-2.5.xsd  
http://www.springframework.org/schema/aop   
http://www.springframework.org/schema/aop/spring-aop-2.5.xsd  
http://www.springframework.org/schema/tx   
http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">

        <!-- 数据源配置 -->
    <bean id="dataSourceFirst" class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close">
        <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
        <property name="url" value="jdbc:oracle:thin:@10.20.151.4:1521:ptdev" />
        <property name="username" value="pt" />
        <property name="password" value="pt" />
        <property name="maxActive" value="200" />
        <property name="maxIdle" value="5" />
        <property name="poolPreparedStatements" value="true" />
        <property name="removeAbandoned" value="true" />
        <property name="removeAbandonedTimeout" value="300" />
    </bean>

    <bean id="dataSourceSecond" class="org.apache.commons.dbcp.BasicDataSource"
        destroy-method="close">
        <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver" />
        <property name="url" value="jdbc:oracle:thin:@10.20.151.12:1521:pt10g" />
        <property name="username" value="pt" />
        <property name="password" value="pt" />
        <property name="maxActive" value="200" />
        <property name="maxIdle" value="5" />
        <property name="poolPreparedStatements" value="true" />
        <property name="removeAbandoned" value="true" />
        <property name="removeAbandonedTimeout" value="300" />
    </bean>

    <bean id="dataSource" class="com.common.bean.RoutingDataSource">
        <property name="targetDataSources">
            <map>
                <entry key="1" value-ref="dataSourceFirst" />
                <entry key="2" value-ref="dataSourceSecond" />
            </map>
        </property>
        <property name="defaultTargetDataSource">
            <ref local="dataSourceFirst" />
        </property>
    </bean>
<!--配置事物-->
    <bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource">
            <ref local="dataSource" />
        </property>
    </bean>

    <bean id="lobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"
        lazy-init="true" />

    <bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <property name="lobHandler" ref="lobHandler" />
        <property name="configLocations" value="classpath*:/ibatis/config/sql-map.xml" />
    </bean>

    <bean id="txAttributeSource"
        class="org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource">
        <property name="properties">
            <props>
                <prop key="add*">PROPAGATION_REQUIRED,-PtServiceException</prop>
                <prop key="update*">PROPAGATION_REQUIRED,-PtServiceException</prop>
                <prop key="delete*">PROPAGATION_REQUIRED,-PtServiceException</prop>
                <prop key="batch*">PROPAGATION_REQUIRED,-PtServiceException</prop>
                <prop key="get*">PROPAGATION_REQUIRED,-PtServiceException</prop>
            </props>
        </property>
    </bean>

    <bean id="transactionManagerProxy" class="org.springframework.aop.framework.ProxyFactoryBean">
        <property name="proxyTargetClass">
            <value>true</value>
        </property>
        <property name="target">
            <ref bean="transactionManager" />
        </property>
    </bean>

    <bean id="transactionDefinition"
        class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean"
        abstract="true">
        <property name="transactionManager">
            <ref bean="transactionManagerProxy" />
        </property>
        <property name="transactionAttributeSource">
            <ref bean="txAttributeSource" />
        </property>
    </bean>

    <bean id="baseDao" class="com.common.dao.impl.BaseDaoImpl">
        <property name="sqlMapClient">
            <ref bean="sqlMapClient" />
        </property>
        <property name="dataSource">
            <ref bean="dataSource" />
        </property>
    </bean>

    <bean id="dbInfoService" parent="transactionDefinition">
        <property name="target">
            <bean class="com.service.impl.DbInfoServiceImpl">
                <property name="baseDao" ref="baseDao" />
            </bean>
        </property>
    </bean>

    <bean id="test" class="com.common.bean.Test">
        <property name="dbInfoService" ref="dbInfoService"></property>
    </bean>

</beans>

com.common.bean.RoutingDataSource类:

package com.common.bean;

import java.sql.Connection;
import java.sql.SQLException;

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

public class RoutingDataSource extends AbstractRoutingDataSource {

    protected Object determineCurrentLookupKey() {
        //获取当前线程处理的账号对应分片信息
        Shard shard = ThreadInfoHolder.getCurrentThreadShard();

        //动态选定DataSource
        String dbId = shard == null ? null : String.valueOf(shard.getDbId());

        return dbId;
    }

    @Override
    public String toString() {
        //获取当前线程处理的账号对应分片信息
        Shard shard = ThreadInfoHolder.getCurrentThreadShard();

        //动态选定DataSource
        String dbId = shard == null ? null : String.valueOf(shard.getDbId());

        return "DB ID " + dbId + ":" + super.toString();
    }

    public String getTargetDbId() throws SQLException {
        Connection conn = null;
        try {
            //jdbc:oracle:thin:@10.20.151.4:1521:ptdev, UserName=xx, Oracle JDBC driver
            conn = determineTargetDataSource().getConnection();

            if (conn != null) {
                String connectionDesc = conn.getMetaData().getURL();
                int beginIdx = connectionDesc.indexOf("@") + 1;
                int endIdx = connectionDesc.indexOf(":", beginIdx);

                return connectionDesc.substring(beginIdx, endIdx);
            }
        } finally {
            if (conn != null) {
                conn.close();
            }
        }

        return null;
    }

}

com.common.bean.ThreadInfoHolder类如下:
package com.common.bean;

public class ThreadInfoHolder {
     // thread local, 获取、存储本线程处理的账号对应分片信息
    private static final ThreadLocal<Shard> shardLocal = new ThreadLocal<Shard>();

    /**
     * 获取当前线程处理的账号对应分片信息
     * 
     * @return
     */
    public static Shard getCurrentThreadShard() {
        return ThreadInfoHolder.shardLocal.get();
    }

    /**
     * 在当前线程存储账号对应分片信息
     * 
     * @param shard
     */
    public static void addCurrentThreadShard(Shard shard) {
        ThreadInfoHolder.shardLocal.set(shard);
    }

    /**
     * 清空前线程存储分片信息
     * 
     * @param shard
     */
    public static void cleanCurrentThreadShard() {
        ThreadInfoHolder.shardLocal.remove();
    }

}
com.common.bean.Shard类如下:
public class Shard {
//存放Account数据的DB_ID
    private Integer            dbId;
 /**
     * @return the dbId
     */
    public Integer getDbId() {
        return dbId;
    }

    /**
     * @param dbId the dbId to set
     */
    public void setDbId(Integer dbId) {
        this.dbId = dbId;
    }
}

com.service.DbInfoService接口:
package com.service;

import java.util.List;
import java.util.Map;

import com.common.dao.model.User;

public interface DbInfoService {

    public List<Map<String,Object>> getUserInfo(User user);

}

com.service.impl.DbInfoServiceImpl实现类:
package com.service.impl;

import java.util.List;
import java.util.Map;

import com.common.bean.Shard;
import com.common.bean.ThreadInfoHolder;
import com.common.dao.BaseDao;
import com.common.dao.model.User;
import com.service.DbInfoService;

public class DbInfoServiceImpl implements DbInfoService{

    public BaseDao baseDao;

    public void setBaseDao(BaseDao baseDao) {
        this.baseDao = baseDao;
    }

    public List<Map<String,Object>> getUserInfo(User user)
    {

        baseDao.add("login.addUser", user);
        List<Map<String,Object>> result=baseDao.getList("login.getUserInfo",user.getName());

        return result;
    }

}

com.common.bean.Test类:
package com.common.bean;

import java.util.List;
import java.util.Map;

import com.common.dao.model.User;
import com.service.DbInfoService;

public class Test {

    public DbInfoService dbInfoService;

    public void setDbInfoService(DbInfoService dbInfoService) {
        this.dbInfoService = dbInfoService;
    }

    public List<Map<String,Object>> getInfo(User user)
    {
        List<Map<String,Object>> result=dbInfoService.getUserInfo(user);
                return result;
    }

}

测试main方法如下:

package com.transaction;

import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.xml.XmlBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.io.ClassPathResource;

import com.common.bean.Shard;
import com.common.bean.Test;
import com.common.bean.ThreadInfoHolder;
import com.common.dao.model.User;
import com.service.impl.DbInfoServiceImpl;

public class TransactionTest {

    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:spring/datasource-config.xml");
                Test t=(Test)ctx.getBean("test");
        User user=new User();
        user.setName("xj");
        user.setPassword("123");
        Shard shard=new Shard();
        shard.setDbId(1);//使用datasource1
        ThreadInfoHolder.addCurrentThreadShard(shard);
        List<Map<String,Object>> result =t.getInfo(user);
        shard.setDbId(2);//使用datasource2
        ThreadInfoHolder.addCurrentThreadShard(shard);
        List<Map<String,Object>> result1 =t.getInfo(user);
        System.out.println(result);
    }

}

运行结果是分别向数据库1和数据库2中插入了1调记录。

注意:

由于对DbInfoService配置了事物,如果将切换数据源的代码ThreadInfoHolder.addCurrentThreadShard(shard);放在在DbInfoServiceImpl类的getUserInfo方法中,如下:

public List<Map<String,Object>> getUserInfo(User user)
{
    Shard shard=new Shard();
    shard.setDbId(1);
    ThreadInfoHolder.addCurrentThreadShard(shard);
    baseDao.add("login.addUser", user);

    shard.setDbId(2);
    ThreadInfoHolder.addCurrentThreadShard(shard);
    baseDao.add("login.addUser", user);
    List<Map<String,Object>> result=baseDao.getList("login.getUserInfo",user.getName());

    return result;
}

main方法改为:

public static void main(String[] args) {
    ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath*:spring/datasource-config.xml");
    Test t=(Test)ctx.getBean("test");
    User user=new User();
    user.setName("xj");
    user.setPassword("123");
    List<Map<String,Object>> result =t.getInfo(user);
    System.out.println(result);
}

则运行结果将是向数据库1中插入2条相同的记录,而不是分别想数据库1,2各插一条记录,产生该结果的原因是应为由于DbInfoServiceImpl配置了事物,所以在getUserInfo方法中的第一次连数据库会新建一个连接,而后将该连接绑定在线程的本地变量即ThreadLoad中,当以后在需要访问数据库时不在新建连接而是使用这个绑定了老连接,在本例子中,即第一次的数据库连接是连数据库1,当第二次访问数据库时,使用的还是这个数据库1的连接,即切换数据源设置代码shard.setDbId(2);ThreadInfoHolder.addCurrentThreadShard(shard);失效。同理我们可以退出,如果将切换数据源代码放在Test类的getInfo方法中,即:

public List<Map<String,Object>> getInfo(User user)
{
    Shard shard=new Shard();
    shard.setDbId(1);
    ThreadInfoHolder.addCurrentThreadShard(shard);
    List<Map<String,Object>> result=dbInfoService.getUserInfo(user);
    shard.setDbId(2);
    ThreadInfoHolder.addCurrentThreadShard(shard);
    List<Map<String,Object>> result1=dbInfoService.getUserInfo(user);

    return result;
}

这样是能正确运行的,应为在调用事物前我们已经切换了数据源。

标签:Spring,Mysql,AbstractRoutingDataSource

收藏

0人收藏

支持

1

反对

0

评论

  • 按上述,那么两个就不在一个事务里了 一个失败另一个不能回滚,请问这该如何解决?

    回复   |   hi_kevin 发表于 2013-08-15 17:05:07
  • 这个应该可以使用分布式事务的

    回复   |   金背二郎 发表于 2013-08-20 09:11:14

发表评论