在使用sharding-jdbc4.1.1过程中,遇到了死锁的情况,是由:执行引擎在准备阶段多创建一个connection引起的。所以对准备阶段的源码分析了解一下。
官方对准备阶段的描述文档:https://shardingsphere.apache.org/document/4.1.1/cn/features/sharding/principle/execute/
准备阶段顾名思义,此阶段用于准备执行的数据。它分为结果集分组和执行单元创建两个步骤。
结果集分组是实现内化连接模式概念的关键。执行引擎根据maxConnectionSizePerQuery配置项,结合当前路由结果,选择恰当的连接模式。 具体步骤如下:
- 将SQL的路由结果按照数据源的名称进行分组。通过下图的公式,可以获得每个数据库实例在maxConnectionSizePerQuery的允许范围内,每个连接需要执行的SQL路由结果组,并计算出本次请求的最优连接模式。
在maxConnectionSizePerQuery允许的范围内,当一个连接需要执行的请求数量大于1时,意味着当前的数据库连接无法持有相应的数据结果集,则必须采用内存归并; 反之,当一个连接需要执行的请求数量等于1时,意味着当前的数据库连接可以持有相应的数据结果集,则可以采用流式归并。
每一次的连接模式的选择,是针对每一个物理数据库的。也就是说,在同一次查询中,如果路由至一个以上的数据库,每个数据库的连接模式不一定一样,它们可能是混合存在的形态。
通过上一步骤获得的路由分组结果创建执行的单元。 当数据源使用数据库连接池等控制数据库连接数量的技术时,在获取数据库连接时,如果不妥善处理并发,则有一定几率发生死锁。 在多个请求相互等待对方释放数据库连接资源时,将会产生饥饿等待,造成交叉的死锁问题。
举例说明,假设一次查询需要在某一数据源上获取两个数据库连接,并路由至同一个数据库的两个分表查询。 则有可能出现查询A已获取到该数据源的1个数据库连接,并等待获取另一个数据库连接;而查询B也已经在该数据源上获取到的一个数据库连接,并同样等待另一个数据库连接的获取。 如果数据库连接池的允许最大连接数是2,那么这2个查询请求将永久的等待下去。下图描绘了死锁的情况。
ShardingSphere为了避免死锁的出现,在获取数据库连接时进行了同步处理。 它在创建执行单元时,以原子性的方式一次性获取本次SQL请求所需的全部数据库连接,杜绝了每次查询请求获取到部分资源的可能。 由于对数据库的 *** 作非常频繁,每次获取数据库连接时时都进行锁定,会降低ShardingSphere的并发。因此,ShardingSphere在这里进行了2点优化:
避免锁定一次性只需要获取1个数据库连接的 *** 作。因为每次仅需要获取1个连接,则不会发生两个请求相互等待的场景,无需锁定。 对于大部分OLTP的 *** 作,都是使用分片键路由至唯一的数据节点,这会使得系统变为完全无锁的状态,进一步提升了并发效率。 除了路由至单分片的情况,读写分离也在此范畴之内。
仅针对内存限制模式时才进行资源锁定。在使用连接限制模式时,所有的查询结果集将在装载至内存之后释放掉数据库连接资源,因此不会产生死锁等待的问题。
(这里的死锁场景 跟 我上一篇遇到的场景 是不同的:https://blog.csdn.net/qq_37402304/article/details/122743074)
源码: 准备阶段的调用栈:public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter { ............. @Override public boolean execute() throws SQLException { try { clearPrevious(); prepare(); initPreparedStatementExecutor(); return preparedStatementExecutor.execute(); } finally { clearBatch(); } } private void initPreparedStatementExecutor() throws SQLException { preparedStatementExecutor.init(executionContext); setParametersForStatements(); replayMethodForStatements(); } ................. }
public final class PreparedStatementExecutor extends AbstractStatementExecutor { .............................. public void init(final ExecutionContext executionContext) throws SQLException { setSqlStatementContext(executionContext.getSqlStatementContext()); getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits())); cacheStatements(); } private Collection> obtainExecuteGroups(final Collection executionUnits) throws SQLException { //getExecuteUnitGroups 开始准备阶段 return getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() { @Override public List getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize); } @Override public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final ExecutionUnit executionUnit, final ConnectionMode connectionMode) throws SQLException { return new StatementExecuteUnit(executionUnit, createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode); } }); } .................... }
@RequiredArgsConstructor public final class SQLExecutePrepareTemplate { private final int maxConnectionsSizePerQuery; public Collection> getExecuteUnitGroups(final Collection executionUnits, final SQLExecutePrepareCallback callback) throws SQLException { return getSynchronizedExecuteUnitGroups(executionUnits, callback); } private Collection > getSynchronizedExecuteUnitGroups( final Collection executionUnits, final SQLExecutePrepareCallback callback) throws SQLException { Map > sqlUnitGroups = getSQLUnitGroups(executionUnits); Collection > result = new linkedList<>(); for (Entry > entry : sqlUnitGroups.entrySet()) { result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback)); } return result; } private Map > getSQLUnitGroups(final Collection executionUnits) { Map > result = new linkedHashMap<>(executionUnits.size(), 1); for (ExecutionUnit each : executionUnits) { if (!result.containsKey(each.getDataSourceName())) { result.put(each.getDataSourceName(), new linkedList<>()); } result.get(each.getDataSourceName()).add(each.getSqlUnit()); } return result; } private List > getSQLExecuteGroups(final String dataSourceName, final List sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException { List > result = new linkedList<>(); // desiredPartitionSize:一共maxConnectionsSizePerQuery个分区,希望每个分区里的sql数量 // 看sql的数量 是否 是maxConnectionsSizePerQuery的倍数,保证maxConnectionsSizePerQuery数量内,对sqlUnits均分 // yes:每个分区的元素数量都一样; no:按desiredPartitionSize数量、依次的分隔 int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1); // 把sqlUnits分隔,按desiredPartitionSize数量、依次的分隔 // 主要用途:用maxConnectionsSizePerQuery个connection来 并发执行 分区里的sql List > sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize); // 连接模式。 ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY; // 需要sqlUnitPartitions.size()个connection,实际 sqlUnitPartitions.size() <= maxConnectionsSizePerQuery List
connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size()); int count = 0; for (List each : sqlUnitPartitions) { result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback)); } return result; } private InputGroup getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection, final String dataSourceName, final List sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException { List result = new linkedList<>(); for (SQLUnit each : sqlUnitGroup) { result.add(callback.createStatementExecuteUnit(connection, new ExecutionUnit(dataSourceName, each), connectionMode)); } return new InputGroup<>(result); } }
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection { public final Connection getConnection(final String dataSourceName) throws SQLException { return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0); } public final ListgetConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { DataSource dataSource = getDataSourceMap().get(dataSourceName); Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName); Collection connections; // 先从缓存里获取 synchronized (cachedConnections) { connections = cachedConnections.get(dataSourceName); } List result; // 缓存里足够,直接返回 if (connections.size() >= connectionSize) { result = new ArrayList<>(connections).subList(0, connectionSize); } else if (!connections.isEmpty()) { // 如果连接数小于目标数量,且缓存里非空 -> 那么新建(目标数-已有数)个连接 result = new ArrayList<>(connectionSize); result.addAll(connections); List newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size()); result.addAll(newConnections); synchronized (cachedConnections) { cachedConnections.putAll(dataSourceName, newConnections); } } else { // 如果连接数小于目标数量,且缓存是空的 -> 那么新建目标数个连接 result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize)); synchronized (cachedConnections) { cachedConnections.putAll(dataSourceName, result); } } return result; } @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") private List createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException { if (1 == connectionSize) { Connection connection = createConnection(dataSourceName, dataSource); replayMethodsInvocation(connection); return Collections.singletonList(connection); } // 连接限制模式下, if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) { return createConnections(dataSourceName, dataSource, connectionSize); } // 内存限制模式下,控制并发 // 对内存限制模式,ShardingSphere为了避免死锁的出现,在获取数据库连接时进行了同步处理。 synchronized (dataSource) { return createConnections(dataSourceName, dataSource, connectionSize); } } private List createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException { List result = new ArrayList<>(connectionSize); for (int i = 0; i < connectionSize; i++) { try { Connection connection = createConnection(dataSourceName, dataSource); replayMethodsInvocation(connection); result.add(connection); } catch (final SQLException ex) { for (Connection each : result) { each.close(); } throw new SQLException(String.format("Could't get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex); } } return result; } protected abstract Connection createConnection(String dataSourceName, DataSource dataSource) throws SQLException; .......................... }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)