Skip to content

企业切换

DATASOURCE模式

DATASOURCE模式是指每个租户对应一个数据库,应用启动时,会将所有租户的数据源加载,执行数据库操作时,动态切换数据源到当前租户的数据源,并执行SQL。

COLUMN模式是指所有租户共用一个数据库,在表中增加租户ID字段区分。

DATASOURCE 模式 实现原理解析

  1. 配置默认数据源
yaml
# database.yml
acuity:
  druid-mysql: &druid-mysql
    validation-query: SELECT 'x'
  mysql: &db-mysql
    username: 'root'
    password: 'root'
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/acuity_ds_c_defaults?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useUnicode=true&useSSL=false&autoReconnect=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&nullCatalogMeansCurrent=true
  database:  
    # DATASOURCE模式下需要初始化的租户库名前缀(租户库名为 {前缀}_{租户ID});oracle数据库 initDatabasePrefix 不能超过11个字符
    initDatabasePrefix:
      - acuity_pro_base

spring:
  autoconfigure:
    exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
  datasource:
    dynamic:
      enabled: true
      p6spy: ${acuity.database.p6spy:false}
      seata: ${acuity.database.isSeata:false}
      # 设置默认数据源为 0
      primary: "0"
      datasource:
        "0":
          # 锚点引用,会将上方 &db-mysql 的配置,连接到这里
          <<: *db-mysql
      druid:
# 锚点引用,会将上方 &druid-mysql 的配置,连接到这里
        <<: *druid-mysql
        initialSize: 10
        minIdle: 10
        maxActive: 200
        max-wait: 60000
        pool-prepared-statements: true
        max-pool-prepared-statement-per-connection-size: 20
        test-on-borrow: false
        test-on-return: false
        test-while-idle: true
        time-between-eviction-runs-millis: 60000  #配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
        min-evictable-idle-time-millis: 300000    #配置一个连接在池中最小生存的时间,单位是毫秒
        filters: stat,wall
        wall:
          enabled: true
          strictSyntaxCheck: false
          comment-allow: true
          multiStatementAllow: true
          noneBaseStatementAllow: true
    druid:
      enable: true
      # 以下的2段配置,同时适用于所有模式
      web-stat-filter:  # WebStatFilter配置,说明请参考Druid Wiki,配置_配置WebStatFilter
        enabled: true
        url-pattern: /*
        exclusions: "*.js , *.gif ,*.jpg ,*.png ,*.css ,*.ico , /druid/*"
        session-stat-max-count: 1000
        profile-enable: true
        session-stat-enable: false
      stat-view-servlet:  #展示Druid的统计信息,StatViewServlet的用途包括:1.提供监控信息展示的html页面2.提供监控信息的JSON API
        enabled: true
        url-pattern: /druid/*   #根据配置中的url-pattern来访问内置监控页面,如果是上面的配置,内置监控页面的首页是/druid/index.html例如:http://127.0.0.1:9000/druid/index.html
        reset-enable: true    #允许清空统计数据
        login-username: ''
        login-password: ''
        # 允许访问IP
        allow: ''
# database.yml
acuity:
  druid-mysql: &druid-mysql
    validation-query: SELECT 'x'
  mysql: &db-mysql
    username: 'root'
    password: 'root'
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/acuity_ds_c_defaults?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useUnicode=true&useSSL=false&autoReconnect=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&nullCatalogMeansCurrent=true
  database:  
    # DATASOURCE模式下需要初始化的租户库名前缀(租户库名为 {前缀}_{租户ID});oracle数据库 initDatabasePrefix 不能超过11个字符
    initDatabasePrefix:
      - acuity_pro_base

spring:
  autoconfigure:
    exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
  datasource:
    dynamic:
      enabled: true
      p6spy: ${acuity.database.p6spy:false}
      seata: ${acuity.database.isSeata:false}
      # 设置默认数据源为 0
      primary: "0"
      datasource:
        "0":
          # 锚点引用,会将上方 &db-mysql 的配置,连接到这里
          <<: *db-mysql
      druid:
# 锚点引用,会将上方 &druid-mysql 的配置,连接到这里
        <<: *druid-mysql
        initialSize: 10
        minIdle: 10
        maxActive: 200
        max-wait: 60000
        pool-prepared-statements: true
        max-pool-prepared-statement-per-connection-size: 20
        test-on-borrow: false
        test-on-return: false
        test-while-idle: true
        time-between-eviction-runs-millis: 60000  #配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
        min-evictable-idle-time-millis: 300000    #配置一个连接在池中最小生存的时间,单位是毫秒
        filters: stat,wall
        wall:
          enabled: true
          strictSyntaxCheck: false
          comment-allow: true
          multiStatementAllow: true
          noneBaseStatementAllow: true
    druid:
      enable: true
      # 以下的2段配置,同时适用于所有模式
      web-stat-filter:  # WebStatFilter配置,说明请参考Druid Wiki,配置_配置WebStatFilter
        enabled: true
        url-pattern: /*
        exclusions: "*.js , *.gif ,*.jpg ,*.png ,*.css ,*.ico , /druid/*"
        session-stat-max-count: 1000
        profile-enable: true
        session-stat-enable: false
      stat-view-servlet:  #展示Druid的统计信息,StatViewServlet的用途包括:1.提供监控信息展示的html页面2.提供监控信息的JSON API
        enabled: true
        url-pattern: /druid/*   #根据配置中的url-pattern来访问内置监控页面,如果是上面的配置,内置监控页面的首页是/druid/index.html例如:http://127.0.0.1:9000/druid/index.html
        reset-enable: true    #允许清空统计数据
        login-username: ''
        login-password: ''
        # 允许访问IP
        allow: ''
  1. 项目启动时,会加载默认数据源dataSourceMap
java
// Source: dynamic-datasource-spring-boot-starter.jar
/**
 * Base provider that turns a map of data source properties into a map of data sources.
 */
public abstract class AbstractDataSourceProvider implements DynamicDataSourceProvider {

    @Autowired
    private DefaultDataSourceCreator defaultDataSourceCreator;

    /**
     * Creates one data source for every entry of the given property map.
     *
     * @param dataSourcePropertiesMap data source name mapped to its properties
     * @return data source name mapped to the data source created for it
     */
    protected Map<String, DataSource> createDataSourceMap(
            Map<String, DataSourceProperty> dataSourcePropertiesMap) {
        Map<String, DataSource> dataSourceMap = new HashMap<>(dataSourcePropertiesMap.size() * 2);
        for (Map.Entry<String, DataSourceProperty> item : dataSourcePropertiesMap.entrySet()) {
            String dsName = item.getKey();
            DataSourceProperty dataSourceProperty = item.getValue();
            String poolName = dataSourceProperty.getPoolName();
            // Fall back to the data source name when no pool name is configured.
            if (poolName == null || "".equals(poolName)) {
                poolName = dsName;
            }
            dataSourceProperty.setPoolName(poolName);
            // Create the default data source
            dataSourceMap.put(dsName, defaultDataSourceCreator.createDataSource(dataSourceProperty));
        }
        return dataSourceMap;
    }
}

/**
 * Routing data source that keeps the known data sources in an in-memory map keyed by name.
 * Only the startup part of the class is shown here.
 */
public class DynamicRoutingDataSource extends AbstractRoutingDataSource implements InitializingBean, DisposableBean {
    // Data source name -> data source
    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();    
    /**
     * Collects the data sources of every provider, registers each one through
     * {@code addDataSource}, then logs whether the configured primary is present.
     */
    public void afterPropertiesSet() throws Exception {

        checkEnv();

        // Load the data sources configured in yml
        Map<String, DataSource> dataSources = new HashMap<>(16);
        for (DynamicDataSourceProvider provider : providers) {
            dataSources.putAll(provider.loadDataSources());
        }

        // Store the data sources in the in-memory dataSourceMap
        for (Map.Entry<String, DataSource> dsItem : dataSources.entrySet()) {
            addDataSource(dsItem.getKey(), dsItem.getValue());
        }
        // Check whether the primary data source has been configured
        if (groupDataSources.containsKey(primary)) {
            log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", dataSources.size(), primary);
        } else if (dataSourceMap.containsKey(primary)) {
            log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", dataSources.size(), primary);
        } else {
            log.warn("dynamic-datasource initial loaded [{}] datasource,Please add your primary datasource or check your configuration", dataSources.size());
        }
    }
}
// Source: dynamic-datasource-spring-boot-starter.jar
/**
 * Base provider that turns a map of data source properties into a map of data sources.
 */
public abstract class AbstractDataSourceProvider implements DynamicDataSourceProvider {

    @Autowired
    private DefaultDataSourceCreator defaultDataSourceCreator;

    /**
     * Creates one data source for every entry of the given property map.
     *
     * @param dataSourcePropertiesMap data source name mapped to its properties
     * @return data source name mapped to the data source created for it
     */
    protected Map<String, DataSource> createDataSourceMap(
            Map<String, DataSourceProperty> dataSourcePropertiesMap) {
        Map<String, DataSource> dataSourceMap = new HashMap<>(dataSourcePropertiesMap.size() * 2);
        for (Map.Entry<String, DataSourceProperty> item : dataSourcePropertiesMap.entrySet()) {
            String dsName = item.getKey();
            DataSourceProperty dataSourceProperty = item.getValue();
            String poolName = dataSourceProperty.getPoolName();
            // Fall back to the data source name when no pool name is configured.
            if (poolName == null || "".equals(poolName)) {
                poolName = dsName;
            }
            dataSourceProperty.setPoolName(poolName);
            // Create the default data source
            dataSourceMap.put(dsName, defaultDataSourceCreator.createDataSource(dataSourceProperty));
        }
        return dataSourceMap;
    }
}

/**
 * Routing data source that keeps the known data sources in an in-memory map keyed by name.
 * Only the startup part of the class is shown here.
 */
public class DynamicRoutingDataSource extends AbstractRoutingDataSource implements InitializingBean, DisposableBean {
    // Data source name -> data source
    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();    
    /**
     * Collects the data sources of every provider, registers each one through
     * {@code addDataSource}, then logs whether the configured primary is present.
     */
    public void afterPropertiesSet() throws Exception {

        checkEnv();

        // Load the data sources configured in yml
        Map<String, DataSource> dataSources = new HashMap<>(16);
        for (DynamicDataSourceProvider provider : providers) {
            dataSources.putAll(provider.loadDataSources());
        }

        // Store the data sources in the in-memory dataSourceMap
        for (Map.Entry<String, DataSource> dsItem : dataSources.entrySet()) {
            addDataSource(dsItem.getKey(), dsItem.getValue());
        }
        // Check whether the primary data source has been configured
        if (groupDataSources.containsKey(primary)) {
            log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", dataSources.size(), primary);
        } else if (dataSourceMap.containsKey(primary)) {
            log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", dataSources.size(), primary);
        } else {
            log.warn("dynamic-datasource initial loaded [{}] datasource,Please add your primary datasource or check your configuration", dataSources.size());
        }
    }
}
  1. 项目启动时,动态加载租户数据源dataSourceMap
java
/**
 * Application listener that loads the tenant data sources once the application has started.
 */
public class InitDatabaseOnStarted implements ApplicationListener<ApplicationStartedEvent> {
    private final DatasourceInitDataSourceService datasourceInitDataSourceService;

    /**
     * Explicit form of the all-args constructor.
     *
     * @param datasourceInitDataSourceService service that performs the tenant data source loading
     */
    public InitDatabaseOnStarted(DatasourceInitDataSourceService datasourceInitDataSourceService) {
        this.datasourceInitDataSourceService = datasourceInitDataSourceService;
    }

    /**
     * Triggered by the application-started event; delegates to the data source initialisation service.
     */
    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        datasourceInitDataSourceService.initDataSource();
    }
}

/**
 * Registers tenant data sources with the dynamic routing data source.
 */
public class DynamicDataSourceServiceImpl implements DataSourceService {
    /**
     * Derives one data source per configured database prefix from the default data source
     * and registers each of them for the given tenant.
     *
     * @param tenantId        tenant the data sources are created for
     * @param isInitSchema    not used by the part of the method shown here
     * @param isNotErrorRetry not used by the part of the method shown here
     * @return always {@code true}
     */
    private boolean addSystem(Long tenantId, boolean isInitSchema, boolean isNotErrorRetry) {
        DataSourceProperty defDataSourceProperty = properties.getDatasource().get(ContextConstants.DEF_TENANT_ID_STR);
        ArgumentAssert.notNull(defDataSourceProperty, "请先配置默认[{}]数据源", ContextConstants.DEF_TENANT_ID_STR);

        // getInitDatabasePrefix() holds the data source prefixes configured in database.yml
        for (String database : databaseProperties.getInitDatabasePrefix()) {
            DataSourceProperty tenantProperty = BeanUtil.toBean(defDataSourceProperty, DataSourceProperty.class);
            // The pool name must equal what DsThreadProcessor#doDetermineDatasource returns,
            // i.e. getPoolName(prefix, tenantId) -> acuity_pro_base_{TenantId}
            String tenantPoolName = DsThreadProcessor.getPoolName(database, String.valueOf(tenantId));
            tenantProperty.setPoolName(tenantPoolName);

            boolean oracle = DbType.ORACLE.getDb().equals(getDbType().getDb());
            if (oracle) {
                // Oracle branch: the pool name is used as both user name and password
                tenantProperty.setUsername(tenantProperty.getPoolName());
                tenantProperty.setPassword(tenantProperty.getPoolName());
            } else {
                // Other databases: swap the database name inside the default JDBC url
                String defaultUrl = defDataSourceProperty.getUrl();
                String oldDatabase = DbPlusUtil.getDataBaseNameByUrl(defaultUrl);
                String newDatabase = StrUtil.join(StrUtil.UNDERLINE, database, tenantId);
                tenantProperty.setUrl(StrUtil.replace(defDataSourceProperty.getUrl(), oldDatabase, newDatabase));
            }
            // remaining steps omitted ...
            putDs(tenantProperty);
        }
        return true;
    }

    /**
     * Creates a data source from the given properties and adds it to the routing data source
     * under its pool name.
     *
     * @param dsp properties of the data source to create
     * @return the key set of the routing data source's data sources after the addition
     * @throws BizException when creating the data source fails
     */
    private Set<String> putDs(DataSourceProperty dsp) {
        try {
            DynamicRoutingDataSource routing = (DynamicRoutingDataSource) this.dataSource;
            DataSource created = druidDataSourceCreator.createDataSource(dsp);
            routing.addDataSource(dsp.getPoolName(), created);
            return routing.getDataSources().keySet();
        } catch (ErrorCreateDataSourceException e) {
            log.error("数据源初始化期间出现异常", e);
            throw new BizException("数据源初始化期间出现异常", e);
        }
    }
}
/**
 * Application listener that loads the tenant data sources once the application has started.
 */
public class InitDatabaseOnStarted implements ApplicationListener<ApplicationStartedEvent> {
    private final DatasourceInitDataSourceService datasourceInitDataSourceService;

    /**
     * Explicit form of the all-args constructor.
     *
     * @param datasourceInitDataSourceService service that performs the tenant data source loading
     */
    public InitDatabaseOnStarted(DatasourceInitDataSourceService datasourceInitDataSourceService) {
        this.datasourceInitDataSourceService = datasourceInitDataSourceService;
    }

    /**
     * Triggered by the application-started event; delegates to the data source initialisation service.
     */
    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        datasourceInitDataSourceService.initDataSource();
    }
}

/**
 * Registers tenant data sources with the dynamic routing data source.
 */
public class DynamicDataSourceServiceImpl implements DataSourceService {
    /**
     * Derives one data source per configured database prefix from the default data source
     * and registers each of them for the given tenant.
     *
     * @param tenantId        tenant the data sources are created for
     * @param isInitSchema    not used by the part of the method shown here
     * @param isNotErrorRetry not used by the part of the method shown here
     * @return always {@code true}
     */
    private boolean addSystem(Long tenantId, boolean isInitSchema, boolean isNotErrorRetry) {
        DataSourceProperty defDataSourceProperty = properties.getDatasource().get(ContextConstants.DEF_TENANT_ID_STR);
        ArgumentAssert.notNull(defDataSourceProperty, "请先配置默认[{}]数据源", ContextConstants.DEF_TENANT_ID_STR);

        // getInitDatabasePrefix() holds the data source prefixes configured in database.yml
        for (String database : databaseProperties.getInitDatabasePrefix()) {
            DataSourceProperty tenantProperty = BeanUtil.toBean(defDataSourceProperty, DataSourceProperty.class);
            // The pool name must equal what DsThreadProcessor#doDetermineDatasource returns,
            // i.e. getPoolName(prefix, tenantId) -> acuity_pro_base_{TenantId}
            String tenantPoolName = DsThreadProcessor.getPoolName(database, String.valueOf(tenantId));
            tenantProperty.setPoolName(tenantPoolName);

            boolean oracle = DbType.ORACLE.getDb().equals(getDbType().getDb());
            if (oracle) {
                // Oracle branch: the pool name is used as both user name and password
                tenantProperty.setUsername(tenantProperty.getPoolName());
                tenantProperty.setPassword(tenantProperty.getPoolName());
            } else {
                // Other databases: swap the database name inside the default JDBC url
                String defaultUrl = defDataSourceProperty.getUrl();
                String oldDatabase = DbPlusUtil.getDataBaseNameByUrl(defaultUrl);
                String newDatabase = StrUtil.join(StrUtil.UNDERLINE, database, tenantId);
                tenantProperty.setUrl(StrUtil.replace(defDataSourceProperty.getUrl(), oldDatabase, newDatabase));
            }
            // remaining steps omitted ...
            putDs(tenantProperty);
        }
        return true;
    }

    /**
     * Creates a data source from the given properties and adds it to the routing data source
     * under its pool name.
     *
     * @param dsp properties of the data source to create
     * @return the key set of the routing data source's data sources after the addition
     * @throws BizException when creating the data source fails
     */
    private Set<String> putDs(DataSourceProperty dsp) {
        try {
            DynamicRoutingDataSource routing = (DynamicRoutingDataSource) this.dataSource;
            DataSource created = druidDataSourceCreator.createDataSource(dsp);
            routing.addDataSource(dsp.getPoolName(), created);
            return routing.getDataSources().keySet();
        } catch (ErrorCreateDataSourceException e) {
            log.error("数据源初始化期间出现异常", e);
            throw new BizException("数据源初始化期间出现异常", e);
        }
    }
}
  1. 项目启动成功后,观察 dataSourceMap 有一个默认数据源:0,一个租户数据源:acuity_pro_base_1

  2. 用户登录后,后续的所有请求,前端Axios 拦截器都会将 TenantId 封装到请求头

  3. 请求到达后端后,经过spring拦截器HeaderThreadLocalInterceptor,请求头中携带的信息被存储到线程变量。

  4. 在Service层编写@DS注解,@DS 的value值 #thread 表示从线程变量中获取 acuity_pro_base , 具体由 DsThreadProcessor 实现

java
// @DS(DsConstant.BASE_TENANT) is equivalent to @DS("#thread.acuity_pro_base")
@DS(DsConstant.BASE_TENANT)
public class UserServiceImpl {
    // business methods omitted
}
// @DS(DsConstant.BASE_TENANT) is equivalent to @DS("#thread.acuity_pro_base")
@DS(DsConstant.BASE_TENANT)
public class UserServiceImpl {
    // business methods omitted
}
  1. 请求经过拦截器
java
/**
 * Method interceptor that pushes the data source key of the intercepted method
 * onto DynamicDataSourceContextHolder for the duration of the call.
 */
public class DynamicDataSourceAnnotationInterceptor implements MethodInterceptor {

    // Keys starting with this prefix are resolved through the DsProcessor.
    private static final String DYNAMIC_PREFIX = "#";
    private final DataSourceClassResolver dataSourceClassResolver;
    private final DsProcessor dsProcessor;
    /**
     * Pushes the resolved key, proceeds with the invocation and polls the key again afterwards,
     * also when the invocation throws.
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Resolve the value of the @DS annotation
        String dsKey = determineDatasourceKey(invocation);
        // The final dsKey must be a key of DynamicRoutingDataSource#dataSourceMap for the right data source to be chosen
        DynamicDataSourceContextHolder.push(dsKey);
        try {
            return invocation.proceed();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }

    /**
     * Returns the key found for the invoked method, resolved through the DsProcessor
     * when it starts with DYNAMIC_PREFIX.
     */
    private String determineDatasourceKey(MethodInvocation invocation) {
        // For @DS("#thread.acuity_pro_base") this yields "#thread.acuity_pro_base"
        String key = dataSourceClassResolver.findKey(invocation.getMethod(), invocation.getThis());
        // A key starting with # is handed to dsProcessor.determineDatasource() to resolve the real value
        return key.startsWith(DYNAMIC_PREFIX) ? dsProcessor.determineDatasource(invocation, key) : key;
    }
}
/**
 * Method interceptor that pushes the data source key of the intercepted method
 * onto DynamicDataSourceContextHolder for the duration of the call.
 */
public class DynamicDataSourceAnnotationInterceptor implements MethodInterceptor {

    // Keys starting with this prefix are resolved through the DsProcessor.
    private static final String DYNAMIC_PREFIX = "#";
    private final DataSourceClassResolver dataSourceClassResolver;
    private final DsProcessor dsProcessor;
    /**
     * Pushes the resolved key, proceeds with the invocation and polls the key again afterwards,
     * also when the invocation throws.
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Resolve the value of the @DS annotation
        String dsKey = determineDatasourceKey(invocation);
        // The final dsKey must be a key of DynamicRoutingDataSource#dataSourceMap for the right data source to be chosen
        DynamicDataSourceContextHolder.push(dsKey);
        try {
            return invocation.proceed();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }

    /**
     * Returns the key found for the invoked method, resolved through the DsProcessor
     * when it starts with DYNAMIC_PREFIX.
     */
    private String determineDatasourceKey(MethodInvocation invocation) {
        // For @DS("#thread.acuity_pro_base") this yields "#thread.acuity_pro_base"
        String key = dataSourceClassResolver.findKey(invocation.getMethod(), invocation.getThis());
        // A key starting with # is handed to dsProcessor.determineDatasource() to resolve the real value
        return key.startsWith(DYNAMIC_PREFIX) ? dsProcessor.determineDatasource(invocation, key) : key;
    }
}
  1. #thread.acuity_pro_base 具体值 由DsThreadProcessor 解析
java
/**
 * Resolves {@code @DS} values of the form {@code #thread.<name>} into a pool name
 * built from {@code <name>} and the value stored under {@code <name>} in the thread context.
 */
public class DsThreadProcessor extends DsProcessor {

    /**
     * Prefix a key must start with to be handled by this processor.
     */
    public static final String HEADER_PREFIX = "#thread";
    /**
     * Format template of a pool name; filled with prefix and tenant id, in that order.
     */
    public static final String POOL_NAME = "{}_{}";

    /**
     * Formats {@link #POOL_NAME} with the given prefix and tenant id.
     *
     * @param prefix   database prefix, e.g. acuity_pro_base
     * @param tenantId tenant id
     * @return the formatted pool name
     */
    public static String getPoolName(String prefix, String tenantId) {
        return StrUtil.format(POOL_NAME, prefix, tenantId);
    }

    @Override
    public boolean matches(String key) {
        return key.startsWith(HEADER_PREFIX);
    }

    @Override
    public String doDetermineDatasource(MethodInvocation invocation, String key) {
        // Skip "#thread" plus the separator character, e.g. "#thread.acuity_pro_base" -> "acuity_pro_base"
        int nameStart = HEADER_PREFIX.length() + 1;
        String contextKey = key.substring(nameStart);
        // Value stored in the thread context under that name
        String contextValue = ContextUtil.get(contextKey);
        // e.g. acuity_pro_base_{TenantId}
        return getPoolName(contextKey, contextValue);
    }
}
/**
 * Resolves {@code @DS} values of the form {@code #thread.<name>} into a pool name
 * built from {@code <name>} and the value stored under {@code <name>} in the thread context.
 */
public class DsThreadProcessor extends DsProcessor {

    /**
     * Prefix a key must start with to be handled by this processor.
     */
    public static final String HEADER_PREFIX = "#thread";
    /**
     * Format template of a pool name; filled with prefix and tenant id, in that order.
     */
    public static final String POOL_NAME = "{}_{}";

    /**
     * Formats {@link #POOL_NAME} with the given prefix and tenant id.
     *
     * @param prefix   database prefix, e.g. acuity_pro_base
     * @param tenantId tenant id
     * @return the formatted pool name
     */
    public static String getPoolName(String prefix, String tenantId) {
        return StrUtil.format(POOL_NAME, prefix, tenantId);
    }

    @Override
    public boolean matches(String key) {
        return key.startsWith(HEADER_PREFIX);
    }

    @Override
    public String doDetermineDatasource(MethodInvocation invocation, String key) {
        // Skip "#thread" plus the separator character, e.g. "#thread.acuity_pro_base" -> "acuity_pro_base"
        int nameStart = HEADER_PREFIX.length() + 1;
        String contextKey = key.substring(nameStart);
        // Value stored in the thread context under that name
        String contextValue = ContextUtil.get(contextKey);
        // e.g. acuity_pro_base_{TenantId}
        return getPoolName(contextKey, contextValue);
    }
}
  1. 从 dataSourceMap 中获取数据源
java
/**
 * Routing data source; only the lookup part of the class is shown here.
 */
public class DynamicRoutingDataSource extends AbstractRoutingDataSource implements InitializingBean, DisposableBean {
    // Data source name -> data source
    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();    
    /**
     * Looks up the data source for the key currently on top of DynamicDataSourceContextHolder.
     */
    @Override
    public DataSource determineDataSource() {
        String dsKey = DynamicDataSourceContextHolder.peek();
        return getDataSource(dsKey);
    }
    /**
     * Returns the data source registered under the given name.
     * An empty name yields the primary data source. A group match takes precedence over a
     * single data source. An unknown name throws in strict mode and otherwise falls back
     * to the primary data source.
     *
     * @param ds data source (or group) name
     * @throws CannotFindDataSourceException in strict mode when the name is unknown
     */
    public DataSource getDataSource(String ds) {
        // Switching failed? Compare ds carefully with the keys of dataSourceMap

        if (StringUtils.isEmpty(ds)) {
            return determinePrimaryDataSource();
        } else if (!groupDataSources.isEmpty() && groupDataSources.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return groupDataSources.get(ds).determineDataSource();
        } else if (dataSourceMap.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return dataSourceMap.get(ds);
        }
        if (strict) {
            throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named" + ds);
        }
        return determinePrimaryDataSource();
    }
}
/**
 * Routing data source; only the lookup part of the class is shown here.
 */
public class DynamicRoutingDataSource extends AbstractRoutingDataSource implements InitializingBean, DisposableBean {
    // Data source name -> data source
    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();    
    /**
     * Looks up the data source for the key currently on top of DynamicDataSourceContextHolder.
     */
    @Override
    public DataSource determineDataSource() {
        String dsKey = DynamicDataSourceContextHolder.peek();
        return getDataSource(dsKey);
    }
    /**
     * Returns the data source registered under the given name.
     * An empty name yields the primary data source. A group match takes precedence over a
     * single data source. An unknown name throws in strict mode and otherwise falls back
     * to the primary data source.
     *
     * @param ds data source (or group) name
     * @throws CannotFindDataSourceException in strict mode when the name is unknown
     */
    public DataSource getDataSource(String ds) {
        // Switching failed? Compare ds carefully with the keys of dataSourceMap

        if (StringUtils.isEmpty(ds)) {
            return determinePrimaryDataSource();
        } else if (!groupDataSources.isEmpty() && groupDataSources.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return groupDataSources.get(ds).determineDataSource();
        } else if (dataSourceMap.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return dataSourceMap.get(ds);
        }
        if (strict) {
            throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named" + ds);
        }
        return determinePrimaryDataSource();
    }
}

总结:

  1. 项目启动时,在 DynamicDataSourceServiceImpl#addSystem 方法断点,确保租户数据源加载成功

  2. 请求执行时,在 DsThreadProcessor#doDetermineDatasource 方法断点,记住从@DS中解析出来的值

    • 上图中,入参 key 取决于Service层 @DS("{key}")

    • ContextUtil.get(prefix) 取决于 HeaderThreadLocalInterceptor 拦截器是否正确赋值

    java
    /**
     * Interceptor that copies tenant-related request headers into the thread context
     * before the handler method runs.
     */
    public class HeaderThreadLocalInterceptor implements AsyncHandlerInterceptor {

        /**
         * Stores the tenant id and, when present, the base / extend pool names of the request.
         *
         * @return always {@code true}; this interceptor never stops the request
         */
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            if (!(handler instanceof HandlerMethod)) {
                return true;
            }

            // Which tenant the request comes from; equivalent to ContextUtil.set("TenantId", xx)
            ContextUtil.setTenantId(getHeader(request, ContextConstants.TENANT_ID_HEADER));

            String basePoolName = getHeader(request, ContextConstants.TENANT_BASE_POOL_NAME_HEADER);
            if (StrUtil.isNotEmpty(basePoolName)) {
                // Which tenant base database the request operates on; equivalent to ContextUtil.set("acuity_pro_base", xx)
                ContextUtil.setTenantBasePoolName(basePoolName);
            }

            String extendPoolName = getHeader(request, ContextConstants.TENANT_EXTEND_POOL_NAME_HEADER);
            if (StrUtil.isNotEmpty(extendPoolName)) {
                // Which tenant extend database the request operates on; equivalent to ContextUtil.set("acuity_extend", xx)
                ContextUtil.setTenantExtendPoolName(extendPoolName);
            }

            // The method is declared boolean, so the normal path needs an explicit result;
            // true lets the request continue to the handler.
            return true;
        }
    }
    /**
     * Interceptor that copies tenant-related request headers into the thread context
     * before the handler method runs.
     */
    public class HeaderThreadLocalInterceptor implements AsyncHandlerInterceptor {

        /**
         * Stores the tenant id and, when present, the base / extend pool names of the request.
         *
         * @return always {@code true}; this interceptor never stops the request
         */
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            if (!(handler instanceof HandlerMethod)) {
                return true;
            }

            // Which tenant the request comes from; equivalent to ContextUtil.set("TenantId", xx)
            ContextUtil.setTenantId(getHeader(request, ContextConstants.TENANT_ID_HEADER));

            String basePoolName = getHeader(request, ContextConstants.TENANT_BASE_POOL_NAME_HEADER);
            if (StrUtil.isNotEmpty(basePoolName)) {
                // Which tenant base database the request operates on; equivalent to ContextUtil.set("acuity_pro_base", xx)
                ContextUtil.setTenantBasePoolName(basePoolName);
            }

            String extendPoolName = getHeader(request, ContextConstants.TENANT_EXTEND_POOL_NAME_HEADER);
            if (StrUtil.isNotEmpty(extendPoolName)) {
                // Which tenant extend database the request operates on; equivalent to ContextUtil.set("acuity_extend", xx)
                ContextUtil.setTenantExtendPoolName(extendPoolName);
            }

            // The method is declared boolean, so the normal path needs an explicit result;
            // true lets the request continue to the handler.
            return true;
        }
    }
  3. 请求执行时,在 DynamicRoutingDataSource#determineDataSource 方法打断点,确保dataSourceMap集合中有{dsKey}对应的数据源

疑问:

  1. ContextUtil.setTenantId 、 ContextUtil.setTenantBasePoolName、ContextUtil.setTenantExtendPoolName的区别?
  • setTenantId 是设置当前请求属于哪个租户

  • setTenantBasePoolName 是设置当前请求接下来的db操作,操作哪个租户库的基础库

  • setTenantExtendPoolName 是设置当前请求接下来的db操作,操作哪个租户库的**扩展库**

    目前所有的服务均只连接了基础库,尚未使用扩展库,setTenantExtendPoolName方法只是预留给需要一个租户连接多个库的场景使用。

一般情况下,都是租户A的请求操作租户A的数据库,所以setTenantId方法内部同时会调用setTenantBasePoolName和setTenantExtendPoolName方法。

特殊情况下,租户A需要操作租户B的数据库,则只需要调用 ContextUtil.setTenantBasePoolName 方法将租户B的id 传递进去,当前线程接下来的数据库操作就会操作租户B的基础库。

java
/**
 * Example: temporarily operating on another tenant's base database within one request.
 */
public class UserBiz {

    /**
     * Reads a user from the current tenant's database, inserts a copy into tenant 2's
     * database and then switches back to the current tenant.
     */
    public void test() {
        // Current tenant ID = 1, acuity_pro_base = 1
        // getById takes a Long, so the literal must be 1L (an int literal is not boxed to Long)
        User user = userService.getById(1L);  // operates on acuity_pro_base_1

        // Switch the following DB operations to tenant 2
        ContextUtil.setTenantBasePoolName(2);
        try {
            user.setId(null);
            // Tenant ID is still 1 but acuity_pro_base = 2, so this operates on acuity_pro_base_2
            userService.insert(user);
        } finally {
            // Switch back to the current tenant even when the insert fails, so later DB
            // operations on this thread do not keep hitting tenant 2.
            // Being able to do this is the reason setTenantBasePoolName and setTenantId are separate.
            ContextUtil.setTenantBasePoolName(ContextUtil.getTenantId());
        }
    }
}

// @DS("#thread.acuity_pro_base") 中的 acuity_pro_base 与 ContextUtil.setTenantBasePoolName 一致。
@DS("#thread.acuity_pro_base")
public class UserService {
    public User getById(Long id){ //... }

    public void insert(User user){ //... }
}

public class ContextUtil {
    /**
     * Stores the given tenant id in the thread context under the key "acuity_pro_base",
     * the same name used in @DS("#thread.acuity_pro_base").
     *
     * @param tenantId id of the tenant whose base database should be used
     */
    public static void setTenantBasePoolName(Object tenantId) {
        final String basePoolKey = "acuity_pro_base";
        THREAD_LOCAL.put(basePoolKey, tenantId);
    }
}
/**
 * Example: temporarily operating on another tenant's base database within one request.
 */
public class UserBiz {

    /**
     * Reads a user from the current tenant's database, inserts a copy into tenant 2's
     * database and then switches back to the current tenant.
     */
    public void test() {
        // Current tenant ID = 1, acuity_pro_base = 1
        // getById takes a Long, so the literal must be 1L (an int literal is not boxed to Long)
        User user = userService.getById(1L);  // operates on acuity_pro_base_1

        // Switch the following DB operations to tenant 2
        ContextUtil.setTenantBasePoolName(2);
        try {
            user.setId(null);
            // Tenant ID is still 1 but acuity_pro_base = 2, so this operates on acuity_pro_base_2
            userService.insert(user);
        } finally {
            // Switch back to the current tenant even when the insert fails, so later DB
            // operations on this thread do not keep hitting tenant 2.
            // Being able to do this is the reason setTenantBasePoolName and setTenantId are separate.
            ContextUtil.setTenantBasePoolName(ContextUtil.getTenantId());
        }
    }
}

// @DS("#thread.acuity_pro_base") 中的 acuity_pro_base 与 ContextUtil.setTenantBasePoolName 一致。
@DS("#thread.acuity_pro_base")
public class UserService {
    public User getById(Long id){ //... }

    public void insert(User user){ //... }
}

public class ContextUtil {
    /**
     * Stores the given tenant id in the thread context under the key "acuity_pro_base",
     * the same name used in @DS("#thread.acuity_pro_base").
     *
     * @param tenantId id of the tenant whose base database should be used
     */
    public static void setTenantBasePoolName(Object tenantId) {
        final String basePoolKey = "acuity_pro_base";
        THREAD_LOCAL.put(basePoolKey, tenantId);
    }
}

COLUMN模式。

DATASOURCE模式是指每个租户对应一个数据库,应用启动时,会将所有租户的数据源加载,执行数据库操作时,动态切换数据源到当前租户的数据源,并执行SQL。

COLUMN模式是指所有租户共用一个数据库,在表中增加租户ID字段区分。

COLUMN模式 实现原理解析

  1. 项目启动时,加载 TenantLineInnerInterceptor 拦截器
java
// Location: acuity-database-mode
/**
 * MyBatis configuration for COLUMN mode: registers the tenant-line interceptor.
 */
@MapperScan(basePackages = {UTIL_PACKAGE, BUSINESS_PACKAGE}, annotationClass = Repository.class)
public abstract class ColumnMybatisConfiguration extends BaseMybatisConfiguration {

    public ColumnMybatisConfiguration(final DatabaseProperties databaseProperties) {
        super(databaseProperties);
    }

    /**
     * Interceptors for COLUMN mode dynamic SQL rewriting.
     *
     * @return a list holding the multi-tenant interceptor
     */
    @Override
    protected List<InnerInterceptor> getPaginationBeforeInnerInterceptor() {
        // COLUMN mode multi-tenant plugin
        TenantLineInnerInterceptor tenantInterceptor = new TenantLineInnerInterceptor();
        tenantInterceptor.setTenantLineHandler(newTenantLineHandler());

        List<InnerInterceptor> interceptors = new ArrayList<>();
        interceptors.add(tenantInterceptor);
        return interceptors;
    }

    /**
     * Builds the handler that supplies the tenant column name, the tables to skip
     * and the tenant id value.
     */
    private MultiTenantLineHandler newTenantLineHandler() {
        return new MultiTenantLineHandler() {

            /** Name of the tenant id column in business tables. */
            @Override
            public String getTenantIdColumn() {
                return databaseProperties.getTenantIdColumn();
            }

            /** Whether the tenant id condition is left out for the given table. */
            @Override
            public boolean ignoreTable(String tableName) {
                if (ContextUtil.isDefTenantId()) {
                    return true;
                }

                boolean listedTable = databaseProperties.getIgnoreTable() != null
                        && databaseProperties.getIgnoreTable().contains(tableName);
                boolean listedPrefix = databaseProperties.getIgnoreTablePrefix() != null
                        && databaseProperties.getIgnoreTablePrefix().stream().anyMatch(p -> tableName.startsWith(p));
                return listedTable || listedPrefix;
            }

            /** Tenant id value, taken from ContextUtil.getBasePoolNameHeader(). */
            @Override
            public Expression getTenantId() {
                return new LongValue(ContextUtil.getBasePoolNameHeader());
            }
        };
    }
}


/**
 * Thread-bound request context; only the base-pool accessor is shown here.
 */
public final class ContextUtil {
    private static final ThreadLocal<Map<String, String>> THREAD_LOCAL = new TransmittableThreadLocal<>();

    /**
     * Reads the value stored under ContextConstants.TENANT_BASE_POOL_NAME_HEADER
     * in the current thread's context map and converts it to a Long.
     *
     * @return the stored value as a Long, or {@code null} when the thread has no context map
     *         or no (non-empty) value under that key
     * @throws NumberFormatException when the stored value is not a valid long
     */
    public static Long getBasePoolNameHeader() {
        // ThreadLocal#get() takes no argument: fetch the map first, then look the key up in it.
        Map<String, String> context = THREAD_LOCAL.get();
        if (context == null) {
            return null;
        }
        String value = context.get(ContextConstants.TENANT_BASE_POOL_NAME_HEADER);
        if (value == null || value.isEmpty()) {
            return null;
        }
        // The map holds Strings, so the value has to be parsed to match the Long return type.
        return Long.valueOf(value);
    }
}

/**
 * Keys used for the thread context.
 */
public final class ContextConstants {
    // Key under which the base pool value is stored; ContextUtil.getBasePoolNameHeader() reads it.
    public static final String TENANT_BASE_POOL_NAME_HEADER = "acuity_pro_base";
}
// Location: acuity-database-mode
/**
 * MyBatis configuration for COLUMN mode: registers the tenant-line interceptor.
 */
@MapperScan(basePackages = {UTIL_PACKAGE, BUSINESS_PACKAGE}, annotationClass = Repository.class)
public abstract class ColumnMybatisConfiguration extends BaseMybatisConfiguration {

    public ColumnMybatisConfiguration(final DatabaseProperties databaseProperties) {
        super(databaseProperties);
    }

    /**
     * Interceptors for COLUMN mode dynamic SQL rewriting.
     *
     * @return a list holding the multi-tenant interceptor
     */
    @Override
    protected List<InnerInterceptor> getPaginationBeforeInnerInterceptor() {
        // COLUMN mode multi-tenant plugin
        TenantLineInnerInterceptor tenantInterceptor = new TenantLineInnerInterceptor();
        tenantInterceptor.setTenantLineHandler(newTenantLineHandler());

        List<InnerInterceptor> interceptors = new ArrayList<>();
        interceptors.add(tenantInterceptor);
        return interceptors;
    }

    /**
     * Builds the handler that supplies the tenant column name, the tables to skip
     * and the tenant id value.
     */
    private MultiTenantLineHandler newTenantLineHandler() {
        return new MultiTenantLineHandler() {

            /** Name of the tenant id column in business tables. */
            @Override
            public String getTenantIdColumn() {
                return databaseProperties.getTenantIdColumn();
            }

            /** Whether the tenant id condition is left out for the given table. */
            @Override
            public boolean ignoreTable(String tableName) {
                if (ContextUtil.isDefTenantId()) {
                    return true;
                }

                boolean listedTable = databaseProperties.getIgnoreTable() != null
                        && databaseProperties.getIgnoreTable().contains(tableName);
                boolean listedPrefix = databaseProperties.getIgnoreTablePrefix() != null
                        && databaseProperties.getIgnoreTablePrefix().stream().anyMatch(p -> tableName.startsWith(p));
                return listedTable || listedPrefix;
            }

            /** Tenant id value, taken from ContextUtil.getBasePoolNameHeader(). */
            @Override
            public Expression getTenantId() {
                return new LongValue(ContextUtil.getBasePoolNameHeader());
            }
        };
    }
}


/**
 * Thread-bound request context; only the base-pool accessor is shown here.
 */
public final class ContextUtil {
    private static final ThreadLocal<Map<String, String>> THREAD_LOCAL = new TransmittableThreadLocal<>();

    /**
     * Reads the value stored under ContextConstants.TENANT_BASE_POOL_NAME_HEADER
     * in the current thread's context map and converts it to a Long.
     *
     * @return the stored value as a Long, or {@code null} when the thread has no context map
     *         or no (non-empty) value under that key
     * @throws NumberFormatException when the stored value is not a valid long
     */
    public static Long getBasePoolNameHeader() {
        // ThreadLocal#get() takes no argument: fetch the map first, then look the key up in it.
        Map<String, String> context = THREAD_LOCAL.get();
        if (context == null) {
            return null;
        }
        String value = context.get(ContextConstants.TENANT_BASE_POOL_NAME_HEADER);
        if (value == null || value.isEmpty()) {
            return null;
        }
        // The map holds Strings, so the value has to be parsed to match the Long return type.
        return Long.valueOf(value);
    }
}

/**
 * Keys used for the thread context.
 */
public final class ContextConstants {
    // Key under which the base pool value is stored; ContextUtil.getBasePoolNameHeader() reads it.
    public static final String TENANT_BASE_POOL_NAME_HEADER = "acuity_pro_base";
}
  1. 用户登录后,后续的所有请求,前端Axios 拦截器都会将 TenantId 封装到请求头
ts
// 代码位置: acuity-web-pro/src/utils/http/axios/index.ts
/**
   * @description: 请求拦截器处理
   */
  requestInterceptors: (config, options) => {
    // Several of these keys are only used by the header handling elided further down.
    const {
      multiTenantType,
      clientId,
      clientSecret,
      tokenKey,
      tenantIdKey,
      applicationIdKey,
      authorizationKey,
    } = globSetting;
    const req = config as Recordable;

    // User identity token, optionally prefixed with the authentication scheme
    const token = getToken();
    const tokenWanted = req?.requestOptions?.withToken !== false;
    if (token && tokenWanted) {
      const scheme = options.authenticationScheme;
      req.headers[tokenKey] = scheme ? `${scheme} ${token}` : token;
    }

    // Requests in COLUMN mode and DATASOURCE mode carry the tenant id header
    const tenantWanted = req?.requestOptions?.withTenant !== false;
    const multiTenant = multiTenantType !== MultiTenantTypeEnum.NONE;
    if (tenantWanted && multiTenant) {
      req.headers[tenantIdKey] = getTenantId();
    }

    // other request headers ...

    return config;
  }
// Code location: acuity-web-pro/src/utils/http/axios/index.ts
/**
   * @description: Request interceptor. Adds the user token header and, for
   * multi-tenant modes, the tenant ID header to the outgoing request config,
   * then returns the config.
   */
  requestInterceptors: (config, options) => {
    const {
      multiTenantType,
      clientId,
      clientSecret,
      tokenKey,
      tenantIdKey,
      applicationIdKey,
      authorizationKey,
    } = globSetting;

// User identity token: attached unless the request explicitly sets
// requestOptions.withToken to false. When an authentication scheme is
// configured, the header value is "<scheme> <token>".
    const token = getToken();
    if (token && (config as Recordable)?.requestOptions?.withToken !== false) {
      (config as Recordable).headers[tokenKey] = options.authenticationScheme
        ? `${options.authenticationScheme} ${token}`
        : token;
    }

    // Requests in COLUMN mode and DATASOURCE mode additionally carry the tenant ID
    // header (skipped when requestOptions.withTenant is false or the mode is NONE).
    if (
      (config as Recordable)?.requestOptions?.withTenant !== false &&
      multiTenantType !== MultiTenantTypeEnum.NONE
    ) {
      (config as Recordable).headers[tenantIdKey] = getTenantId();
    }


// Other request headers ... (omitted in this excerpt)

    return config;
  }
  1. 请求到达后端后,经过spring拦截器HeaderThreadLocalInterceptor,请求头中携带的信息被存储到线程变量。

参数1、2、3的区别?

参数1:当前请求的发起人所属的租户ID

参数2、3:当前请求同一线程的方法中,涉及到数据库操作时,需要操作哪个数据库

java
// Code location: acuity-boot
/**
 * Interceptor that copies tenant-related request headers into {@code ContextUtil}
 * before the handler runs.
 */
public class HeaderThreadLocalInterceptor implements AsyncHandlerInterceptor {

    /**
     * Stores the tenant headers of the incoming request in {@code ContextUtil}.
     *
     * @return always {@code true}, so the request is never blocked here
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // Nothing is recorded for handlers that are not a HandlerMethod,
        // nor when ContextUtil.getBoot() reports true.
        if (!(handler instanceof HandlerMethod) || ContextUtil.getBoot()) {
            return true;
        }

        // Parameter 1: tenant ID (written unconditionally)
        ContextUtil.setTenantId(getHeader(request, ContextConstants.TENANT_ID_HEADER));

        // Parameter 2: prefix of the "base" database (written only when the header is non-empty)
        final String basePrefix = getHeader(request, ContextConstants.TENANT_BASE_POOL_NAME_HEADER);
        if (StrUtil.isNotEmpty(basePrefix)) {
            ContextUtil.setTenantBasePoolName(basePrefix);
        }

        // Parameter 3: prefix of the "extend" database (written only when the header is non-empty)
        final String extendPrefix = getHeader(request, ContextConstants.TENANT_EXTEND_POOL_NAME_HEADER);
        if (StrUtil.isNotEmpty(extendPrefix)) {
            ContextUtil.setTenantExtendPoolName(extendPrefix);
        }

        // Remaining code omitted

        return true;
    }
}
// Code location: acuity-boot
/**
 * Interceptor that copies tenant-related request headers into {@code ContextUtil}
 * before the handler runs.
 */
public class HeaderThreadLocalInterceptor implements AsyncHandlerInterceptor {

    /**
     * Stores the tenant headers of the incoming request in {@code ContextUtil}.
     *
     * @return always {@code true}, so the request is never blocked here
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // Nothing is recorded for handlers that are not a HandlerMethod,
        // nor when ContextUtil.getBoot() reports true.
        if (!(handler instanceof HandlerMethod) || ContextUtil.getBoot()) {
            return true;
        }

        // Parameter 1: tenant ID (written unconditionally)
        ContextUtil.setTenantId(getHeader(request, ContextConstants.TENANT_ID_HEADER));

        // Parameter 2: prefix of the "base" database (written only when the header is non-empty)
        final String basePrefix = getHeader(request, ContextConstants.TENANT_BASE_POOL_NAME_HEADER);
        if (StrUtil.isNotEmpty(basePrefix)) {
            ContextUtil.setTenantBasePoolName(basePrefix);
        }

        // Parameter 3: prefix of the "extend" database (written only when the header is non-empty)
        final String extendPrefix = getHeader(request, ContextConstants.TENANT_EXTEND_POOL_NAME_HEADER);
        if (StrUtil.isNotEmpty(extendPrefix)) {
            ContextUtil.setTenantExtendPoolName(extendPrefix);
        }

        // Remaining code omitted

        return true;
    }
}
  1. 请求调用到达Mapper层,所有SQL都会经过拦截器:TenantLineInnerInterceptor,该拦截器会拦截insert、update、delete、select 等sql,自动拼接租户ID字段。如:

    • insert into user(id, name) values (?,?) 变成 insert into user(id, name, tenant_id) values (?,?,{ContextUtil.getTenantBasePoolName()})

    • update user set name = ? where id = ? 变成 update user set name = ? where id = ? and tenant_id = {ContextUtil.getTenantBasePoolName()}

    • delete from user where id = ? 变成 delete from user where id = ? and tenant_id = {ContextUtil.getTenantBasePoolName()}

    • select id, name from user 变成 select id ,name from user where tenant_id = {ContextUtil.getTenantBasePoolName()}

自动拼接的 tenant_id = {ContextUtil.getTenantBasePoolName()} 取决于 ColumnMybatisConfiguration 中的配置

java
TenantLineInnerInterceptor tli = new TenantLineInnerInterceptor();
// Build the handler first, then register it on the interceptor.
MultiTenantLineHandler tenantHandler = new MultiTenantLineHandler() {

    /** Name of the tenant ID column in business tables (e.g. tenant_id), taken from configuration. */
    @Override
    public String getTenantIdColumn() {
        return databaseProperties.getTenantIdColumn();
    }

    /**
     * Value spliced into the SQL for the tenant column, read from
     * ContextUtil.getBasePoolNameHeader(). This is the value shown as the
     * {ContextUtil.getTenantBasePoolName()} placeholder in the SQL examples.
     */
    @Override
    public Expression getTenantId() {
        return new LongValue(ContextUtil.getBasePoolNameHeader());
    }
};
tli.setTenantLineHandler(tenantHandler);
TenantLineInnerInterceptor tli = new TenantLineInnerInterceptor();
// Build the handler first, then register it on the interceptor.
MultiTenantLineHandler tenantHandler = new MultiTenantLineHandler() {

    /** Name of the tenant ID column in business tables (e.g. tenant_id), taken from configuration. */
    @Override
    public String getTenantIdColumn() {
        return databaseProperties.getTenantIdColumn();
    }

    /**
     * Value spliced into the SQL for the tenant column, read from
     * ContextUtil.getBasePoolNameHeader(). This is the value shown as the
     * {ContextUtil.getTenantBasePoolName()} placeholder in the SQL examples.
     */
    @Override
    public Expression getTenantId() {
        return new LongValue(ContextUtil.getBasePoolNameHeader());
    }
};
tli.setTenantLineHandler(tenantHandler);
  1. 默认情况下,所有Mapper层的方法都会被TenantLineInnerInterceptor处理,拼接上租户ID。如果某个或某些方法不想被拦截器处理,可以采用以下方式:
  • 加注解: @InterceptorIgnore 。 类上面全类生效,方法上单个方法生效。
java
// On the type: the ignore setting applies to every method of this mapper.
// NOTE(review): dynamicTableName = "true" presumably also skips the dynamic-table-name interceptor; confirm against MyBatis-Plus docs.
@InterceptorIgnore(tenantLine = "true", dynamicTableName = "true")
public interface UserMapper {

    // On a method: the ignore setting applies to this single method only.
    @InterceptorIgnore(tenantLine = "true", dynamicTableName = "true")
    User get();
}
// On the type: the ignore setting applies to every method of this mapper.
// NOTE(review): dynamicTableName = "true" presumably also skips the dynamic-table-name interceptor; confirm against MyBatis-Plus docs.
@InterceptorIgnore(tenantLine = "true", dynamicTableName = "true")
public interface UserMapper {

    // On a method: the ignore setting applies to this single method only.
    @InterceptorIgnore(tenantLine = "true", dynamicTableName = "true")
    User get();
}
  • 全局配置
java
TenantLineInnerInterceptor tli = new TenantLineInnerInterceptor();
MultiTenantLineHandler ignoreAwareHandler = new MultiTenantLineHandler() {

    /**
     * Decides whether the tenant ID condition is left out for the given table.
     *
     * @param tableName name of the table referenced by the SQL
     * @return {@code true} when the SQL must not be processed for this table
     */
    @Override
    public boolean ignoreTable(String tableName) {
        // Skip everything when ContextUtil.isDefTenantId() is true.
        if (ContextUtil.isDefTenantId()) {
            return true;
        }

        // Exact match against the configured ignore list (if any).
        boolean listedByName = false;
        if (databaseProperties.getIgnoreTable() != null) {
            listedByName = databaseProperties.getIgnoreTable().contains(tableName);
        }

        // Prefix match against the configured ignore prefixes (if any).
        boolean listedByPrefix = false;
        if (databaseProperties.getIgnoreTablePrefix() != null) {
            listedByPrefix = databaseProperties.getIgnoreTablePrefix().stream()
                    .anyMatch(prefix -> tableName.startsWith(prefix));
        }

        return listedByName || listedByPrefix;
    }

};
tli.setTenantLineHandler(ignoreAwareHandler);
TenantLineInnerInterceptor tli = new TenantLineInnerInterceptor();
MultiTenantLineHandler ignoreAwareHandler = new MultiTenantLineHandler() {

    /**
     * Decides whether the tenant ID condition is left out for the given table.
     *
     * @param tableName name of the table referenced by the SQL
     * @return {@code true} when the SQL must not be processed for this table
     */
    @Override
    public boolean ignoreTable(String tableName) {
        // Skip everything when ContextUtil.isDefTenantId() is true.
        if (ContextUtil.isDefTenantId()) {
            return true;
        }

        // Exact match against the configured ignore list (if any).
        boolean listedByName = false;
        if (databaseProperties.getIgnoreTable() != null) {
            listedByName = databaseProperties.getIgnoreTable().contains(tableName);
        }

        // Prefix match against the configured ignore prefixes (if any).
        boolean listedByPrefix = false;
        if (databaseProperties.getIgnoreTablePrefix() != null) {
            listedByPrefix = databaseProperties.getIgnoreTablePrefix().stream()
                    .anyMatch(prefix -> tableName.startsWith(prefix));
        }

        return listedByName || listedByPrefix;
    }

};
tli.setTenantLineHandler(ignoreAwareHandler);

疑问:

  1. ContextUtil.setTenantId 和 ContextUtil.setTenantBasePoolName的区别?

欢迎使用天源云Saas快速开发系统