Appearance
企业切换
DATASOURCE模式
DATASOURCE模式是指每个租户对应一个数据库,应用启动时,会将所有租户的数据源加载,执行数据库操作时,动态切换数据源到当前租户的数据源,并执行SQL。
COLUMN模式是指所有租户共用一个数据库,在表中增加租户ID字段区分。
DATASOURCE 模式 实现原理解析
- 配置默认数据源
yaml
# database.yml
acuity:
  druid-mysql: &druid-mysql
    validation-query: SELECT 'x'
  mysql: &db-mysql
    username: 'root'
    password: 'root'
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/acuity_ds_c_defaults?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useUnicode=true&useSSL=false&autoReconnect=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&nullCatalogMeansCurrent=true
  database:
    # Database-name prefixes of the tenant data sources to load in DATASOURCE mode.
    # On Oracle an initDatabasePrefix entry must not exceed 11 characters.
    initDatabasePrefix:
      - acuity_pro_base
spring:
  autoconfigure:
    exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
  datasource:
    dynamic:
      enabled: true
      p6spy: ${acuity.database.p6spy:false}
      seata: ${acuity.database.isSeata:false}
      # Use the data source named "0" as the default (primary) one
      primary: "0"
      datasource:
        "0":
          # Merge key: pulls in the settings anchored as &db-mysql above
          <<: *db-mysql
      druid:
        # Merge key: pulls in the settings anchored as &druid-mysql above
        <<: *druid-mysql
        initialSize: 10
        minIdle: 10
        maxActive: 200
        max-wait: 60000
        pool-prepared-statements: true
        max-pool-prepared-statement-per-connection-size: 20
        test-on-borrow: false
        test-on-return: false
        test-while-idle: true
        time-between-eviction-runs-millis: 60000 # interval between checks for idle connections that should be closed, in milliseconds
        min-evictable-idle-time-millis: 300000 # minimum time a connection stays in the pool, in milliseconds
        filters: stat,wall
        wall:
          enabled: true
          strictSyntaxCheck: false
          comment-allow: true
          multiStatementAllow: true
          noneBaseStatementAllow: true
    druid:
      enable: true
      # The two sections below apply to every tenant mode
      web-stat-filter: # WebStatFilter settings; see the Druid wiki page on configuring WebStatFilter
        enabled: true
        url-pattern: /*
        exclusions: "*.js , *.gif ,*.jpg ,*.png ,*.css ,*.ico , /druid/*"
        session-stat-max-count: 1000
        profile-enable: true
        session-stat-enable: false
      stat-view-servlet: # Exposes Druid statistics: (1) an HTML monitoring page, (2) a JSON API for the monitoring data
        enabled: true
        url-pattern: /druid/* # the built-in monitoring pages are served under this pattern; with this value the home page is /druid/index.html, e.g. http://127.0.0.1:9000/druid/index.html
        reset-enable: true # allow the statistics to be reset
        login-username: ''
        login-password: ''
        # IPs allowed to access the monitoring pages
        allow: ''
# database.yml
acuity:
  druid-mysql: &druid-mysql
    validation-query: SELECT 'x'
  mysql: &db-mysql
    username: 'root'
    password: 'root'
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/acuity_ds_c_defaults?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useUnicode=true&useSSL=false&autoReconnect=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&nullCatalogMeansCurrent=true
  database:
    # Database-name prefixes of the tenant data sources to load in DATASOURCE mode.
    # On Oracle an initDatabasePrefix entry must not exceed 11 characters.
    initDatabasePrefix:
      - acuity_pro_base
spring:
  autoconfigure:
    exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
  datasource:
    dynamic:
      enabled: true
      p6spy: ${acuity.database.p6spy:false}
      seata: ${acuity.database.isSeata:false}
      # Use the data source named "0" as the default (primary) one
      primary: "0"
      datasource:
        "0":
          # Merge key: pulls in the settings anchored as &db-mysql above
          <<: *db-mysql
      druid:
        # Merge key: pulls in the settings anchored as &druid-mysql above
        <<: *druid-mysql
        initialSize: 10
        minIdle: 10
        maxActive: 200
        max-wait: 60000
        pool-prepared-statements: true
        max-pool-prepared-statement-per-connection-size: 20
        test-on-borrow: false
        test-on-return: false
        test-while-idle: true
        time-between-eviction-runs-millis: 60000 # interval between checks for idle connections that should be closed, in milliseconds
        min-evictable-idle-time-millis: 300000 # minimum time a connection stays in the pool, in milliseconds
        filters: stat,wall
        wall:
          enabled: true
          strictSyntaxCheck: false
          comment-allow: true
          multiStatementAllow: true
          noneBaseStatementAllow: true
    druid:
      enable: true
      # The two sections below apply to every tenant mode
      web-stat-filter: # WebStatFilter settings; see the Druid wiki page on configuring WebStatFilter
        enabled: true
        url-pattern: /*
        exclusions: "*.js , *.gif ,*.jpg ,*.png ,*.css ,*.ico , /druid/*"
        session-stat-max-count: 1000
        profile-enable: true
        session-stat-enable: false
      stat-view-servlet: # Exposes Druid statistics: (1) an HTML monitoring page, (2) a JSON API for the monitoring data
        enabled: true
        url-pattern: /druid/* # the built-in monitoring pages are served under this pattern; with this value the home page is /druid/index.html, e.g. http://127.0.0.1:9000/druid/index.html
        reset-enable: true # allow the statistics to be reset
        login-username: ''
        login-password: ''
        # IPs allowed to access the monitoring pages
        allow: ''
- 项目启动时,会加载默认数据源到 dataSourceMap 中
java
// Source: dynamic-datasource-spring-boot-starter.jar
/**
 * Base provider that turns data source properties into {@link DataSource} instances.
 */
public abstract class AbstractDataSourceProvider implements DynamicDataSourceProvider {

    @Autowired
    private DefaultDataSourceCreator defaultDataSourceCreator;

    /**
     * Creates one data source for every entry of the given property map.
     *
     * @param dataSourcePropertiesMap data source name mapped to its properties
     * @return data source name mapped to the created data source
     */
    protected Map<String, DataSource> createDataSourceMap(
            Map<String, DataSourceProperty> dataSourcePropertiesMap) {
        Map<String, DataSource> created = new HashMap<>(dataSourcePropertiesMap.size() * 2);
        dataSourcePropertiesMap.forEach((name, property) -> {
            String configuredPool = property.getPoolName();
            // Fall back to the data source name when no pool name is configured
            boolean poolMissing = configuredPool == null || configuredPool.isEmpty();
            property.setPoolName(poolMissing ? name : configuredPool);
            // This is where the default data source gets created
            created.put(name, defaultDataSourceCreator.createDataSource(property));
        });
        return created;
    }
}
/**
 * Routing data source that keeps every known data source in an in-memory map.
 */
public class DynamicRoutingDataSource extends AbstractRoutingDataSource implements InitializingBean, DisposableBean {

    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();

    /**
     * Loads the data sources of all providers, registers them and logs whether the primary one exists.
     */
    public void afterPropertiesSet() throws Exception {
        checkEnv();

        // Collect the data sources configured in the yml from every provider
        Map<String, DataSource> loaded = new HashMap<>(16);
        for (DynamicDataSourceProvider provider : providers) {
            loaded.putAll(provider.loadDataSources());
        }

        // Register each of them in the in-memory dataSourceMap
        loaded.forEach((name, source) -> addDataSource(name, source));

        // Report whether the primary data source has been configured
        boolean primaryIsGroup = groupDataSources.containsKey(primary);
        if (primaryIsGroup) {
            log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", loaded.size(), primary);
            return;
        }
        if (dataSourceMap.containsKey(primary)) {
            log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", loaded.size(), primary);
            return;
        }
        log.warn("dynamic-datasource initial loaded [{}] datasource,Please add your primary datasource or check your configuration", loaded.size());
    }
}
// Source: dynamic-datasource-spring-boot-starter.jar
/**
 * Base provider that turns data source properties into {@link DataSource} instances.
 */
public abstract class AbstractDataSourceProvider implements DynamicDataSourceProvider {

    @Autowired
    private DefaultDataSourceCreator defaultDataSourceCreator;

    /**
     * Creates one data source for every entry of the given property map.
     *
     * @param dataSourcePropertiesMap data source name mapped to its properties
     * @return data source name mapped to the created data source
     */
    protected Map<String, DataSource> createDataSourceMap(
            Map<String, DataSourceProperty> dataSourcePropertiesMap) {
        Map<String, DataSource> created = new HashMap<>(dataSourcePropertiesMap.size() * 2);
        dataSourcePropertiesMap.forEach((name, property) -> {
            String configuredPool = property.getPoolName();
            // Fall back to the data source name when no pool name is configured
            boolean poolMissing = configuredPool == null || configuredPool.isEmpty();
            property.setPoolName(poolMissing ? name : configuredPool);
            // This is where the default data source gets created
            created.put(name, defaultDataSourceCreator.createDataSource(property));
        });
        return created;
    }
}
/**
 * Routing data source that keeps every known data source in an in-memory map.
 */
public class DynamicRoutingDataSource extends AbstractRoutingDataSource implements InitializingBean, DisposableBean {

    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();

    /**
     * Loads the data sources of all providers, registers them and logs whether the primary one exists.
     */
    public void afterPropertiesSet() throws Exception {
        checkEnv();

        // Collect the data sources configured in the yml from every provider
        Map<String, DataSource> loaded = new HashMap<>(16);
        for (DynamicDataSourceProvider provider : providers) {
            loaded.putAll(provider.loadDataSources());
        }

        // Register each of them in the in-memory dataSourceMap
        loaded.forEach((name, source) -> addDataSource(name, source));

        // Report whether the primary data source has been configured
        boolean primaryIsGroup = groupDataSources.containsKey(primary);
        if (primaryIsGroup) {
            log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", loaded.size(), primary);
            return;
        }
        if (dataSourceMap.containsKey(primary)) {
            log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", loaded.size(), primary);
            return;
        }
        log.warn("dynamic-datasource initial loaded [{}] datasource,Please add your primary datasource or check your configuration", loaded.size());
    }
}
- 项目启动时,动态加载租户数据源 到 dataSourceMap
java
/**
 * Listener for {@code ApplicationStartedEvent} that triggers loading of the tenant data sources.
 */
@AllArgsConstructor
public class InitDatabaseOnStarted implements ApplicationListener<ApplicationStartedEvent> {
    private final DatasourceInitDataSourceService datasourceInitDataSourceService;

    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        // Fired once the application has started successfully: load the tenant data sources
        datasourceInitDataSourceService.initDataSource();
    }
}
/**
 * Registers tenant data sources that are derived from the default data source configuration.
 */
public class DynamicDataSourceServiceImpl implements DataSourceService {

    /**
     * Builds and registers one data source per configured database prefix for the given tenant.
     *
     * @param tenantId tenant whose data sources are added
     * @return always {@code true} in the part of the method shown here
     */
    private boolean addSystem(Long tenantId, boolean isInitSchema, boolean isNotErrorRetry) {
        DataSourceProperty defDataSourceProperty = properties.getDatasource().get(ContextConstants.DEF_TENANT_ID_STR);
        ArgumentAssert.notNull(defDataSourceProperty, "请先配置默认[{}]数据源", ContextConstants.DEF_TENANT_ID_STR);

        // getInitDatabasePrefix() holds the data source prefixes configured in database.yml
        for (String database : databaseProperties.getInitDatabasePrefix()) {
            DataSourceProperty tenantProperty = BeanUtil.toBean(defDataSourceProperty, DataSourceProperty.class);

            // The pool name MUST equal what DsThreadProcessor#doDetermineDatasource returns via
            // getPoolName(prefix, tenantId), i.e. acuity_pro_base_{TenantId}
            String poolName = DsThreadProcessor.getPoolName(database, String.valueOf(tenantId));
            tenantProperty.setPoolName(poolName);

            boolean oracle = DbType.ORACLE.getDb().equals(getDbType().getDb());
            if (oracle) {
                // Oracle: the pool name is used as both user name and password
                tenantProperty.setUsername(tenantProperty.getPoolName());
                tenantProperty.setPassword(tenantProperty.getPoolName());
            } else {
                // Other databases: swap the database name inside the default JDBC URL
                String defaultDbName = DbPlusUtil.getDataBaseNameByUrl(defDataSourceProperty.getUrl());
                String tenantDbName = StrUtil.join(StrUtil.UNDERLINE, database, tenantId);
                tenantProperty.setUrl(StrUtil.replace(defDataSourceProperty.getUrl(), defaultDbName, tenantDbName));
            }

            // remaining steps omitted ...
            putDs(tenantProperty);
        }
        return true;
    }

    /**
     * Creates a Druid data source from the given properties and adds it to the routing data source.
     *
     * @param dsp properties of the data source to create; its pool name becomes the registration key
     * @return key set of the routing data source's data sources after the addition
     * @throws BizException when creating the data source fails
     */
    private Set<String> putDs(DataSourceProperty dsp) {
        try {
            DynamicRoutingDataSource routing = (DynamicRoutingDataSource) this.dataSource;
            DataSource created = druidDataSourceCreator.createDataSource(dsp);
            routing.addDataSource(dsp.getPoolName(), created);
            return routing.getDataSources().keySet();
        } catch (ErrorCreateDataSourceException e) {
            log.error("数据源初始化期间出现异常", e);
            throw new BizException("数据源初始化期间出现异常", e);
        }
    }
}
/**
 * Listener for {@code ApplicationStartedEvent} that triggers loading of the tenant data sources.
 */
@AllArgsConstructor
public class InitDatabaseOnStarted implements ApplicationListener<ApplicationStartedEvent> {
    private final DatasourceInitDataSourceService datasourceInitDataSourceService;

    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        // Fired once the application has started successfully: load the tenant data sources
        datasourceInitDataSourceService.initDataSource();
    }
}
/**
 * Registers tenant data sources that are derived from the default data source configuration.
 */
public class DynamicDataSourceServiceImpl implements DataSourceService {

    /**
     * Builds and registers one data source per configured database prefix for the given tenant.
     *
     * @param tenantId tenant whose data sources are added
     * @return always {@code true} in the part of the method shown here
     */
    private boolean addSystem(Long tenantId, boolean isInitSchema, boolean isNotErrorRetry) {
        DataSourceProperty defDataSourceProperty = properties.getDatasource().get(ContextConstants.DEF_TENANT_ID_STR);
        ArgumentAssert.notNull(defDataSourceProperty, "请先配置默认[{}]数据源", ContextConstants.DEF_TENANT_ID_STR);

        // getInitDatabasePrefix() holds the data source prefixes configured in database.yml
        for (String database : databaseProperties.getInitDatabasePrefix()) {
            DataSourceProperty tenantProperty = BeanUtil.toBean(defDataSourceProperty, DataSourceProperty.class);

            // The pool name MUST equal what DsThreadProcessor#doDetermineDatasource returns via
            // getPoolName(prefix, tenantId), i.e. acuity_pro_base_{TenantId}
            String poolName = DsThreadProcessor.getPoolName(database, String.valueOf(tenantId));
            tenantProperty.setPoolName(poolName);

            boolean oracle = DbType.ORACLE.getDb().equals(getDbType().getDb());
            if (oracle) {
                // Oracle: the pool name is used as both user name and password
                tenantProperty.setUsername(tenantProperty.getPoolName());
                tenantProperty.setPassword(tenantProperty.getPoolName());
            } else {
                // Other databases: swap the database name inside the default JDBC URL
                String defaultDbName = DbPlusUtil.getDataBaseNameByUrl(defDataSourceProperty.getUrl());
                String tenantDbName = StrUtil.join(StrUtil.UNDERLINE, database, tenantId);
                tenantProperty.setUrl(StrUtil.replace(defDataSourceProperty.getUrl(), defaultDbName, tenantDbName));
            }

            // remaining steps omitted ...
            putDs(tenantProperty);
        }
        return true;
    }

    /**
     * Creates a Druid data source from the given properties and adds it to the routing data source.
     *
     * @param dsp properties of the data source to create; its pool name becomes the registration key
     * @return key set of the routing data source's data sources after the addition
     * @throws BizException when creating the data source fails
     */
    private Set<String> putDs(DataSourceProperty dsp) {
        try {
            DynamicRoutingDataSource routing = (DynamicRoutingDataSource) this.dataSource;
            DataSource created = druidDataSourceCreator.createDataSource(dsp);
            routing.addDataSource(dsp.getPoolName(), created);
            return routing.getDataSources().keySet();
        } catch (ErrorCreateDataSourceException e) {
            log.error("数据源初始化期间出现异常", e);
            throw new BizException("数据源初始化期间出现异常", e);
        }
    }
}
项目启动成功后,观察 dataSourceMap 有一个默认数据源:0,一个租户数据源:acuity_pro_base_1
用户登录后,后续的所有请求,前端 Axios 拦截器都会将 TenantId 封装到请求头
请求到达后端后,经过 Spring 拦截器 `HeaderThreadLocalInterceptor`,请求头中携带的信息被存储到线程变量。在 Service 层编写 @DS 注解,@DS 的 value 值中的 `#thread` 表示从线程变量中获取 acuity_pro_base 对应的值,具体由 DsThreadProcessor 实现
java
// @DS(DsConstant.BASE_TENANT) is equivalent to @DS("#thread.acuity_pro_base")
@DS(DsConstant.BASE_TENANT)
public class UserServiceImpl {
    // business methods omitted
}
// @DS(DsConstant.BASE_TENANT) is equivalent to @DS("#thread.acuity_pro_base")
@DS(DsConstant.BASE_TENANT)
public class UserServiceImpl {
    // business methods omitted
}
- 请求经过拦截器
java
/**
 * Method interceptor that pushes the data source key of the {@code @DS} annotation
 * for the duration of the intercepted call.
 */
public class DynamicDataSourceAnnotationInterceptor implements MethodInterceptor {

    private static final String DYNAMIC_PREFIX = "#";

    private final DataSourceClassResolver dataSourceClassResolver;
    private final DsProcessor dsProcessor;

    /**
     * Pushes the resolved key, proceeds with the invocation and always polls the key afterwards.
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Resolve the value of the @DS annotation. Switching only works when the final key
        // is one of the keys of DynamicRoutingDataSource#dataSourceMap.
        DynamicDataSourceContextHolder.push(determineDatasourceKey(invocation));
        try {
            return invocation.proceed();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }

    /**
     * Returns the annotation key as-is, or the value resolved by the processor when the key is dynamic.
     */
    private String determineDatasourceKey(MethodInvocation invocation) {
        // e.g. "#thread.acuity_pro_base" taken from @DS("#thread.acuity_pro_base")
        String annotated = dataSourceClassResolver.findKey(invocation.getMethod(), invocation.getThis());
        if (!annotated.startsWith(DYNAMIC_PREFIX)) {
            return annotated;
        }
        // Keys starting with '#' are resolved to their real value by the DsProcessor
        return dsProcessor.determineDatasource(invocation, annotated);
    }
}
/**
 * Method interceptor that pushes the data source key of the {@code @DS} annotation
 * for the duration of the intercepted call.
 */
public class DynamicDataSourceAnnotationInterceptor implements MethodInterceptor {

    private static final String DYNAMIC_PREFIX = "#";

    private final DataSourceClassResolver dataSourceClassResolver;
    private final DsProcessor dsProcessor;

    /**
     * Pushes the resolved key, proceeds with the invocation and always polls the key afterwards.
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Resolve the value of the @DS annotation. Switching only works when the final key
        // is one of the keys of DynamicRoutingDataSource#dataSourceMap.
        DynamicDataSourceContextHolder.push(determineDatasourceKey(invocation));
        try {
            return invocation.proceed();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }

    /**
     * Returns the annotation key as-is, or the value resolved by the processor when the key is dynamic.
     */
    private String determineDatasourceKey(MethodInvocation invocation) {
        // e.g. "#thread.acuity_pro_base" taken from @DS("#thread.acuity_pro_base")
        String annotated = dataSourceClassResolver.findKey(invocation.getMethod(), invocation.getThis());
        if (!annotated.startsWith(DYNAMIC_PREFIX)) {
            return annotated;
        }
        // Keys starting with '#' are resolved to their real value by the DsProcessor
        return dsProcessor.determineDatasource(invocation, annotated);
    }
}
- #thread.acuity_pro_base 具体值 由DsThreadProcessor 解析
java
/**
 * Resolves {@code #thread.<prefix>} data source keys using the value stored in the thread context.
 */
public class DsThreadProcessor extends DsProcessor {

    /**
     * header prefix
     */
    public static final String HEADER_PREFIX = "#thread";

    public static final String POOL_NAME = "{}_{}";

    /**
     * Formats the pool name as {@code <prefix>_<tenantId>}.
     */
    public static String getPoolName(String prefix, String tenantId) {
        return StrUtil.format(POOL_NAME, prefix, tenantId);
    }

    /** This processor handles keys that start with {@link #HEADER_PREFIX}. */
    @Override
    public boolean matches(String key) {
        return key.startsWith(HEADER_PREFIX);
    }

    /**
     * Turns e.g. {@code #thread.acuity_pro_base} into {@code acuity_pro_base_{TenantId}}.
     */
    @Override
    public String doDetermineDatasource(MethodInvocation invocation, String key) {
        // Skip "#thread" plus the separator that follows it, leaving e.g. "acuity_pro_base"
        int prefixStart = HEADER_PREFIX.length() + 1;
        String poolPrefix = key.substring(prefixStart);
        // Value stored in the thread context under that prefix
        String contextValue = ContextUtil.get(poolPrefix);
        return getPoolName(poolPrefix, contextValue);
    }
}
/**
 * Resolves {@code #thread.<prefix>} data source keys using the value stored in the thread context.
 */
public class DsThreadProcessor extends DsProcessor {

    /**
     * header prefix
     */
    public static final String HEADER_PREFIX = "#thread";

    public static final String POOL_NAME = "{}_{}";

    /**
     * Formats the pool name as {@code <prefix>_<tenantId>}.
     */
    public static String getPoolName(String prefix, String tenantId) {
        return StrUtil.format(POOL_NAME, prefix, tenantId);
    }

    /** This processor handles keys that start with {@link #HEADER_PREFIX}. */
    @Override
    public boolean matches(String key) {
        return key.startsWith(HEADER_PREFIX);
    }

    /**
     * Turns e.g. {@code #thread.acuity_pro_base} into {@code acuity_pro_base_{TenantId}}.
     */
    @Override
    public String doDetermineDatasource(MethodInvocation invocation, String key) {
        // Skip "#thread" plus the separator that follows it, leaving e.g. "acuity_pro_base"
        int prefixStart = HEADER_PREFIX.length() + 1;
        String poolPrefix = key.substring(prefixStart);
        // Value stored in the thread context under that prefix
        String contextValue = ContextUtil.get(poolPrefix);
        return getPoolName(poolPrefix, contextValue);
    }
}
- 从 dataSourceMap 中获取数据源
java
/**
 * Routing data source: picks the data source for the key currently held by the context holder.
 */
public class DynamicRoutingDataSource extends AbstractRoutingDataSource implements InitializingBean, DisposableBean {

    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();

    /** Looks up the data source for the key on top of {@code DynamicDataSourceContextHolder}. */
    @Override
    public DataSource determineDataSource() {
        return getDataSource(DynamicDataSourceContextHolder.peek());
    }

    /**
     * Resolves a data source by name.
     *
     * @param ds data source or group name; an empty value selects the primary data source
     * @return the matching group/data source, or the primary one when nothing matches and strict mode is off
     * @throws CannotFindDataSourceException when nothing matches and strict mode is on
     */
    public DataSource getDataSource(String ds) {
        // Switching fails? Compare ds carefully with the keys of dataSourceMap
        if (StringUtils.isEmpty(ds)) {
            return determinePrimaryDataSource();
        }
        if (!groupDataSources.isEmpty() && groupDataSources.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return groupDataSources.get(ds).determineDataSource();
        }
        if (dataSourceMap.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return dataSourceMap.get(ds);
        }
        if (strict) {
            throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named" + ds);
        }
        return determinePrimaryDataSource();
    }
}
/**
 * Routing data source: picks the data source for the key currently held by the context holder.
 */
public class DynamicRoutingDataSource extends AbstractRoutingDataSource implements InitializingBean, DisposableBean {

    private final Map<String, DataSource> dataSourceMap = new ConcurrentHashMap<>();

    /** Looks up the data source for the key on top of {@code DynamicDataSourceContextHolder}. */
    @Override
    public DataSource determineDataSource() {
        return getDataSource(DynamicDataSourceContextHolder.peek());
    }

    /**
     * Resolves a data source by name.
     *
     * @param ds data source or group name; an empty value selects the primary data source
     * @return the matching group/data source, or the primary one when nothing matches and strict mode is off
     * @throws CannotFindDataSourceException when nothing matches and strict mode is on
     */
    public DataSource getDataSource(String ds) {
        // Switching fails? Compare ds carefully with the keys of dataSourceMap
        if (StringUtils.isEmpty(ds)) {
            return determinePrimaryDataSource();
        }
        if (!groupDataSources.isEmpty() && groupDataSources.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return groupDataSources.get(ds).determineDataSource();
        }
        if (dataSourceMap.containsKey(ds)) {
            log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
            return dataSourceMap.get(ds);
        }
        if (strict) {
            throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named" + ds);
        }
        return determinePrimaryDataSource();
    }
}
总结:
项目启动时,在 DynamicDataSourceServiceImpl#addSystem 方法断点,确保租户数据源加载成功
请求执行时,在 DsThreadProcessor#doDetermineDatasource 方法断点,记住从@DS中解析出来的值
上图中,入参 key 取决于Service层 @DS("{key}")
`ContextUtil.get(prefix)` 的返回值取决于 HeaderThreadLocalInterceptor 拦截器是否正确赋值
javapublic class HeaderThreadLocalInterceptor implements AsyncHandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (!(handler instanceof HandlerMethod)) { return true; } // 当前请求来自那个租户, 相当于 ContextUtil.set("TenantId", xx) ContextUtil.setTenantId(getHeader(request, ContextConstants.TENANT_ID_HEADER)); String basePoolName = getHeader(request, ContextConstants.TENANT_BASE_POOL_NAME_HEADER); if (StrUtil.isNotEmpty(basePoolName)) { // 当前请求需要操作那个租户基础库,相当于 ContextUtil.set("acuity_pro_base", xx) ContextUtil.setTenantBasePoolName(basePoolName); } String extendPoolName = getHeader(request, ContextConstants.TENANT_EXTEND_POOL_NAME_HEADER); if (StrUtil.isNotEmpty(extendPoolName)) { // 当前请求需要操作那个租户扩展库,相当于 ContextUtil.set("acuity_extend", xx) ContextUtil.setTenantExtendPoolName(extendPoolName); } } }
public class HeaderThreadLocalInterceptor implements AsyncHandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (!(handler instanceof HandlerMethod)) { return true; } // 当前请求来自那个租户, 相当于 ContextUtil.set("TenantId", xx) ContextUtil.setTenantId(getHeader(request, ContextConstants.TENANT_ID_HEADER)); String basePoolName = getHeader(request, ContextConstants.TENANT_BASE_POOL_NAME_HEADER); if (StrUtil.isNotEmpty(basePoolName)) { // 当前请求需要操作那个租户基础库,相当于 ContextUtil.set("acuity_pro_base", xx) ContextUtil.setTenantBasePoolName(basePoolName); } String extendPoolName = getHeader(request, ContextConstants.TENANT_EXTEND_POOL_NAME_HEADER); if (StrUtil.isNotEmpty(extendPoolName)) { // 当前请求需要操作那个租户扩展库,相当于 ContextUtil.set("acuity_extend", xx) ContextUtil.setTenantExtendPoolName(extendPoolName); } } }
请求执行时,在 DynamicRoutingDataSource#determineDataSource 方法打断点,确保dataSourceMap集合中有{dsKey}对应的数据源
疑问:
- ContextUtil.setTenantId 、 ContextUtil.setTenantBasePoolName、ContextUtil.setTenantExtendPoolName的区别?
setTenantId 是设置当前请求属于哪个租户
setTenantBasePoolName 是设置当前请求接下来的db操作,操作哪个租户库的基础库
setTenantExtendPoolName 是设置当前请求接下来的db操作,操作哪个租户库的**扩展库**
目前所有的服务均只连接了基础库,尚未使用扩展库,setTenantExtendPoolName方法只是预留给需要一个租户连接多个库的场景使用。
一般情况下,都是租户A的请求操作租户A的数据库,所以setTenantId方法内部同时会调用setTenantBasePoolName和setTenantExtendPoolName方法。
特殊情况下,租户A需要操作租户B的数据库,则只需要调用 ContextUtil.setTenantBasePoolName 方法将租户B的id 传递进去,接下来的请求就会操作租户B。
java
/**
 * Example: temporarily operating on another tenant's base database.
 */
public class UserBiz {

    /**
     * Reads a user from the current tenant's database and inserts a copy into tenant 2's database.
     */
    public void test() {
        // Current tenant ID = 1, acuity_pro_base = 1
        User user = userService.getById(1L); // runs against acuity_pro_base_1

        // Switch the base pool to tenant 2
        ContextUtil.setTenantBasePoolName(2);
        try {
            user.setId(null);
            // Tenant ID is still 1 but acuity_pro_base = 2, so this runs against acuity_pro_base_2
            userService.insert(user);
        } finally {
            // Switch back to the current tenant even when the insert fails, so later database
            // calls on this thread do not keep hitting tenant 2.
            // Being able to do this is exactly why setTenantBasePoolName and setTenantId are separate.
            ContextUtil.setTenantBasePoolName(ContextUtil.getTenantId());
        }
    }
}
// @DS("#thread.acuity_pro_base") 中的 acuity_pro_base 与 ContextUtil.setTenantBasePoolName 一致。
@DS("#thread.acuity_pro_base")
public class UserService {
public User getById(Long id){ //... }
public void insert(User user){ //... }
}
public class ContextUtil {
    /**
     * Stores the given tenant ID under the key {@code "acuity_pro_base"}.
     */
    public static void setTenantBasePoolName(Object tenantId) {
        // The key matches the "acuity_pro_base" part of @DS("#thread.acuity_pro_base")
        THREAD_LOCAL.put("acuity_pro_base", tenantId);
    }
}
/**
 * Example: temporarily operating on another tenant's base database.
 */
public class UserBiz {

    /**
     * Reads a user from the current tenant's database and inserts a copy into tenant 2's database.
     */
    public void test() {
        // Current tenant ID = 1, acuity_pro_base = 1
        User user = userService.getById(1L); // runs against acuity_pro_base_1

        // Switch the base pool to tenant 2
        ContextUtil.setTenantBasePoolName(2);
        try {
            user.setId(null);
            // Tenant ID is still 1 but acuity_pro_base = 2, so this runs against acuity_pro_base_2
            userService.insert(user);
        } finally {
            // Switch back to the current tenant even when the insert fails, so later database
            // calls on this thread do not keep hitting tenant 2.
            // Being able to do this is exactly why setTenantBasePoolName and setTenantId are separate.
            ContextUtil.setTenantBasePoolName(ContextUtil.getTenantId());
        }
    }
}
// @DS("#thread.acuity_pro_base") 中的 acuity_pro_base 与 ContextUtil.setTenantBasePoolName 一致。
@DS("#thread.acuity_pro_base")
public class UserService {
public User getById(Long id){ //... }
public void insert(User user){ //... }
}
public class ContextUtil {
    /**
     * Stores the given tenant ID under the key {@code "acuity_pro_base"}.
     */
    public static void setTenantBasePoolName(Object tenantId) {
        // The key matches the "acuity_pro_base" part of @DS("#thread.acuity_pro_base")
        THREAD_LOCAL.put("acuity_pro_base", tenantId);
    }
}
COLUMN模式。
DATASOURCE模式是指每个租户对应一个数据库,应用启动时,会将所有租户的数据源加载,执行数据库操作时,动态切换数据源到当前租户的数据源,并执行SQL。
COLUMN模式是指所有租户共用一个数据库,在表中增加租户ID字段区分。
COLUMN模式 实现原理解析
- 项目启动时,加载 TenantLineInnerInterceptor 拦截器
java
// Location: acuity-database-mode
/**
 * MyBatis configuration for COLUMN mode: registers the tenant-line interceptor.
 */
@MapperScan(basePackages = {UTIL_PACKAGE, BUSINESS_PACKAGE}, annotationClass = Repository.class)
public abstract class ColumnMybatisConfiguration extends BaseMybatisConfiguration {

    public ColumnMybatisConfiguration(final DatabaseProperties databaseProperties) {
        super(databaseProperties);
    }

    /**
     * Interceptor that dynamically appends the tenant condition to SQL in COLUMN mode.
     *
     * @return the plugins to register
     */
    @Override
    protected List<InnerInterceptor> getPaginationBeforeInnerInterceptor() {
        MultiTenantLineHandler handler = new MultiTenantLineHandler() {

            // Name of the tenant ID column in the business tables
            @Override
            public String getTenantIdColumn() {
                return databaseProperties.getTenantIdColumn();
            }

            // Whether the tenant ID condition is skipped for the given table
            @Override
            public boolean ignoreTable(String tableName) {
                if (ContextUtil.isDefTenantId()) {
                    return true;
                }
                // Skip tables listed by exact name ...
                if (databaseProperties.getIgnoreTable() != null
                        && databaseProperties.getIgnoreTable().contains(tableName)) {
                    return true;
                }
                // ... and tables whose name starts with one of the configured prefixes
                return databaseProperties.getIgnoreTablePrefix() != null
                        && databaseProperties.getIgnoreTablePrefix().stream()
                                .anyMatch(prefix -> tableName.startsWith(prefix));
            }

            // Tenant ID value: taken from ContextUtil.getBasePoolNameHeader()
            @Override
            public Expression getTenantId() {
                return new LongValue(ContextUtil.getBasePoolNameHeader());
            }
        };

        // Multi-tenant plugin for COLUMN mode
        TenantLineInnerInterceptor tenantLine = new TenantLineInnerInterceptor();
        tenantLine.setTenantLineHandler(handler);

        List<InnerInterceptor> interceptors = new ArrayList<>();
        interceptors.add(tenantLine);
        return interceptors;
    }
}
/**
 * Thread-bound key/value context.
 */
public final class ContextUtil {

    private static final ThreadLocal<Map<String, String>> THREAD_LOCAL = new TransmittableThreadLocal<>();

    /**
     * Returns the value stored under {@link ContextConstants#TENANT_BASE_POOL_NAME_HEADER}.
     *
     * @return the stored value parsed as a {@code Long}, or {@code null} when the thread has
     *         no context map or the entry is missing or empty
     * @throws NumberFormatException when the stored value is not a valid long
     */
    public static Long getBasePoolNameHeader() {
        // ThreadLocal#get() takes no argument and yields the per-thread map; the key lookup happens on that map
        Map<String, String> context = THREAD_LOCAL.get();
        if (context == null) {
            return null;
        }
        String raw = context.get(ContextConstants.TENANT_BASE_POOL_NAME_HEADER);
        if (raw == null || raw.isEmpty()) {
            return null;
        }
        return Long.valueOf(raw);
    }
}
public final class ContextConstants {
    // Key used by ContextUtil.getBasePoolNameHeader() to look up the value in the thread context
    public static final String TENANT_BASE_POOL_NAME_HEADER = "acuity_pro_base";
}
// Location: acuity-database-mode
/**
 * MyBatis configuration for COLUMN mode: registers the tenant-line interceptor.
 */
@MapperScan(basePackages = {UTIL_PACKAGE, BUSINESS_PACKAGE}, annotationClass = Repository.class)
public abstract class ColumnMybatisConfiguration extends BaseMybatisConfiguration {

    public ColumnMybatisConfiguration(final DatabaseProperties databaseProperties) {
        super(databaseProperties);
    }

    /**
     * Interceptor that dynamically appends the tenant condition to SQL in COLUMN mode.
     *
     * @return the plugins to register
     */
    @Override
    protected List<InnerInterceptor> getPaginationBeforeInnerInterceptor() {
        MultiTenantLineHandler handler = new MultiTenantLineHandler() {

            // Name of the tenant ID column in the business tables
            @Override
            public String getTenantIdColumn() {
                return databaseProperties.getTenantIdColumn();
            }

            // Whether the tenant ID condition is skipped for the given table
            @Override
            public boolean ignoreTable(String tableName) {
                if (ContextUtil.isDefTenantId()) {
                    return true;
                }
                // Skip tables listed by exact name ...
                if (databaseProperties.getIgnoreTable() != null
                        && databaseProperties.getIgnoreTable().contains(tableName)) {
                    return true;
                }
                // ... and tables whose name starts with one of the configured prefixes
                return databaseProperties.getIgnoreTablePrefix() != null
                        && databaseProperties.getIgnoreTablePrefix().stream()
                                .anyMatch(prefix -> tableName.startsWith(prefix));
            }

            // Tenant ID value: taken from ContextUtil.getBasePoolNameHeader()
            @Override
            public Expression getTenantId() {
                return new LongValue(ContextUtil.getBasePoolNameHeader());
            }
        };

        // Multi-tenant plugin for COLUMN mode
        TenantLineInnerInterceptor tenantLine = new TenantLineInnerInterceptor();
        tenantLine.setTenantLineHandler(handler);

        List<InnerInterceptor> interceptors = new ArrayList<>();
        interceptors.add(tenantLine);
        return interceptors;
    }
}
/**
 * Thread-bound key/value context.
 */
public final class ContextUtil {

    private static final ThreadLocal<Map<String, String>> THREAD_LOCAL = new TransmittableThreadLocal<>();

    /**
     * Returns the value stored under {@link ContextConstants#TENANT_BASE_POOL_NAME_HEADER}.
     *
     * @return the stored value parsed as a {@code Long}, or {@code null} when the thread has
     *         no context map or the entry is missing or empty
     * @throws NumberFormatException when the stored value is not a valid long
     */
    public static Long getBasePoolNameHeader() {
        // ThreadLocal#get() takes no argument and yields the per-thread map; the key lookup happens on that map
        Map<String, String> context = THREAD_LOCAL.get();
        if (context == null) {
            return null;
        }
        String raw = context.get(ContextConstants.TENANT_BASE_POOL_NAME_HEADER);
        if (raw == null || raw.isEmpty()) {
            return null;
        }
        return Long.valueOf(raw);
    }
}
public final class ContextConstants {
    // Key used by ContextUtil.getBasePoolNameHeader() to look up the value in the thread context
    public static final String TENANT_BASE_POOL_NAME_HEADER = "acuity_pro_base";
}
- 用户登录后,后续的所有请求,前端 Axios 拦截器都会将 TenantId 封装到请求头
ts
// Location: acuity-web-pro/src/utils/http/axios/index.ts
/**
 * @description: request interceptor; adds the token and tenant headers to outgoing requests
 */
requestInterceptors: (config, options) => {
  const {
    multiTenantType,
    clientId,
    clientSecret,
    tokenKey,
    tenantIdKey,
    applicationIdKey,
    authorizationKey,
  } = globSetting;
  const cfg = config as Recordable;

  // User identity token
  const token = getToken();
  if (token && cfg?.requestOptions?.withToken !== false) {
    const scheme = options.authenticationScheme;
    cfg.headers[tokenKey] = scheme ? `${scheme} ${token}` : token;
  }

  // Requests in COLUMN mode and DATASOURCE mode additionally carry the tenant ID
  const tenantWanted = cfg?.requestOptions?.withTenant !== false;
  if (tenantWanted && multiTenantType !== MultiTenantTypeEnum.NONE) {
    cfg.headers[tenantIdKey] = getTenantId();
  }

  // other request headers ...
  return config;
}
// Location: acuity-web-pro/src/utils/http/axios/index.ts
/**
 * @description: request interceptor; adds the token and tenant headers to outgoing requests
 */
requestInterceptors: (config, options) => {
  const {
    multiTenantType,
    clientId,
    clientSecret,
    tokenKey,
    tenantIdKey,
    applicationIdKey,
    authorizationKey,
  } = globSetting;
  const cfg = config as Recordable;

  // User identity token
  const token = getToken();
  if (token && cfg?.requestOptions?.withToken !== false) {
    const scheme = options.authenticationScheme;
    cfg.headers[tokenKey] = scheme ? `${scheme} ${token}` : token;
  }

  // Requests in COLUMN mode and DATASOURCE mode additionally carry the tenant ID
  const tenantWanted = cfg?.requestOptions?.withTenant !== false;
  if (tenantWanted && multiTenantType !== MultiTenantTypeEnum.NONE) {
    cfg.headers[tenantIdKey] = getTenantId();
  }

  // other request headers ...
  return config;
}
- 请求到达后端后,经过 Spring 拦截器 `HeaderThreadLocalInterceptor`,请求头中携带的信息被存储到线程变量。
参数1、2、3的区别?
参数1:当前请求的发起人所属的租户ID
参数2、3:当前请求同一线程的方法中,涉及到数据库操作时,需要操作哪个数据库
java
// Location: acuity-boot
/**
 * Interceptor that copies the tenant-related request headers into the thread context.
 */
public class HeaderThreadLocalInterceptor implements AsyncHandlerInterceptor {

    /**
     * Stores the three tenant parameters of the request unless {@code ContextUtil.getBoot()} is true.
     *
     * @return always {@code true}
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        if (!(handler instanceof HandlerMethod)) {
            return true;
        }
        if (ContextUtil.getBoot()) {
            return true;
        }

        // Parameter 1: tenant ID
        String tenantId = getHeader(request, ContextConstants.TENANT_ID_HEADER);
        ContextUtil.setTenantId(tenantId);

        // Parameter 2: prefix of the base database
        String basePool = getHeader(request, ContextConstants.TENANT_BASE_POOL_NAME_HEADER);
        if (StrUtil.isNotEmpty(basePool)) {
            ContextUtil.setTenantBasePoolName(basePool);
        }

        // Parameter 3: prefix of the extend database
        String extendPool = getHeader(request, ContextConstants.TENANT_EXTEND_POOL_NAME_HEADER);
        if (StrUtil.isNotEmpty(extendPool)) {
            ContextUtil.setTenantExtendPoolName(extendPool);
        }

        // remaining code omitted
        return true;
    }
}
// Location: acuity-boot
/**
 * Interceptor that copies the tenant-related request headers into the thread context.
 */
public class HeaderThreadLocalInterceptor implements AsyncHandlerInterceptor {

    /**
     * Stores the three tenant parameters of the request unless {@code ContextUtil.getBoot()} is true.
     *
     * @return always {@code true}
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        if (!(handler instanceof HandlerMethod)) {
            return true;
        }
        if (ContextUtil.getBoot()) {
            return true;
        }

        // Parameter 1: tenant ID
        String tenantId = getHeader(request, ContextConstants.TENANT_ID_HEADER);
        ContextUtil.setTenantId(tenantId);

        // Parameter 2: prefix of the base database
        String basePool = getHeader(request, ContextConstants.TENANT_BASE_POOL_NAME_HEADER);
        if (StrUtil.isNotEmpty(basePool)) {
            ContextUtil.setTenantBasePoolName(basePool);
        }

        // Parameter 3: prefix of the extend database
        String extendPool = getHeader(request, ContextConstants.TENANT_EXTEND_POOL_NAME_HEADER);
        if (StrUtil.isNotEmpty(extendPool)) {
            ContextUtil.setTenantExtendPoolName(extendPool);
        }

        // remaining code omitted
        return true;
    }
}
请求调用到达Mapper层,所有SQL都会经过拦截器:TenantLineInnerInterceptor,该拦截器会拦截insert、update、delete、select 等sql,自动拼接租户ID字段。如:
insert into user(id, name) values (?,?)
变成insert into user(id, name, tenant_id) values (?,?,{ContextUtil.getTenantBasePoolName()})
update user set name = ? where id = ?
变成update user set name = ? where id = ? and tenant_id = {ContextUtil.getTenantBasePoolName()}
delete from user where id = ?
变成delete from user where id = ? and tenant_id = {ContextUtil.getTenantBasePoolName()}
select id, name from user
变成select id ,name from user where tenant_id = {ContextUtil.getTenantBasePoolName()}
自动拼接的 tenant_id = {ContextUtil.getTenantBasePoolName()} 取决于 ColumnMybatisConfiguration 中的配置
java
// Excerpt of the tenant-line handler: the column name and the value used for the tenant condition
TenantLineInnerInterceptor tli = new TenantLineInnerInterceptor();
tli.setTenantLineHandler(new MultiTenantLineHandler() {
    // Name of the tenant ID column in the business tables
    @Override
    public String getTenantIdColumn() {
        // e.g. tenant_id
        return databaseProperties.getTenantIdColumn();
    }

    // Tenant ID value: taken from ContextUtil.getBasePoolNameHeader()
    @Override
    public Expression getTenantId() {
        // This is the value written as {ContextUtil.getTenantBasePoolName()} in the SQL examples
        return new LongValue(ContextUtil.getBasePoolNameHeader());
    }
});
// Excerpt of the tenant-line handler: the column name and the value used for the tenant condition
TenantLineInnerInterceptor tli = new TenantLineInnerInterceptor();
tli.setTenantLineHandler(new MultiTenantLineHandler() {
    // Name of the tenant ID column in the business tables
    @Override
    public String getTenantIdColumn() {
        // e.g. tenant_id
        return databaseProperties.getTenantIdColumn();
    }

    // Tenant ID value: taken from ContextUtil.getBasePoolNameHeader()
    @Override
    public Expression getTenantId() {
        // This is the value written as {ContextUtil.getTenantBasePoolName()} in the SQL examples
        return new LongValue(ContextUtil.getBasePoolNameHeader());
    }
});
- 默认情况下,所有Mapper层的方法都会被TenantLineInnerInterceptor处理,拼接上租户ID,某些方法或多个方法不想被拦截器处理,方法如下:
- 加注解: @InterceptorIgnore 。 类上面全类生效,方法上单个方法生效。
java
// On the type: the ignore setting applies to every method of the mapper
@InterceptorIgnore(tenantLine = "true", dynamicTableName = "true")
public interface UserMapper {
    // On a method: the ignore setting applies to this method only
    @InterceptorIgnore(tenantLine = "true", dynamicTableName = "true")
    User get();
}
// On the type: the ignore setting applies to every method of the mapper
@InterceptorIgnore(tenantLine = "true", dynamicTableName = "true")
public interface UserMapper {
    // On a method: the ignore setting applies to this method only
    @InterceptorIgnore(tenantLine = "true", dynamicTableName = "true")
    User get();
}
- 全局配置
java
// Global configuration: decide per table whether the tenant condition is skipped
TenantLineInnerInterceptor tli = new TenantLineInnerInterceptor();
tli.setTenantLineHandler(new MultiTenantLineHandler() {
    // Whether the tenant ID condition is skipped for the given table;
    // returning true means the current SQL is left untouched
    @Override
    public boolean ignoreTable(String tableName) {
        if (ContextUtil.isDefTenantId()) {
            return true;
        }
        // Skip tables listed by exact name ...
        if (databaseProperties.getIgnoreTable() != null
                && databaseProperties.getIgnoreTable().contains(tableName)) {
            return true;
        }
        // ... and tables whose name starts with one of the configured prefixes
        return databaseProperties.getIgnoreTablePrefix() != null
                && databaseProperties.getIgnoreTablePrefix().stream()
                        .anyMatch(prefix -> tableName.startsWith(prefix));
    }
});
// Global configuration: decide per table whether the tenant condition is skipped
TenantLineInnerInterceptor tli = new TenantLineInnerInterceptor();
tli.setTenantLineHandler(new MultiTenantLineHandler() {
    // Whether the tenant ID condition is skipped for the given table;
    // returning true means the current SQL is left untouched
    @Override
    public boolean ignoreTable(String tableName) {
        if (ContextUtil.isDefTenantId()) {
            return true;
        }
        // Skip tables listed by exact name ...
        if (databaseProperties.getIgnoreTable() != null
                && databaseProperties.getIgnoreTable().contains(tableName)) {
            return true;
        }
        // ... and tables whose name starts with one of the configured prefixes
        return databaseProperties.getIgnoreTablePrefix() != null
                && databaseProperties.getIgnoreTablePrefix().stream()
                        .anyMatch(prefix -> tableName.startsWith(prefix));
    }
});
疑问:
- ContextUtil.setTenantId 和 ContextUtil.setTenantBasePoolName的区别?