HikariDataSource

2年前 (2022) 程序员胖胖胖虎阿
222 0 0

DataSourceConfiguration

配置类,springBoot默认采用HikariDataSource

/**
	 * Hikari DataSource configuration.
	 */
	@Configuration(proxyBeanMethods = false)
	@ConditionalOnClass(HikariDataSource.class)
	@ConditionalOnMissingBean(DataSource.class)
	@ConditionalOnProperty(name = "spring.datasource.type", havingValue = "com.zaxxer.hikari.HikariDataSource",
			matchIfMissing = true)
	static class Hikari {

		@Bean
		@ConfigurationProperties(prefix = "spring.datasource.hikari")
		HikariDataSource dataSource(DataSourceProperties properties) {
			HikariDataSource dataSource = createDataSource(properties, HikariDataSource.class);
			if (StringUtils.hasText(properties.getName())) {
				dataSource.setPoolName(properties.getName());
			}
			return dataSource;
		}

	}

@Configuration(proxyBeanMethods = false)得含义是
@Configuration注解的意思是proxyBeanMethods配置类是用来指定@Bean注解标注的方法是否使用代理,默认是true使用代理,直接从IOC容器之中取得对象;如果设置为false,也就是不使用注解,每次调用@Bean标注的方法获取到的对象和IOC容器中的都不一样,是一个新的对象,所以我们可以将此属性设置为false来提高性能;
**@ConditionalOnProperty(name = “spring.datasource.type”, havingValue = “com.zaxxer.hikari.HikariDataSource”,
** matchIfMissing = true)**
如果配置spring.datasource.type为com.zaxxer.hikari.HikariDataSource才加载,如果配置为空,则默认加载此数据源

HikariDataSource

HikariDataSource

一个一个看这些接口

AutoCloseable

代表一个对象在close之前可能持有某些资源(文件或socket)。如果对象是在try-with-resources代码块中声明的,
AutoCloseable对象的close()方法会被自动执行。这种构造方式保证了最快的资源释放,避免资源耗尽异常。void close() throws Exception
关闭资源,放弃所有内在的资源。如果对象是在try-with-resources代码块中声明的,那么这个方法会自动被执行。
虽然这个接口的方法声明成抛出Exception异常,但是强烈推荐实现类抛出更详细的异常,或者不要要出异常。
需要注意实现类的close操作可能会失败。 强烈推荐优先放弃内部资源和内部标记资源已经关闭,尽量不去抛出异常。close方法不会执行

多次,所以要确保资源能够及时释放。此外,优先关闭内部资源可以降低资源封装带来的问题,假如一个资源包含了另一个资源,要依次关闭。

实现类也强烈建议不要抛出InterruptedException异常。这个异常和thread的 interrupt标志相关,而且线程运行时可能会出现一些迷惑行为。
例如实现类

import lombok.Data;

@Data
public class AutoCloseAbleTest implements AutoCloseable {

    private boolean closed = false;

    @Override
    public void close() throws IllegalArgumentException {
        closed = true;

        throw new IllegalArgumentException("测试抛出异常");
    }


    public void doSomething(){
        System.out.println("现在资源是开着的");
    }
}


测试类

public class AutoCloseAbleExample {

    /**
     * 使用try-with-resources模式声明资源
     * @param args
     */
    public static void main(String[] args){
        try(AutoCloseAbleTest test = new AutoCloseAbleTest()){
            test.doSomething();
        }catch (IllegalArgumentException e){
            e.printStackTrace();
        }

    }
}

执行结果

java.lang.IllegalArgumentException: 测试抛出异常
	at org.zhuzhenxi.learning.lang.autocloseable.AutoCloseAbleTest.close(AutoCloseAbleTest.java:14)
	at org.zhuzhenxi.learning.lang.autocloseable.AutoCloseAbleExample.main(AutoCloseAbleExample.java:7)


HikariConfig

 // Properties changeable at runtime through the HikariConfigMXBean
   //
   private volatile String catalog;
   private volatile long connectionTimeout;
   private volatile long validationTimeout;
   private volatile long idleTimeout;
   private volatile long leakDetectionThreshold;
   private volatile long maxLifetime;
   private volatile int maxPoolSize;
   private volatile int minIdle;
   private volatile String username;
   private volatile String password;

   // Properties NOT changeable at runtime
   //
   private long initializationFailTimeout;
   private String connectionInitSql;
   private String connectionTestQuery;
   private String dataSourceClassName;
   private String dataSourceJndiName;
   private String driverClassName;
   private String exceptionOverrideClassName;
   private String jdbcUrl;
   private String poolName;
   private String schema;
   private String transactionIsolationName;
   private boolean isAutoCommit;
   private boolean isReadOnly;
   private boolean isIsolateInternalQueries;
   private boolean isRegisterMbeans;
   private boolean isAllowPoolSuspension;
   private DataSource dataSource;
   private Properties dataSourceProperties;
   private ThreadFactory threadFactory;
   private ScheduledExecutorService scheduledExecutor;
   private MetricsTrackerFactory metricsTrackerFactory;
   private Object metricRegistry;
   private Object healthCheckRegistry;
   private Properties healthCheckProperties;

配置类,有很多配置项

name    描述    构造器默认值    默认配置validate之后的值    validate重置
autoCommit    自动提交从池中返回的连接    true    true    -
connectionTimeout    等待来自池的连接的最大毫秒数    SECONDS.toMillis(30) = 30000    30000    如果小于250毫秒,则被重置回30秒
idleTimeout    连接允许在池中闲置的最长时间    MINUTES.toMillis(10) = 600000    600000    如果idleTimeout+1>maxLifetime 且 maxLifetime>0,则会被重置为0(代表永远不会退出);如果idleTimeout!=0且小于10秒,则会被重置为10秒
maxLifetime    池中连接最长生命周期    MINUTES.toMillis(30) = 1800000    1800000    如果不等于0且小于30秒则会被重置回30分钟
connectionTestQuery    如果您的驱动程序支持JDBC4,我们强烈建议您不要设置此属性    null    null    -
minimumIdle    池中维护的最小空闲连接数    -1    10    minIdle<0或者minIdle>maxPoolSize,则被重置为maxPoolSize
maximumPoolSize    池中最大连接数,包括闲置和使用中的连接    -1    10    如果maxPoolSize小于1,则会被重置。当minIdle<=0被重置为DEFAULT_POOL_SIZE则为10;如果minIdle>0则重置为minIdle的值
metricRegistry    该属性允许您指定一个 Codahale / Dropwizard MetricRegistry 的实例,供池使用以记录各种指标    null    null    -
healthCheckRegistry    该属性允许您指定池使用的Codahale / Dropwizard HealthCheckRegistry的实例来报告当前健康信息    null    null    -
poolName    连接池的用户定义名称,主要出现在日志记录和JMX管理控制台中以识别池和池配置    null    HikariPool-1    -
initializationFailTimeout    如果池无法成功初始化连接,则此属性控制池是否将 fail fast    1    1    -
isolateInternalQueries    是否在其自己的事务中隔离内部池查询,例如连接活动测试    false    false    -
allowPoolSuspension    控制池是否可以通过JMX暂停和恢复    false    false    -
readOnly    从池中获取的连接是否默认处于只读模式    false    false    -
registerMbeans    是否注册JMX管理BeanMBeansfalse    false    -
catalog    为支持 catalog 概念的数据库设置默认 catalog    driver default    null    -
connectionInitSql    该属性设置一个SQL语句,在将每个新连接创建后,将其添加到池中之前执行该语句。    null    null    -
driverClassName    HikariCP将尝试通过仅基于jdbcUrl的DriverManager解析驱动程序,但对于一些较旧的驱动程序,还必须指定driverClassName    null    null    -
transactionIsolation    控制从池返回的连接的默认事务隔离级别    null    null    -
validationTimeout    连接将被测试活动的最大时间量    SECONDS.toMillis(5) = 5000    5000    如果小于250毫秒,则会被重置回5秒
leakDetectionThreshold    记录消息之前连接可能离开池的时间量,表示可能的连接泄漏    0    0    如果大于0且不是单元测试,则进一步判断:(leakDetectionThreshold < SECONDS.toMillis(2) or (leakDetectionThreshold > maxLifetime && maxLifetime > 0),会被重置为0 . 即如果要生效则必须>0,而且不能小于2秒,而且当maxLifetime > 0时不能大于maxLifetime
dataSource    这个属性允许你直接设置数据源的实例被池包装,而不是让HikariCP通过反射来构造它    null    null    -
schema    该属性为支持模式概念的数据库设置默认模式    driver default    null    -
threadFactory    此属性允许您设置将用于创建池使用的所有线程的java.util.concurrent.ThreadFactory的实例。    null    null    -
scheduledExecutor    此属性允许您设置将用于各种内部计划任务的java.util.concurrent.ScheduledExecutorService实例    null    null    -

DataSource

两个获取数据库连接的接口

public interface DataSource  extends CommonDataSource, Wrapper {

  /**
   * <p>Attempts to establish a connection with the data source that
   * this {@code DataSource} object represents.
   *
   * @return  a connection to the data source
   * @exception SQLException if a database access error occurs
   * @throws java.sql.SQLTimeoutException  when the driver has determined that the
   * timeout value specified by the {@code setLoginTimeout} method
   * has been exceeded and has at least tried to cancel the
   * current database connection attempt
   */
  Connection getConnection() throws SQLException;

  /**
   * <p>Attempts to establish a connection with the data source that
   * this {@code DataSource} object represents.
   *
   * @param username the database user on whose behalf the connection is
   *  being made
   * @param password the user's password
   * @return  a connection to the data source
   * @exception SQLException if a database access error occurs
   * @throws java.sql.SQLTimeoutException  when the driver has determined that the
   * timeout value specified by the {@code setLoginTimeout} method
   * has been exceeded and has at least tried to cancel the
   * current database connection attempt
   * @since 1.4
   */
  Connection getConnection(String username, String password)
    throws SQLException;
}

HikariPool

hikaripool就是数据库连接池的核心了
HikariDataSource

PoolBase

构造进行初始化

 PoolBase(final HikariConfig config)
   {
      this.config = config;

      this.networkTimeout = UNINITIALIZED;
      this.catalog = config.getCatalog();
      this.schema = config.getSchema();
      this.isReadOnly = config.isReadOnly();
      this.isAutoCommit = config.isAutoCommit();
      this.exceptionOverride = UtilityElf.createInstance(config.getExceptionOverrideClassName(), SQLExceptionOverride.class);
      this.transactionIsolation = UtilityElf.getTransactionIsolation(config.getTransactionIsolation());

      this.isQueryTimeoutSupported = UNINITIALIZED;
      this.isNetworkTimeoutSupported = UNINITIALIZED;
      this.isUseJdbc4Validation = config.getConnectionTestQuery() == null;
      this.isIsolateInternalQueries = config.isIsolateInternalQueries();

      this.poolName = config.getPoolName();
      this.connectionTimeout = config.getConnectionTimeout();
      this.validationTimeout = config.getValidationTimeout();
      this.lastConnectionFailure = new AtomicReference<>();

      initializeDataSource();
   }
   private void initializeDataSource()
   {
      final String jdbcUrl = config.getJdbcUrl();
      final String username = config.getUsername();
      final String password = config.getPassword();
      final String dsClassName = config.getDataSourceClassName();
      final String driverClassName = config.getDriverClassName();
      final String dataSourceJNDI = config.getDataSourceJNDI();
      final Properties dataSourceProperties = config.getDataSourceProperties();

      DataSource ds = config.getDataSource();
      if (dsClassName != null && ds == null) {
         ds = createInstance(dsClassName, DataSource.class);
         PropertyElf.setTargetFromProperties(ds, dataSourceProperties);
      }
      else if (jdbcUrl != null && ds == null) {
         ds = new DriverDataSource(jdbcUrl, driverClassName, dataSourceProperties, username, password);
      }
      else if (dataSourceJNDI != null && ds == null) {
         try {
            InitialContext ic = new InitialContext();
            ds = (DataSource) ic.lookup(dataSourceJNDI);
         } catch (NamingException e) {
            throw new PoolInitializationException(e);
         }
      }

      if (ds != null) {
         setLoginTimeout(ds);
         createNetworkTimeoutExecutor(ds, dsClassName, jdbcUrl);
      }

      this.dataSource = ds;
   }

hikariPoll构造初始化

 public HikariPool(final HikariConfig config)
   {
      super(config);

     //数据库连接背包,相当于线程池的任务队列
      this.connectionBag = new ConcurrentBag<>(this);
     //是否控制获取连接数
      this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;

     //创建一个线程池
      this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();

      checkFailFast();

      if (config.getMetricsTrackerFactory() != null) {
         setMetricsTrackerFactory(config.getMetricsTrackerFactory());
      }
      else {
         setMetricRegistry(config.getMetricRegistry());
      }

      setHealthCheckRegistry(config.getHealthCheckRegistry());

      handleMBeans(this, true);

      ThreadFactory threadFactory = config.getThreadFactory();

      final int maxPoolSize = config.getMaximumPoolSize();
      LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(maxPoolSize);
      this.addConnectionQueueReadOnlyView = unmodifiableCollection(addConnectionQueue);
      this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
      this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

      this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);

      this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, housekeepingPeriodMs, MILLISECONDS);

      if (Boolean.getBoolean("com.zaxxer.hikari.blockUntilFilled") && config.getInitializationFailTimeout() > 1) {
         addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));
         addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));

         final long startTime = currentTime();
         while (elapsedMillis(startTime) < config.getInitializationFailTimeout() && getTotalConnections() < config.getMinimumIdle()) {
            quietlySleep(MILLISECONDS.toMillis(100));
         }

         addConnectionExecutor.setCorePoolSize(1);
         addConnectionExecutor.setMaximumPoolSize(1);
      }
   }

com.zaxxer.hikari.pool.HikariPool#checkFailFast

private void checkFailFast()
   {
      final long initializationTimeout = config.getInitializationFailTimeout();
      if (initializationTimeout < 0) {
         return;
      }

      final long startTime = currentTime();
      do {
         final PoolEntry poolEntry = createPoolEntry();
         if (poolEntry != null) {
            if (config.getMinimumIdle() > 0) {
               connectionBag.add(poolEntry);
               logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
            }
            else {
               quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");
            }

            return;
         }

         if (getLastConnectionFailure() instanceof ConnectionSetupException) {
            throwPoolInitializationException(getLastConnectionFailure().getCause());
         }

         quietlySleep(SECONDS.toMillis(1));
      } while (elapsedMillis(startTime) < initializationTimeout);

      if (initializationTimeout > 0) {
         throwPoolInitializationException(getLastConnectionFailure());
      }
   }

前面说过initializationFailTimeout是否要检查进行failfast快速失败(启动就检查连接报错,而不是等真正需要数据库操作时才发现错误),

PoolEntry

用来封装一个数据库连接

 Connection connection;
   long lastAccessed;
   long lastBorrowed;

   @SuppressWarnings("FieldCanBeLocal")
   private volatile int state = 0;
   private volatile boolean evict;

   private volatile ScheduledFuture<?> endOfLife;

   private final FastList<Statement> openStatements;
   private final HikariPool hikariPool;

   private final boolean isReadOnly;
   private final boolean isAutoCommit;

先创建一个连接

com.zaxxer.hikari.pool.PoolBase#newConnection

private Connection newConnection() throws Exception
   {
      final long start = currentTime();

      Connection connection = null;
      try {
         String username = config.getUsername();
         String password = config.getPassword();

         connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password);
         if (connection == null) {
            throw new SQLTransientConnectionException("DataSource returned null unexpectedly");
         }

         setupConnection(connection);
         lastConnectionFailure.set(null);
         return connection;
      }
      catch (Exception e) {
         if (connection != null) {
            quietlyCloseConnection(connection, "(Failed to create/setup connection)");
         }
         else if (getLastConnectionFailure() == null) {
            logger.debug("{} - Failed to create/setup connection: {}", poolName, e.getMessage());
         }

         lastConnectionFailure.set(e);
         throw e;
      }
      finally {
         // tracker will be null during failFast check
         if (metricsTracker != null) {
            metricsTracker.recordConnectionCreated(elapsedMillis(start));
         }
      }
   }

com.zaxxer.hikari.pool.PoolBase#setupConnection
根据配置设置连接,默认设置自动提交,非只读事务

 private void setupConnection(final Connection connection) throws ConnectionSetupException
   {
      try {
         if (networkTimeout == UNINITIALIZED) {
            networkTimeout = getAndSetNetworkTimeout(connection, validationTimeout);
         }
         else {
            setNetworkTimeout(connection, validationTimeout);
         }

         if (connection.isReadOnly() != isReadOnly) {
            connection.setReadOnly(isReadOnly);
         }

         if (connection.getAutoCommit() != isAutoCommit) {
            connection.setAutoCommit(isAutoCommit);
         }

         checkDriverSupport(connection);

         if (transactionIsolation != defaultTransactionIsolation) {
            connection.setTransactionIsolation(transactionIsolation);
         }

         if (catalog != null) {
            connection.setCatalog(catalog);
         }

         if (schema != null) {
            connection.setSchema(schema);
         }

         executeSql(connection, config.getConnectionInitSql(), true);

         setNetworkTimeout(connection, networkTimeout);
      }
      catch (SQLException e) {
         throw new ConnectionSetupException(e);
      }
   }

如果上层逻辑不希望自动提交,可以重新设置是否自动提交。例如spring事务,会在获取到数据库连接后再设置不自动提交。

通过jdbc建立一个数据库连接

 PoolEntry newPoolEntry() throws Exception
   {
      return new PoolEntry(newConnection(), this, isReadOnly, isAutoCommit);
   }

存储了除了数据库连接coneection还有 是不是只读事务,是否自动提交等信息

创建poolEntry后

  if (poolEntry != null) {
            if (config.getMinimumIdle() > 0) {
               connectionBag.add(poolEntry);
               logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
            }
            else {
               quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");
            }

            return;
         }

如果配置的最小空闲时为0,放入connectionBag,否则立马关闭或这个连接
放入bag

com.zaxxer.hikari.util.ConcurrentBag#add

   public void add(final T bagEntry)
   {
      if (closed) {
         LOGGER.info("ConcurrentBag has been closed, ignoring add()");
         throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
      }

      sharedList.add(bagEntry);

      // spin until a thread takes it or none are waiting
      while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
         Thread.yield();
      }
   }

放到sharedList中CopyOnWriteArrayList时一个线程安全的集合

 private final CopyOnWriteArrayList<T> sharedList;

连接池获取一个连接

com.zaxxer.hikari.HikariDataSource#getConnection()

开启事务,或者数据库查询都会获取连接

 public Connection getConnection() throws SQLException
   {
      if (isClosed()) {
         throw new SQLException("HikariDataSource " + this + " has been closed.");
      }

      if (fastPathPool != null) {
         return fastPathPool.getConnection();
      }

      // See http://en.wikipedia.org/wiki/Double-checked_locking#Usage_in_Java
      HikariPool result = pool;
      if (result == null) {
         synchronized (this) {
            result = pool;
            if (result == null) {
               validate();
               LOGGER.info("{} - Starting...", getPoolName());
               try {
                  pool = result = new HikariPool(this);
                  this.seal();
               }
               catch (PoolInitializationException pie) {
                  if (pie.getCause() instanceof SQLException) {
                     throw (SQLException) pie.getCause();
                  }
                  else {
                     throw pie;
                  }
               }
               LOGGER.info("{} - Start completed.", getPoolName());
            }
         }
      }

      return result.getConnection();
   }

进入

com.zaxxer.hikari.pool.HikariPool#getConnection(long)

  public Connection getConnection(final long hardTimeout) throws SQLException
   {
      suspendResumeLock.acquire();
      final long startTime = currentTime();

      try {
         long timeout = hardTimeout;
         do {
            PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
            if (poolEntry == null) {
               break; // We timed out... break and throw exception
            }

            final long now = currentTime();
            if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && !isConnectionAlive(poolEntry.connection))) {
               closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
               timeout = hardTimeout - elapsedMillis(startTime);
            }
            else {
               metricsTracker.recordBorrowStats(poolEntry, startTime);
               return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
            }
         } while (timeout > 0L);

         metricsTracker.recordBorrowTimeoutStats(startTime);
         throw createTimeoutException(startTime);
      }
      catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
      }
      finally {
         suspendResumeLock.release();
      }
   }

suspendResumeLock

数据库连接池暂停恢复锁。如果hikari配置中设置isAllowPoolSuspension为true,那么就会在HikariPool中实例一个暂停恢复锁。

初始化

  this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;

如果配置了isAllowPoolSuspension那么会有Semaphore进行控制,如果没配置,那么使用的FAUX_LOCK是一个空的实现,也就是没有并发控制
配置了使用一个公平锁控制并发量
其实就是在限制连接的数量,如果连接超过10K,那么就会阻塞直到有别的连接断开后释放了信号量。这是控制IO流量的一种常见的方式。

  private static final int MAX_PERMITS = 10000;
 private SuspendResumeLock(final boolean createSemaphore)
   {
      acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);
   }
public class SuspendResumeLock
{
   public static final SuspendResumeLock FAUX_LOCK = new SuspendResumeLock(false) {
      @Override
      public void acquire() {}

      @Override
      public void release() {}

      @Override
      public void suspend() {}

      @Override
      public void resume() {}
   };

   private static final int MAX_PERMITS = 10000;
   private final Semaphore acquisitionSemaphore;

   /**
    * Default constructor
    */
   public SuspendResumeLock()
   {
      this(true);
   }

   private SuspendResumeLock(final boolean createSemaphore)
   {
      acquisitionSemaphore = (createSemaphore ? new Semaphore(MAX_PERMITS, true) : null);
   }

   public void acquire() throws SQLException
   {
      if (acquisitionSemaphore.tryAcquire()) {
         return;
      }
      else if (Boolean.getBoolean("com.zaxxer.hikari.throwIfSuspended")) {
         throw new SQLTransientException("The pool is currently suspended and configured to throw exceptions upon acquisition");
      }

      acquisitionSemaphore.acquireUninterruptibly();
   }

   public void release()
   {
      acquisitionSemaphore.release();
   }

   public void suspend()
   {
      acquisitionSemaphore.acquireUninterruptibly(MAX_PERMITS);
   }

   public void resume()
   {
      acquisitionSemaphore.release(MAX_PERMITS);
   }
}

继续获取连接

     PoolEntry poolEntry = connectionBag.borrow(timeout, MILLISECONDS);

进入ConcurrentBag

 public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
   {
      // Try the thread-local list first
      final List<Object> list = threadList.get();
      for (int i = list.size() - 1; i >= 0; i--) {
         final Object entry = list.remove(i);
         @SuppressWarnings("unchecked")
         final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
         if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
      }

      // Otherwise, scan the shared list ... then poll the handoff queue
      final int waiting = waiters.incrementAndGet();
      try {
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               // If we may have stolen another waiter's connection, request another bag add.
               if (waiting > 1) {
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }

         listener.addBagItem(waiting);

         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = currentTime();
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= elapsedNanos(start);
         } while (timeout > 10_000);

         return null;
      }
      finally {
         waiters.decrementAndGet();
      }
   }

尝试threadList中获取数据库连接对象
如果没用获取到,添加等待数量
尝试从sharedList中获取,获取到通过cas将未使用的状态设置成使用的状态
这里有个listener

IBagStateListener

   public interface IBagStateListener
   {
      void addBagItem(int waiting);
   }

实现了这个接口,向bag中加入连接

PoolEntryCreator

创建新得连接

private final class PoolEntryCreator implements Callable<Boolean>
   {
      private final String loggingPrefix;

      PoolEntryCreator(String loggingPrefix)
      {
         this.loggingPrefix = loggingPrefix;
      }

      @Override
      public Boolean call()
      {
         long sleepBackoff = 250L;
         while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {
            final PoolEntry poolEntry = createPoolEntry();
            if (poolEntry != null) {
               connectionBag.add(poolEntry);
               logger.debug("{} - Added connection {}", poolName, poolEntry.connection);
               if (loggingPrefix != null) {
                  logPoolState(loggingPrefix);
               }
               return Boolean.TRUE;
            }

            // failed to get connection from db, sleep and retry
            if (loggingPrefix != null) logger.debug("{} - Connection add failed, sleeping with backoff: {}ms", poolName, sleepBackoff);
            quietlySleep(sleepBackoff);
            sleepBackoff = Math.min(SECONDS.toMillis(10), Math.min(connectionTimeout, (long) (sleepBackoff * 1.5)));
         }

         // Pool is suspended or shutdown or at max size
         return Boolean.FALSE;
      }
private synchronized boolean shouldCreateAnotherConnection() {
         return getTotalConnections() < config.getMaximumPoolSize() &&
            (connectionBag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle());
      }

判断是否应该添加连接 不能大于最大配置的连接数,并且有线程在等待获取连接,并且空闲得连接数小于最小得空闲数
创建新的连接

如果sharedList中也没有获取到空闲连接

 do {
            final long start = currentTime();
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= elapsedNanos(start);
         } while (timeout > 10_000);

         return null;

当前线程需要等待阻塞等到空闲连接

ProxyLeakTask

class ProxyLeakTask implements Runnable
{
   private static final Logger LOGGER = LoggerFactory.getLogger(ProxyLeakTask.class);
   static final ProxyLeakTask NO_LEAK;

   private ScheduledFuture<?> scheduledFuture;
   private String connectionName;
   private Exception exception;
   private String threadName; 
   private boolean isLeaked;

   static
   {
      NO_LEAK = new ProxyLeakTask() {
         @Override
         void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold) {}

         @Override
         public void run() {}

         @Override
         public void cancel() {}
      };
   }

   ProxyLeakTask(final PoolEntry poolEntry)
   {
      this.exception = new Exception("Apparent connection leak detected");
      this.threadName = Thread.currentThread().getName();
      this.connectionName = poolEntry.connection.toString();
   }

   private ProxyLeakTask()
   {
   }

   void schedule(ScheduledExecutorService executorService, long leakDetectionThreshold)
   {
      scheduledFuture = executorService.schedule(this, leakDetectionThreshold, TimeUnit.MILLISECONDS);
   }

   /** {@inheritDoc} */
   @Override
   public void run()
   {
      isLeaked = true;

      final StackTraceElement[] stackTrace = exception.getStackTrace(); 
      final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
      System.arraycopy(stackTrace, 5, trace, 0, trace.length);

      exception.setStackTrace(trace);
      LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
   }

   void cancel()
   {
      scheduledFuture.cancel(false);
      if (isLeaked) {
         LOGGER.info("Previously reported leaked connection {} on thread {} was returned to the pool (unleaked)", connectionName, threadName);
      }
   }
}

leakDetectionThreshold 设定了连接泄露阀值10s,即当从连接池里取出连接超过10s不归还时,会warn Apparent connection leak detected,用来警告数据库连接的泄露风险。
在连接归还或被踢除时,会清除此调度任务,详见ProxyConnection。即在10s内归还连接,会清除调度任务。超过10s调度任务执行。

清除调度任务时,如果已经超过调度时间,会LOGGER.info(“Previously reported leaked connection {} was returned to the pool (unleaked)”, connectionName);。详见ProxyLeakTask#cancel。

HikariDataSource

这个case中最大的亮点是。Hikari warn日志中详细的打印出,存在连接泄露的代码行数,异常信息非常直白易懂。实现逻辑详见 ProxyLeakTask#ProxyLeakTask ProxyLeakTask#run。优秀的代码设计+不错的性能,也难怪SpringBoot2.0起默认数据库连接池从TomcatPool换到了「HikariCP」。
具体看如何实现如何精准打印堆栈的

  ProxyLeakTask(final PoolEntry poolEntry)
   {
      this.exception = new Exception("Apparent connection leak detected");
      this.threadName = Thread.currentThread().getName();
      this.connectionName = poolEntry.connection.toString();
   }

在构造ProxyLeakTask创建了exception,异常中

 @Override
   public void run()
   {
      isLeaked = true;

      final StackTraceElement[] stackTrace = exception.getStackTrace(); 
      final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
      System.arraycopy(stackTrace, 5, trace, 0, trace.length);

      exception.setStackTrace(trace);
      LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
   }

可以看出来,减了5层的堆栈,刚好就是查询处的堆栈

handoffQueue

SynchronousQueue<T> handoffQueue;

我们知道SynchronousQueue是一个比较特殊的队列 投放任务的生产者在没用和消费者匹配之前,生产者线程会被阻塞。生产者和消费者配一对一通信。
这里,getConection充当一个消费者的角色,去尝试取出数据
至于怎么放入数据,我们后面继续看。

返回一个来连接后,对原生数据库连接做扩展,

         return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry), now);
  Connection createProxyConnection(final ProxyLeakTask leakTask, final long now)
   {
      return ProxyFactory.getProxyConnection(this, connection, openStatements, leakTask, now, isReadOnly, isAutoCommit);
   }

打开ProxyFactory.getProxyConnection这个方法看起来很有意思

 static ProxyConnection getProxyConnection(final PoolEntry poolEntry, final Connection connection, final FastList<Statement> openStatements, final ProxyLeakTask leakTask, final long now, final boolean isReadOnly, final boolean isAutoCommit)
   {
      // Body is replaced (injected) by JavassistProxyFactory
      throw new IllegalStateException("You need to run the CLI build and you need target/classes in your classpath to run.");
   }

发现他是一个空的实现,直接抛出异常并且注释Body is replaced (injected) by JavassistProxyFactory
而且它也没用子类实现,那么如何实现创建对应的conneection对象的呢。

当我们打开这个类就明白了
HikariDataSource

  case "getProxyConnection":
               method.setBody("{return new " + packageName + ".HikariProxyConnection($$);}");
               break;

利用 javaasist直接修改ProxyFactory的字节码,
这样获取的连接被ProxyConnection包裹扩展

ProxyConnection

public abstract class ProxyConnection implements Connection
{
   static final int DIRTY_BIT_READONLY   = 0b000001;
   static final int DIRTY_BIT_AUTOCOMMIT = 0b000010;
   static final int DIRTY_BIT_ISOLATION  = 0b000100;
   static final int DIRTY_BIT_CATALOG    = 0b001000;
   static final int DIRTY_BIT_NETTIMEOUT = 0b010000;
   static final int DIRTY_BIT_SCHEMA     = 0b100000;

   private static final Logger LOGGER;
   private static final Set<String> ERROR_STATES;
   private static final Set<Integer> ERROR_CODES;

   @SuppressWarnings("WeakerAccess")
   protected Connection delegate;

   private final PoolEntry poolEntry;
   private final ProxyLeakTask leakTask;
   private final FastList<Statement> openStatements;

   private int dirtyBits;
   private long lastAccess;
   private boolean isCommitStateDirty;

   private boolean isReadOnly;
   private boolean isAutoCommit;
   private int networkTimeout;
   private int transactionIsolation;
   private String dbcatalog;
   private String dbschema;
}

其内部也维护了连接的一些信息。
在使用spring事务的情况下获取一个连接后
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin

@Override
	protected void doBegin(Object transaction, TransactionDefinition definition) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;

		try {
			if (!txObject.hasConnectionHolder() ||
					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
				Connection newCon = obtainDataSource().getConnection();
				if (logger.isDebugEnabled()) {
					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
				}
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
			}

			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			con = txObject.getConnectionHolder().getConnection();

			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
			txObject.setPreviousIsolationLevel(previousIsolationLevel);
			txObject.setReadOnly(definition.isReadOnly());

			// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
			// so we don't want to do it unnecessarily (for example if we've explicitly
			// configured the connection pool to set it already).
			if (con.getAutoCommit()) {
				txObject.setMustRestoreAutoCommit(true);
				if (logger.isDebugEnabled()) {
					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
				}
				con.setAutoCommit(false);
			}

			prepareTransactionalConnection(con, definition);
			txObject.getConnectionHolder().setTransactionActive(true);

			int timeout = determineTimeout(definition);
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
			}

			// Bind the connection holder to the thread.
			if (txObject.isNewConnectionHolder()) {
				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
			}
		}

		catch (Throwable ex) {
			if (txObject.isNewConnectionHolder()) {
				DataSourceUtils.releaseConnection(con, obtainDataSource());
				txObject.setConnectionHolder(null, false);
			}
			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
		}
	}

会根据事务注解设置事务自动提交,隔离级别,是否为只读连接信息。
这样spring创建了他的事务

事务提交方法

  @Override
   public void commit() throws SQLException
   {
   //实际的数据库连接
      delegate.commit();
      //设置状态
      isCommitStateDirty = false;
      lastAccess = currentTime();
   }

关闭连接方法

 @Override
   public final void close() throws SQLException
   {
      // Closing statements can cause connection eviction, so this must run before the conditional below
      closeStatements();

      if (delegate != ClosedConnection.CLOSED_CONNECTION) {
         leakTask.cancel();

         try {
            if (isCommitStateDirty && !isAutoCommit) {
               delegate.rollback();
               lastAccess = currentTime();
               LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
            }

            if (dirtyBits != 0) {
               poolEntry.resetConnectionState(this, dirtyBits);
               lastAccess = currentTime();
            }

            delegate.clearWarnings();
         }
         catch (SQLException e) {
            // when connections are aborted, exceptions are often thrown that should not reach the application
            if (!poolEntry.isMarkedEvicted()) {
               throw checkException(e);
            }
         }
         finally {
            delegate = ClosedConnection.CLOSED_CONNECTION;
            poolEntry.recycle(lastAccess);
         }
      }
   }

首先关闭此连接创建的Statement
com.zaxxer.hikari.pool.ProxyConnection#closeStatements

private synchronized void closeStatements()
   {
      final int size = openStatements.size();
      if (size > 0) {
         for (int i = 0; i < size && delegate != ClosedConnection.CLOSED_CONNECTION; i++) {
            try (Statement ignored = openStatements.get(i)) {
               // automatic resource cleanup
            }
            catch (SQLException e) {
               LOGGER.warn("{} - Connection {} marked as broken because of an exception closing open statements during Connection.close()",
                           poolEntry.getPoolName(), delegate);
               leakTask.cancel();
               poolEntry.evict("(exception closing Statements during Connection.close())");
               delegate = ClosedConnection.CLOSED_CONNECTION;
            }
         }

         openStatements.clear();
      }
   }

这里她利用了try-resources-catch来释放资源

leakTask是一个打印异常日志的定时任务

 @Override
   public void run()
   {
      isLeaked = true;

      final StackTraceElement[] stackTrace = exception.getStackTrace(); 
      final StackTraceElement[] trace = new StackTraceElement[stackTrace.length - 5];
      System.arraycopy(stackTrace, 5, trace, 0, trace.length);

      exception.setStackTrace(trace);
      LOGGER.warn("Connection leak detection triggered for {} on thread {}, stack trace follows", connectionName, threadName, exception);
   }

如果关闭stateMent失败,那么直接关闭这个数据连接

   poolEntry.evict("(exception closing Statements during Connection.close())");

事务执行完毕后需要恢复这个连接原本的状态

finally {
			cleanupAfterCompletion(status);
		}

org.springframework.jdbc.datasource.DataSourceTransactionManager#doCleanupAfterCompletion

protected void doCleanupAfterCompletion(Object transaction) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

		// Remove the connection holder from the thread, if exposed.
		if (txObject.isNewConnectionHolder()) {
			TransactionSynchronizationManager.unbindResource(obtainDataSource());
		}

		// Reset connection.
		Connection con = txObject.getConnectionHolder().getConnection();
		try {
			if (txObject.isMustRestoreAutoCommit()) {
				con.setAutoCommit(true);
			}
			DataSourceUtils.resetConnectionAfterTransaction(
					con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
		}
		catch (Throwable ex) {
			logger.debug("Could not reset JDBC Connection after transaction", ex);
		}

		if (txObject.isNewConnectionHolder()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
			}
			DataSourceUtils.releaseConnection(con, this.dataSource);
		}

		txObject.getConnectionHolder().clear();
	}

把autoConmit和隔离级别恢复成连接之前的状态。

回头看close方法
isCommitStateDirty代表事务未提交时有更新操作
如果有更新操作,并且在事务中, 需要回滚

    if (isCommitStateDirty && !isAutoCommit) {
               delegate.rollback();
               lastAccess = currentTime();
               LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
            }

关闭连接

     finally {
            delegate = ClosedConnection.CLOSED_CONNECTION;
            poolEntry.recycle(lastAccess);
         }
 static final Connection CLOSED_CONNECTION = getClosedConnection();

      private static Connection getClosedConnection()
      {
         InvocationHandler handler = (proxy, method, args) -> {
            final String methodName = method.getName();
            if ("isClosed".equals(methodName)) {
               return Boolean.TRUE;
            }
            else if ("isValid".equals(methodName)) {
               return Boolean.FALSE;
            }
            if ("abort".equals(methodName)) {
               return Void.TYPE;
            }
            if ("close".equals(methodName)) {
               return Void.TYPE;
            }
            else if ("toString".equals(methodName)) {
               return ClosedConnection.class.getCanonicalName();
            }

            throw new SQLException("Connection is closed");
         };

         return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class[] { Connection.class }, handler);
      }

先设置一个空的连接,用代理返回一个不合法的连接
进入recycle循环利用

 void recycle(final PoolEntry poolEntry)
   {
      metricsTracker.recordConnectionUsage(poolEntry);

      connectionBag.requite(poolEntry);
   }
public void requite(final T bagEntry)
   {
   
      bagEntry.setState(STATE_NOT_IN_USE);

      for (int i = 0; waiters.get() > 0; i++) {
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         }
         else if ((i & 0xff) == 0xff) {
            parkNanos(MICROSECONDS.toNanos(10));
         }
         else {
            Thread.yield();
         }
      }

      final List<Object> threadLocalList = threadList.get();
      if (threadLocalList.size() < 50) {
         threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
   }

1.设置为为未使用状态

  1. 如果等待连接的线程数大于 0,放连接到handoffQueue中,刚好与获取连接的线程配对
 handoffQueue.offer(bagEntry)
  1. 如果threadList大小小于50,放进去
  threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);

weakThreadLocals可以根据配置是否设置为弱引用(发生gc就回收)

数据库连接池的三级缓存

三级缓存threadList

本地线程存储使用,回收的连接,最多存储50个,在真正关闭连接时移除
三级缓存线程隔离,数据无需同步,速度最快

二级缓存sharedList

在创建新的连接就放入,在移除连接时取出
sharedList是线程共享的集合对象,这里使用的类型是CopyOnWriteArrayList

对于CopyOnWriteArrayList需要掌握以下几点

创建:CopyOnWriteArrayList()
添加元素:即add(E)方法
获取单个对象:即get(int)方法
删除对象:即remove(E)方法
遍历所有对象:即iterator(),在实际中更常用的是增强型的for循环去做遍历
注:CopyOnWriteArrayList是一个线程安全,读操作时无锁的ArrayList。
CopyOnWriteArrayList的创建

public CopyOnWriteArrayList()

使用方法:

List list = new CopyOnWriteArrayList();

  1. CopyOnWriteArrayList(写数组的拷贝)是ArrayList的一个线程安全的变体,CopyOnWriteArrayList和CopyOnWriteSet都是线程安全的集合,其中所有可变操作(add、set等等)都是通过对底层数组进行一次新的复制来实现的。

  2. 它绝对不会抛出ConcurrentModificationException的异常。因为该列表(CopyOnWriteArrayList)在遍历时将不会被做任何的修改。

  3. CopyOnWriteArrayList适合用在“读多,写少”的“并发”应用中,换句话说,它适合使用在读操作远远大于写操作的场景里,比如缓存。它不存在“扩容”的概念,每次写操作(add or remove)都要copy一个副本,在副本的基础上修改后改变array引用,所以称为“CopyOnWrite”,因此在写操作是加锁,并且对整个list的copy操作时相当耗时的,过多的写操作不推荐使用该存储结构。

  4. CopyOnWriteArrayList的功能是是创建一个列表,有三种构造方法:

(1)CopyOnWriteArrayList ()创建一个空列表。

(2)CopyOnWriteArrayList (Collection<? extendsE> c)

创建一个按 collection的迭代器返回元素的顺序包含指定 collection元素的列表。

(3)CopyOnWriteArrayList(E[] toCopyIn)

创建一个保存给定数组的副本的列表

CopyOnWriteArrayList虽然写时使用了同步锁但是读无锁,在读多的场景下效率高。

一级缓存handoffQueue

前面介绍过这是一个同步队列
获取连接的消费者线程,与关闭连接时循环使用的的生产者线程配对
使用同步,消费者线程会阻塞,效率最低

思考为什么会发生数据库连接开启事务导致锁死行纪录

数据库连接建立后,是一个长连接在数据库与客户端之间

如果客户端在未回滚的情况下断开连接,数据库服务器感知到,会回滚该连接事务并移除连接

如果客户端非正常断开连接,不是关闭线程等操作,而是突然宕机,断网,导致与服务器的连接变为死连接,如果不设置事务超时时间,那么这个事务一直存在,一直不回滚,导致锁死行记录,因此设置事务超时时间是有必要的。

版权声明:程序员胖胖胖虎阿 发表于 2022年9月10日 下午9:16。
转载请注明:HikariDataSource | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...