DataSourceConfiguration
配置类,springBoot默认采用HikariDataSource
/**
* Hikari DataSource configuration.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(HikariDataSource.class)
@ConditionalOnMissingBean(DataSource.class)
@ConditionalOnProperty(name = "spring.datasource.type", havingValue = "com.zaxxer.hikari.HikariDataSource",
matchIfMissing = true)
static class Hikari {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.hikari")
HikariDataSource dataSource(DataSourceProperties properties) {
HikariDataSource dataSource = createDataSource(properties, HikariDataSource.class);
if (StringUtils.hasText(properties.getName())) {
dataSource.setPoolName(properties.getName());
}
return dataSource;
}
}
@Configuration(proxyBeanMethods = false)得含义是
@Configuration注解的意思是proxyBeanMethods配置类是用来指定@Bean注解标注的方法是否使用代理,默认是true使用代理,直接从IOC容器之中取得对象;如果设置为false,也就是不使用注解,每次调用@Bean标注的方法获取到的对象和IOC容器中的都不一样,是一个新的对象,所以我们可以将此属性设置为false来提高性能;
**@ConditionalOnProperty(name = “spring.datasource.type”, havingValue = “com.zaxxer.hikari.HikariDataSource”,
** matchIfMissing = true)**
如果配置spring.datasource.type为com.zaxxer.hikari.HikariDataSource才加载,如果配置为空,则默认加载此数据源
HikariDataSource
一个一个看这些接口
AutoCloseable
代表一个对象在close之前可能持有某些资源(文件或socket)。如果对象是在try-with-resources代码块中声明的,
AutoCloseable对象的close()方法会被自动执行。这种构造方式保证了最快的资源释放,避免资源耗尽异常。void close() throws Exception
关闭资源,放弃所有内在的资源。如果对象是在try-with-resources代码块中声明的,那么这个方法会自动被执行。
虽然这个接口的方法声明成抛出Exception异常,但是强烈推荐实现类抛出更详细的异常,或者不要要出异常。
需要注意实现类的close操作可能会失败。 强烈推荐优先放弃内部资源和内部标记资源已经关闭,尽量不去抛出异常。close方法不会执行
多次,所以要确保资源能够及时释放。此外,优先关闭内部资源可以降低资源封装带来的问题,假如一个资源包含了另一个资源,要依次关闭。
实现类也强烈建议不要抛出InterruptedException异常。这个异常和thread的 interrupt标志相关,而且线程运行时可能会出现一些迷惑行为。
例如实现类
import lombok.Data;
@Data
public class AutoCloseAbleTest implements AutoCloseable {
private boolean closed = false;
@Override
public void close() throws IllegalArgumentException {
closed = true;
throw new IllegalArgumentException("测试抛出异常");
}
public void doSomething(){
System.out.println("现在资源是开着的");
}
}
测试类
public class AutoCloseAbleExample {
/**
* 使用try-with-resources模式声明资源
* @param args
*/
public static void main(String[] args){
try(AutoCloseAbleTest test = new AutoCloseAbleTest()){
test.doSomething();
}catch (IllegalArgumentException e){
e.printStackTrace();
}
}
}
执行结果
java.lang.IllegalArgumentException: 测试抛出异常
at org.zhuzhenxi.learning.lang.autocloseable.AutoCloseAbleTest.close(AutoCloseAbleTest.java:14)
at org.zhuzhenxi.learning.lang.autocloseable.AutoCloseAbleExample.main(AutoCloseAbleExample.java:7)
HikariConfig
// Properties changeable at runtime through the HikariConfigMXBean
//
private volatile String catalog;
private volatile long connectionTimeout;
private volatile long validationTimeout;
private volatile long idleTimeout;
private volatile long leakDetectionThreshold;
private volatile long maxLifetime;
private volatile int maxPoolSize;
private volatile int minIdle;
private volatile String username;
private volatile String password;
// Properties NOT changeable at runtime
//
private long initializationFailTimeout;
private String connectionInitSql;
private String connectionTestQuery;
private String dataSourceClassName;
private String dataSourceJndiName;
private String driverClassName;
private String exceptionOverrideClassName;
private String jdbcUrl;
private String poolName;
private String schema;
private String transactionIsolationName;
private boolean isAutoCommit;
private boolean isReadOnly;
private boolean isIsolateInternalQueries;
private boolean isRegisterMbeans;
private boolean isAllowPoolSuspension;
private DataSource dataSource;
private Properties dataSourceProperties;
private ThreadFactory threadFactory;
private ScheduledExecutorService scheduledExecutor;
private MetricsTrackerFactory metricsTrackerFactory;
private Object metricRegistry;
private Object healthCheckRegistry;
private Properties healthCheckProperties;
配置类,有很多配置项
name 描述 构造器默认值 默认配置validate之后的值 validate重置
autoCommit 自动提交从池中返回的连接 true true -
connectionTimeout 等待来自池的连接的最大毫秒数 SECONDS.toMillis(30) = 30000 30000 如果小于250毫秒,则被重置回30秒
idleTimeout 连接允许在池中闲置的最长时间 MINUTES.toMillis(10) = 600000 600000 如果idleTimeout+1秒>maxLifetime 且 maxLifetime>0,则会被重置为0(代表永远不会退出);如果idleTimeout!=0且小于10秒,则会被重置为10秒
maxLifetime 池中连接最长生命周期 MINUTES.toMillis(30) = 1800000 1800000 如果不等于0且小于30秒则会被重置回30分钟
connectionTestQuery 如果您的驱动程序支持JDBC4,我们强烈建议您不要设置此属性 null null -
minimumIdle 池中维护的最小空闲连接数 -1 10 minIdle<0或者minIdle>maxPoolSize,则被重置为maxPoolSize
maximumPoolSize 池中最大连接数,包括闲置和使用中的连接 -1 10 如果maxPoolSize小于1,则会被重置。当minIdle<=0被重置为DEFAULT_POOL_SIZE则为10;如果minIdle>0则重置为minIdle的值
metricRegistry 该属性允许您指定一个 Codahale / Dropwizard MetricRegistry 的实例,供池使用以记录各种指标 null null -
healthCheckRegistry 该属性允许您指定池使用的Codahale / Dropwizard HealthCheckRegistry的实例来报告当前健康信息 null null -
poolName 连接池的用户定义名称,主要出现在日志记录和JMX管理控制台中以识别池和池配置 null HikariPool-1 -
initializationFailTimeout 如果池无法成功初始化连接,则此属性控制池是否将 fail fast 1 1 -
isolateInternalQueries 是否在其自己的事务中隔离内部池查询,例如连接活动测试 false false -
allowPoolSuspension 控制池是否可以通过JMX暂停和恢复 false false -
readOnly 从池中获取的连接是否默认处于只读模式 false false -
registerMbeans 是否注册JMX管理Bean(MBeans) false false -
catalog 为支持 catalog 概念的数据库设置默认 catalog driver default null -
connectionInitSql 该属性设置一个SQL语句,在将每个新连接创建后,将其添加到池中之前执行该语句。 null null -
driverClassName HikariCP将尝试通过仅基于jdbcUrl的DriverManager解析驱动程序,但对于一些较旧的驱动程序,还必须指定driverClassName null null -
transactionIsolation 控制从池返回的连接的默认事务隔离级别 null null -
validationTimeout 连接将被测试活动的最大时间量 SECONDS.toMillis(5) = 5000 5000 如果小于250毫秒,则会被重置回5秒
leakDetectionThreshold 记录消息之前连接可能离开池的时间量,表示可能的连接泄漏 0 0 如果大于0且不是单元测试,则进一步判断:(leakDetectionThreshold < SECONDS.toMillis(2) or (leakDetectionThreshold > maxLifetime && maxLifetime > 0),会被重置为0 . 即如果要生效则必须>0,而且不能小于2秒,而且当maxLifetime > 0时不能大于maxLifetime
dataSource 这个属性允许你直接设置数据源的实例被池包装,而不是让HikariCP通过反射来构造它 null null -
schema 该属性为支持模式概念的数据库设置默认模式 driver default null -
threadFactory 此属性允许您设置将用于创建池使用的所有线程的java.util.concurrent.ThreadFactory的实例。 null null -
scheduledExecutor 此属性允许您设置将用于各种内部计划任务的java.util.concurrent.ScheduledExecutorService实例 null null -
DataSource
两个获取数据库连接的接口
public interface DataSource extends CommonDataSource, Wrapper {
/**
* <p>Attempts to establish a connection with the data source that
* this {@code DataSource} object represents.
*
* @return a connection to the data source
* @exception SQLException if a database access error occurs
* @throws java.sql.SQLTimeoutException when the driver has determined that the
* timeout value specified by the {@code setLoginTimeout} method
* has been exceeded and has at least tried to cancel the
* current database connection attempt
*/
Connection getConnection() throws SQLException;
/**
* <p>Attempts to establish a connection with the data source that
* this {@code DataSource} object represents.
*
* @param username the database user on whose behalf the connection is
* being made
* @param password the user's password
* @return a connection to the data source
* @exception SQLException if a database access error occurs
* @throws java.sql.SQLTimeoutException when the driver has determined that the
* timeout value specified by the {@code setLoginTimeout} method
* has been exceeded and has at least tried to cancel the
* current database connection attempt
* @since 1.4
*/
Connection getConnection(String username, String password)
throws SQLException;
}
HikariPool
hikaripool就是数据库连接池的核心了
PoolBase
构造进行初始化
PoolBase(final HikariConfig config)
{
this.config = config;
this.networkTimeout = UNINITIALIZED;
this.catalog = config.getCatalog();
this.schema = config.getSchema();
this.isReadOnly = config.isReadOnly();
this.isAutoCommit = config.isAutoCommit();
this.exceptionOverride = UtilityElf.createInstance(config.getExceptionOverrideClassName(), SQLExceptionOverride.class);
this.transactionIsolation = UtilityElf.getTransactionIsolation(config.getTransactionIsolation());
this.isQueryTimeoutSupported = UNINITIALIZED;
this.isNetworkTimeoutSupported = UNINITIALIZED;
this.isUseJdbc4Validation = config.getConnectionTestQuery() == null;
this.isIsolateInternalQueries = config.isIsolateInternalQueries();
this.poolName = config.getPoolName();
this.connectionTimeout = config.getConnectionTimeout();
this.validationTimeout = config.getValidationTimeout();
this.lastConnectionFailure = new AtomicReference<>();
initializeDataSource();
}
private void initializeDataSource()
{
final String jdbcUrl = config.getJdbcUrl();
final String username = config.getUsername();
final String password = config.getPassword();
final String dsClassName = config.getDataSourceClassName();
final String driverClassName = config.getDriverClassName();
final String dataSourceJNDI = config.getDataSourceJNDI();
final Properties dataSourceProperties = config.getDataSourceProperties();
DataSource ds = config.getDataSource();
if (dsClassName != null && ds == null) {
ds = createInstance(dsClassName, DataSource.class);
PropertyElf.setTargetFromProperties(ds, dataSourceProperties);
}
else if (jdbcUrl != null && ds == null) {
ds = new DriverDataSource(jdbcUrl, driverClassName, dataSourceProperties, username, password);
}
else if (dataSourceJNDI != null && ds == null) {
try {
InitialContext ic = new InitialContext();
ds = (DataSource) ic.lookup(dataSourceJNDI);
} catch (NamingException e) {
throw new PoolInitializationException(e);
}
}
if (ds != null) {
setLoginTimeout(ds);
createNetworkTimeoutExecutor(ds, dsClassName, jdbcUrl);
}
this.dataSource = ds;
}
hikariPoll构造初始化
public HikariPool(final HikariConfig config)
{
super(config);
//数据库连接背包,相当于线程池的任务队列
this.connectionBag = new ConcurrentBag<>(this);
//是否控制获取连接数
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
//创建一个线程池
this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
checkFailFast();
if (config.getMetricsTrackerFactory() != null) {
setMetricsTrackerFactory(config.getMetricsTrackerFactory());
}
else {
setMetricRegistry(config.getMetricRegistry());
}
setHealthCheckRegistry(config.getHealthCheckRegistry());
handleMBeans(this, true);
ThreadFactory threadFactory = config.getThreadFactory();
final int maxPoolSize = config.getMaximumPoolSize();
LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize);
this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue);
this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);
if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) {
addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));
addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));
final long startTime = currentTime();
while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) {
quietlySleep(MILLISECONDS.toMillis(100));
}
addConnectionExecutor.setCorePoolSize(1);
addConnectionExecutor.setMaximumPoolSize(1);
}
}
com.zaxxer.hikari.pool.HikariPool#checkFailFast
private void checkFailFast()
{
final long initializationTimeout = config.getInitializationFailTimeout();
if (initializationTimeout < 0) {
return;
}
final long startTime = currentTime();
do {
final PoolEntry poolEntry = createPoolEntry();
if (poolEntry != null) {
if (config.getMinimumIdle() > 0) {
connectionBag.add(poolEntry);
logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
}
else {
quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");
}
return;
}
if (getLastConnectionFailure() instanceof ConnectionSetupException) {
throwPoolInitializationException(getLastConnectionFailure().getCause());
}
quietlySleep(SECONDS.toMillis(1));
} while (elapsedMillis(startTime) < initializationTimeout);
if (initializationTimeout > 0) {
throwPoolInitializationException(getLastConnectionFailure());
}
}
前面说过initializationFailTimeout是否要检查进行failfast快速失败(启动就检查连接报错,而不是等真正需要数据库操作时才发现错误),
PoolEntry
用来封装一个数据库连接
Connection connection;
long lastAccessed;
long lastBorrowed;
@SuppressWarnings("FieldCanBeLocal")
private volatile int state = 0;
private volatile boolean evict;
private volatile ScheduledFuture<?> endOfLife;
private final FastList<Statement> openStatements;
private final HikariPool hikariPool;
private final boolean isReadOnly;
private final boolean isAutoCommit;
先创建一个连接
com.zaxxer.hikari.pool.PoolBase#newConnection
private Connection newConnection() throws Exception
{
final long start = currentTime();
Connection connection = null;
try {
String username = config.getUsername();
String password = config.getPassword();
connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password);
if (connection == null) {
throw new SQLTransientConnectionException("DataSource returned null unexpectedly");
}
setupConnection(connection);
lastConnectionFailure.set(null);
return connection;
}
catch (Exception e) {
if (connection != null) {
quietlyCloseConnection(connection, "(Failed to create/setup connection)");
}
else if (getLastConnectionFailure() == null) {
logger.debug("{} - Failed to create/setup connection: {}", poolName, e.getMessage());
}
lastConnectionFailure.set(e);
throw e;
}
finally {
// tracker will be null during failFast check
if (metricsTracker != null) {
metricsTracker.recordConnectionCreated(elapsedMillis(start));
}
}
}
com.zaxxer.hikari.pool.PoolBase#setupConnection
根据配置设置连接,默认设置自动提交,非只读事务
private void setupConnection(final Connection connection) throws ConnectionSetupException
{
try {
if (networkTimeout == UNINITIALIZED) {
networkTimeout = getAndSetNetworkTimeout(connection, validationTimeout);
}
else {
setNetworkTimeout(connection, validationTimeout);
}
if (connection.isReadOnly() != isReadOnly) {
connection.setReadOnly(isReadOnly);
}
if (connection.getAutoCommit() != isAutoCommit) {
connection.setAutoCommit(isAutoCommit);
}
checkDriverSupport(connection);
if (transactionIsolation != defaultTransactionIsolation) {
connection.setTransactionIsolation(transactionIsolation);
}
if (catalog != null) {
connection.setCatalog(catalog);
}
if (schema != null) {
connection.setSchema(schema);
}
executeSql(connection, config.getConnectionInitSql(), true);
setNetworkTimeout(connection, networkTimeout);
}
catch (SQLException e) {
throw new ConnectionSetupException(e);
}
}
如果上层逻辑不希望自动提交,可以重新设置是否自动提交。例如spring事务,会在获取到数据库连接后再设置不自动提交。
通过jdbc建立一个数据库连接
PoolEntry newPoolEntry() throws Exception
{
return new PoolEntry(newConnection(), this, isReadOnly, isAutoCommit);
}
存储了除了数据库连接coneection还有 是不是只读事务,是否自动提交等信息
创建poolEntry后
if (poolEntry != null) {
if (config.getMinimumIdle() > 0) {
connectionBag.add(poolEntry);
logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
}
else {
quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");
}
return;
}
如果配置的最小空闲时为0,放入connectionBag,否则立马关闭或这个连接
放入bag
com.zaxxer.hikari.util.ConcurrentBag#add
public void add(final T bagEntry)
{
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
Thread.yield();
}
}
放到sharedList中CopyOnWriteArrayList时一个线程安全的集合
private final CopyOnWriteArrayList<T> sharedList;
连接池获取一个连接
com.zaxxer.hikari.HikariDataSource#getConnection()
开启事务,或者数据库查询都会获取连接
public Connection getConnection() throws SQLException
{
if (isClosed()) {
throw new SQLException("HikariDataSource " + this + " has been closed.");
}
if (fastPathPool != null) {
return fastPathPool.getConnection();
}
// See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
HikariPool result = pool;
if (result == null) {
synchronized (this) {
result = pool;
if (result == null) {
validate();
LOGGER.info("{} - Starting...", getPoolName());
try {
pool = result = new HikariPool(this);
this.seal();
}
catch (PoolInitializationException pie) {
if (pie.getCause() instanceof SQLException) {
throw (SQLException) pie.getCause();
}
else {
throw pie;
}
}
LOGGER.info("{} - Start completed.", getPoolName());
}
}
}
return result.getConnection();
}
进入
com.zaxxer.hikari.pool.HikariPool#getConnection(long)
public Connection getConnection(final long hardTimeout) throws SQLException
{
suspendResumeLock.acquire();
final long startTime = currentTime();
try {
long timeout = hardTimeout;
do {
PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
if (poolEntry == null) {
break; // We timed out... break and throw exception
}
final long now = currentTime();
if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
}
else {
metricsTracker.recordBorrowStats(poolEntry, startTime);
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
}
} while (timeout > 0L);
metricsTracker.recordBorrowTimeoutStats(startTime);
throw createTimeoutException(startTime);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
}
finally {
suspendResumeLock.release();
}
}
suspendResumeLock
数据库连接池暂停恢复锁。如果hikari配置中设置isAllowPoolSuspension为true,那么就会在HikariPool中实例一个暂停恢复锁。
初始化
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;
如果配置了isAllowPoolSuspension那么会有Semaphore进行控制,如果没配置,那么使用的FAUX_LOCK是一个空的实现,也就是没有并发控制
配置了使用一个公平锁控制并发量
其实就是在限制连接的数量,如果连接超过10K,那么就会阻塞直到有别的连接断开后释放了信号量。这是控制IO流量的一种常见的方式。
private static final int MAX_PERMITS = 10000;
private SuspendResumeLock(final boolean createSemaphore)
{
acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);
}
public class SuspendResumeLock
{
public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false) {
@Override
public void acquire() {}
@Override
public void release() {}
@Override
public void suspend() {}
@Override
public void resume() {}
};
private static final int MAX_PERMITS = 10000;
private final Semaphore acquisitionSemaphore;
/**
* Default constructor
*/
public SuspendResumeLock()
{
this(true);
}
private SuspendResumeLock(final boolean createSemaphore)
{
acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);
}
public void acquire() throws SQLException
{
if (acquisitionSemaphore.tryAcquire()) {
return;
}
else if (Boolean.getBoolean("com.zaxxer.hikari.throwIfSuspended")) {
throw new SQLTransientException("The pool is currently suspended and configured to throw exceptions upon acquisition");
}
acquisitionSemaphore.acquireUninterruptibly();
}
public void release()
{
acquisitionSemaphore.release();
}
public void suspend()
{
acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);
}
public void resume()
{
acquisitionSemaphore.release(MAX_PERMITS);
}
}
继续获取连接
PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
进入ConcurrentBag
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
final List<Object> list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// Otherwise, scan the shared list ... then poll the handoff queue
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
listener.addBagItem(waiting);
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
waiters.decrementAndGet();
}
}
尝试threadList中获取数据库连接对象
如果没用获取到,添加等待数量
尝试从sharedList中获取,获取到通过cas将未使用的状态设置成使用的状态
这里有个listener
IBagStateListener
public interface IBagStateListener
{
void addBagItem(int waiting);
}
实现了这个接口,向bag中加入连接
PoolEntryCreator
创建新得连接
private final class PoolEntryCreator implements Callable<Boolean>
{
private final String loggingPrefix;
PoolEntryCreator(String loggingPrefix)
{
this.loggingPrefix = loggingPrefix;
}
@Override
public Boolean call()
{
long sleepBackoff = 250L;
while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {
final PoolEntry poolEntry = createPoolEntry();
if (poolEntry != null) {
connectionBag.add(poolEntry);
logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
if (loggingPrefix != null) {
logPoolState(loggingPrefix);
}
return Boolean.TRUE;
}
// failed to get connection from db, sleep and retry
if (loggingPrefix != null) logger.debug("{} - Connection add failed, sleeping with backoff: {}ms", poolName, sleepBackoff);
quietlySleep(sleepBackoff);
sleepBackoff = Math.min(SECONDS.toMillis(10), Math.min(connectionTimeout, (long) (sleepBackoff * 1.5)));
}
// Pool is suspended or shutdown or at max size
return Boolean.FALSE;
}
private synchronized boolean shouldCreateAnotherConnection() {
return getTotalConnections() < config.getMaximumPoolSize() &&
(connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle());
}
判断是否应该添加连接 不能大于最大配置的连接数,并且有线程在等待获取连接,并且空闲得连接数小于最小得空闲数
创建新的连接
如果sharedList中也没有获取到空闲连接
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
当前线程需要等待阻塞等到空闲连接
ProxyLeakTask
class ProxyLeakTask implements Runnable
{
private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class);
static final ProxyLeakTask NO_LEAK;
private ScheduledFuture<?> scheduledFuture;
private String connectionName;
private Exception exception;
private String threadName;
private boolean isLeaked;
static
{
NO_LEAK = new ProxyLeakTask() {
@Override
void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}
@Override
public void run() {}
@Override
public void cancel() {}
};
}
ProxyLeakTask(final PoolEntry poolEntry)
{
this.exception = new Exception("Apparent connection leak detected");
this.threadName = Thread.currentThread().getName();
this.connectionName = poolEntry.connection.toString();
}
private ProxyLeakTask()
{
}
void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold)
{
scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
@Override
public void run()
{
isLeaked = true;
final StackTraceElement[] stackTrace = exception.getStackTrace();
final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
System.arraycopy(stackTrace, 5, trace, 0, trace.length);
exception.setStackTrace(trace);
LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
}
void cancel()
{
scheduledFuture.cancel(false);
if (isLeaked) {
LOGGER.info("Previously reported leaked connection {} on thread {} was returned to the pool (unleaked)", connectionName, threadName);
}
}
}
leakDetectionThreshold 设定了连接泄露阀值10s,即当从连接池里取出连接超过10s不归还时,会warn Apparent connection leak detected,用来警告数据库连接的泄露风险。
在连接归还或被踢除时,会清除此调度任务,详见ProxyConnection。即在10s内归还连接,会清除调度任务。超过10s调度任务执行。
清除调度任务时,如果已经超过调度时间,会LOGGER.info(“Previously reported leaked connection {} was returned to the pool (unleaked)”, connectionName);。详见ProxyLeakTask#cancel。
这个case中最大的亮点是。Hikari warn日志中详细的打印出,存在连接泄露的代码行数,异常信息非常直白易懂。实现逻辑详见 ProxyLeakTask#ProxyLeakTask ProxyLeakTask#run。优秀的代码设计+不错的性能,也难怪SpringBoot2.0起默认数据库连接池从TomcatPool换到了「HikariCP」。
具体看如何实现如何精准打印堆栈的
ProxyLeakTask(final PoolEntry poolEntry)
{
this.exception = new Exception("Apparent connection leak detected");
this.threadName = Thread.currentThread().getName();
this.connectionName = poolEntry.connection.toString();
}
在构造ProxyLeakTask创建了exception,异常中
@Override
public void run()
{
isLeaked = true;
final StackTraceElement[] stackTrace = exception.getStackTrace();
final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
System.arraycopy(stackTrace, 5, trace, 0, trace.length);
exception.setStackTrace(trace);
LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
}
可以看出来,减了5层的堆栈,刚好就是查询处的堆栈
handoffQueue
SynchronousQueue<T> handoffQueue;
我们知道SynchronousQueue是一个比较特殊的队列 投放任务的生产者在没用和消费者匹配之前,生产者线程会被阻塞。生产者和消费者配一对一通信。
这里,getConection充当一个消费者的角色,去尝试取出数据
至于怎么放入数据,我们后面继续看。
返回一个来连接后,对原生数据库连接做扩展,
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
Connection createProxyConnection(final ProxyLeakTask leakTask, final long now)
{
return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
}
打开ProxyFactory.getProxyConnection这个方法看起来很有意思
static ProxyConnection getProxyConnection(final PoolEntry poolEntry, final Connection connection, final FastList<Statement> openStatements, final ProxyLeakTask leakTask, final long now, final boolean isReadOnly, final boolean isAutoCommit)
{
// Body is replaced (injected) by JavassistProxyFactory
throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
}
发现他是一个空的实现,直接抛出异常并且注释Body is replaced (injected) by JavassistProxyFactory
而且它也没用子类实现,那么如何实现创建对应的conneection对象的呢。
当我们打开这个类就明白了
case "getProxyConnection":
method.setBody("{return new " + packageName + ".HikariProxyConnection($$);}");
break;
利用 javaasist直接修改ProxyFactory的字节码,
这样获取的连接被ProxyConnection包裹扩展
ProxyConnection
public abstract class ProxyConnection implements Connection
{
static final int DIRTY_BIT_READONLY = 0b000001;
static final int DIRTY_BIT_AUTOCOMMIT = 0b000010;
static final int DIRTY_BIT_ISOLATION = 0b000100;
static final int DIRTY_BIT_CATALOG = 0b001000;
static final int DIRTY_BIT_NETTIMEOUT = 0b010000;
static final int DIRTY_BIT_SCHEMA = 0b100000;
private static final Logger LOGGER;
private static final Set<String> ERROR_STATES;
private static final Set<Integer> ERROR_CODES;
@SuppressWarnings("WeakerAccess")
protected Connection delegate;
private final PoolEntry poolEntry;
private final ProxyLeakTask leakTask;
private final FastList<Statement> openStatements;
private int dirtyBits;
private long lastAccess;
private boolean isCommitStateDirty;
private boolean isReadOnly;
private boolean isAutoCommit;
private int networkTimeout;
private int transactionIsolation;
private String dbcatalog;
private String dbschema;
}
其内部也维护了连接的一些信息。
在使用spring事务的情况下获取一个连接后
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
会根据事务注解设置事务自动提交,隔离级别,是否为只读连接信息。
这样spring创建了他的事务
事务提交方法
@Override
public void commit() throws SQLException
{
//实际的数据库连接
delegate.commit();
//设置状态
isCommitStateDirty = false;
lastAccess = currentTime();
}
关闭连接方法
@Override
public final void close() throws SQLException
{
// Closing statements can cause connection eviction, so this must run before the conditional below
closeStatements();
if (delegate != ClosedConnection.CLOSED_CONNECTION) {
leakTask.cancel();
try {
if (isCommitStateDirty && !isAutoCommit) {
delegate.rollback();
lastAccess = currentTime();
LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
}
if (dirtyBits != 0) {
poolEntry.resetConnectionState(this, dirtyBits);
lastAccess = currentTime();
}
delegate.clearWarnings();
}
catch (SQLException e) {
// when connections are aborted, exceptions are often thrown that should not reach the application
if (!poolEntry.isMarkedEvicted()) {
throw checkException(e);
}
}
finally {
delegate = ClosedConnection.CLOSED_CONNECTION;
poolEntry.recycle(lastAccess);
}
}
}
首先关闭此连接创建的Statement
com.zaxxer.hikari.pool.ProxyConnection#closeStatements
private synchronized void closeStatements()
{
final int size = openStatements.size();
if (size > 0) {
for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
try (Statement ignored = openStatements.get(i)) {
// automatic resource cleanup
}
catch (SQLException e) {
LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
poolEntry.getPoolName(), delegate);
leakTask.cancel();
poolEntry.evict("(exception closing Statements during Connection.close())");
delegate = ClosedConnection.CLOSED_CONNECTION;
}
}
openStatements.clear();
}
}
这里她利用了try-resources-catch来释放资源
leakTask是一个打印异常日志的定时任务
@Override
public void run()
{
isLeaked = true;
final StackTraceElement[] stackTrace = exception.getStackTrace();
final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
System.arraycopy(stackTrace, 5, trace, 0, trace.length);
exception.setStackTrace(trace);
LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
}
如果关闭stateMent失败,那么直接关闭这个数据连接
poolEntry.evict("(exception closing Statements during Connection.close())");
事务执行完毕后需要恢复这个连接原本的状态
finally {
cleanupAfterCompletion(status);
}
org.springframework.jdbc.datasource.DataSourceTransactionManager#doCleanupAfterCompletion
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// Remove the connection holder from the thread, if exposed.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
// Reset connection.
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
DataSourceUtils.resetConnectionAfterTransaction(
con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
}
DataSourceUtils.releaseConnection(con, this.dataSource);
}
txObject.getConnectionHolder().clear();
}
把autoConmit和隔离级别恢复成连接之前的状态。
回头看close方法
isCommitStateDirty代表事务未提交时有更新操作
如果有更新操作,并且在事务中, 需要回滚
if (isCommitStateDirty && !isAutoCommit) {
delegate.rollback();
lastAccess = currentTime();
LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
}
关闭连接
finally {
delegate = ClosedConnection.CLOSED_CONNECTION;
poolEntry.recycle(lastAccess);
}
static final Connection CLOSED_CONNECTION = getClosedConnection();
private static Connection getClosedConnection()
{
InvocationHandler handler = (proxy, method, args) -> {
final String methodName = method.getName();
if ("isClosed".equals(methodName)) {
return Boolean.TRUE;
}
else if ("isValid".equals(methodName)) {
return Boolean.FALSE;
}
if ("abort".equals(methodName)) {
return Void.TYPE;
}
if ("close".equals(methodName)) {
return Void.TYPE;
}
else if ("toString".equals(methodName)) {
return ClosedConnection.class.getCanonicalName();
}
throw new SQLException("Connection is closed");
};
return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler);
}
先设置一个空的连接,用代理返回一个不合法的连接
进入recycle循环利用
void recycle(final PoolEntry poolEntry)
{
metricsTracker.recordConnectionUsage(poolEntry);
connectionBag.requite(poolEntry);
}
public void requite(final T bagEntry)
{
bagEntry.setState(STATE_NOT_IN_USE);
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
Thread.yield();
}
}
final List<Object> threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
1.设置为为未使用状态
- 如果等待连接的线程数大于 0,放连接到handoffQueue中,刚好与获取连接的线程配对
handoffQueue.offer(bagEntry)
- 如果threadList大小小于50,放进去
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
weakThreadLocals可以根据配置是否设置为弱引用(发生gc就回收)
数据库连接池的三级缓存
三级缓存threadList
本地线程存储使用,回收的连接,最多存储50个,在真正关闭连接时移除
三级缓存线程隔离,数据无需同步,速度最快
二级缓存sharedList
在创建新的连接就放入,在移除连接时取出
sharedList是线程共享的集合对象,这里使用的类型是CopyOnWriteArrayList
对于CopyOnWriteArrayList需要掌握以下几点
创建:CopyOnWriteArrayList()
添加元素:即add(E)方法
获取单个对象:即get(int)方法
删除对象:即remove(E)方法
遍历所有对象:即iterator(),在实际中更常用的是增强型的for循环去做遍历
注:CopyOnWriteArrayList是一个线程安全,读操作时无锁的ArrayList。
CopyOnWriteArrayList的创建
public CopyOnWriteArrayList()
使用方法:
List list = new CopyOnWriteArrayList();
-
CopyOnWriteArrayList(写数组的拷贝)是ArrayList的一个线程安全的变体,CopyOnWriteArrayList和CopyOnWriteSet都是线程安全的集合,其中所有可变操作(add、set等等)都是通过对底层数组进行一次新的复制来实现的。
-
它绝对不会抛出ConcurrentModificationException的异常。因为该列表(CopyOnWriteArrayList)在遍历时将不会被做任何的修改。
-
CopyOnWriteArrayList适合用在“读多,写少”的“并发”应用中,换句话说,它适合使用在读操作远远大于写操作的场景里,比如缓存。它不存在“扩容”的概念,每次写操作(add or remove)都要copy一个副本,在副本的基础上修改后改变array引用,所以称为“CopyOnWrite”,因此在写操作是加锁,并且对整个list的copy操作时相当耗时的,过多的写操作不推荐使用该存储结构。
-
CopyOnWriteArrayList的功能是是创建一个列表,有三种构造方法:
(1)CopyOnWriteArrayList ()创建一个空列表。
(2)CopyOnWriteArrayList (Collection<? extendsE> c)
创建一个按 collection的迭代器返回元素的顺序包含指定 collection元素的列表。
(3)CopyOnWriteArrayList(E[] toCopyIn)
创建一个保存给定数组的副本的列表
CopyOnWriteArrayList虽然写时使用了同步锁但是读无锁,在读多的场景下效率高。
一级缓存handoffQueue
前面介绍过这是一个同步队列
获取连接的消费者线程,与关闭连接时循环使用的的生产者线程配对
使用同步,消费者线程会阻塞,效率最低
思考为什么会发生数据库连接开启事务导致锁死行纪录
数据库连接建立后,是一个长连接在数据库与客户端之间
如果客户端在未回滚的情况下断开连接,数据库服务器感知到,会回滚该连接事务并移除连接
如果客户端非正常断开连接,不是关闭线程等操作,而是突然宕机,断网,导致与服务器的连接变为死连接,如果不设置事务超时时间,那么这个事务一直存在,一直不回滚,导致锁死行记录,因此设置事务超时时间是有必要的。