Nacos-配置中心原理解析

2年前 (2022) 程序员胖胖胖虎阿
249 0 0

Nacos-配置中心原理解析

  • 一、配置中心原理猜想
  • 二、Nacos配置中心原理解析
    • 2.1 NacosFactory.createConfigService(properties)
      • 2.1.1 NacosConfigService
      • 2.1.2 ClientWorker
    • 2.2 configService
      • 2.2.1 configService.getConfig(String dataId, String group, long timeoutMs)
        • 从本地文件获取配置
        • 从Nacos服务器获取配置
      • 2.2.2 configService.addListener(String dataId, String group, Listener listener)
    • 2.3 长轮询机制
      • 2.3 客户端 LongPollingRunnable
      • 2.4 服务端
        • 2.4.1 ConfigServletInner.doPollingConfig
        • 2.4.2 LongPollingService.addLongPollingClient
        • 2.4.3 ClientLongPolling
        • 2.4.4 LocalDataChangeEvent
  • 三、总结

一、配置中心原理猜想

如果你来实现一个配置中心,你觉得有哪些需求点?

  1. 配置信息存储
  2. 配置中心控制台
  3. 配置中心文件发生变更后,数据同步:客户端定时任务-pull、服务端广播通知-push、长轮询机制)
  4. 配置中心集群(leader、follower,主节点选举,数据一致性)
    Nacos-配置中心原理解析

二、Nacos配置中心原理解析

注意:本文档是基于 nacos 1.1.3 版本编写。

<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>1.1.3</version>
</dependency>

基于Nacos SDK 调用

public class NacosTest {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("serverAddr", "服务端地址");
        properties.put("namespace", "namespace");
        // 通过指定参数,创建一个 configService 
        ConfigService configService = NacosFactory.createConfigService(properties);

        String dataId = "testId";
        String group = "testGroup";

        // 通过dataId、group获取配置
        String config = configService.getConfig(dataId, group, 3000);
        System.out.println(config);

        // 监听服务端配置变更
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                return null;
            }

            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("-------配置发生变更,变更后的配置:" + configInfo);
            }
        });

        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.await();
    }
}

2.1 NacosFactory.createConfigService(properties)

public class ConfigFactory {

    /**
     * Create Config
     *
     * @param properties init param
     * @return ConfigService
     * @throws NacosException Exception
     */
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            // 获取带Properties参数的构造函数
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            // 反射创建
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }
}

通过代码分析,底层主要是获取带Properties参数的构造函数,通过反射创建 NacosConfigService 对象。

2.1.1 NacosConfigService

NacosConfigService 构造函数。


public class NacosConfigService implements ConfigService {
    
    private static final long POST_TIMEOUT = 3000L;
    private static final String EMPTY = "";

    // Http请求代理
    private HttpAgent agent;
    
    // 长轮询
    private ClientWorker worker;
    private String namespace;
    private String encode;
    
	// 创建一个 配置过滤器链管理器
    private ConfigFilterChainManager configFilterChainManager = new ConfigFilterChainManager();
    
    public NacosConfigService(Properties properties) throws NacosException {
        String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
        if (StringUtils.isBlank(encodeTmp)) {
            encode = Constants.ENCODE;
        } else {
            encode = encodeTmp.trim();
        }
        
        // 初始化 namespace
        initNamespace(properties);
        
        // 创建一个Http通信代理类
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        agent.start();
        // 创建客户端工作对象:长轮询机制
        worker = new ClientWorker(agent, configFilterChainManager, properties);
    }
}

2.1.2 ClientWorker

ClientWorker 是Nacos中长轮询机制的实现类。

public class ClientWorker implements Closeable {

	public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;
        
        // 根据 properties 初始化 ClientWorker 属性
        init(properties);
        
        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

		// 执行配置信息检查
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

	// 配置信息检查
	public void checkConfigInfo() {
        // 分任:分批处理
        int listenerSize = cacheMap.get().size();
        // 向上取整为批数
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                /****************长轮询************/
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

	// 根据 properties 初始化 ClientWorker 属性
    private void init(Properties properties) {
        // 请求超时时间:默认30s,可通过配置文件指定(configLongPollTimeout),最小10s
        timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
                Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);
        
        // 任务异常后,重试时间,默认2秒
        taskPenaltyTime = ConvertUtils
                .toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME);
        
        // 是否开启远程信息同步
        this.enableRemoteSyncConfig = Boolean
                .parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG));
    }
    
}

2.2 configService

nacos 配置中心提供了一个核心API接口,ConfigService。

public interface ConfigService {
    
    // 获取配置
    String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
    
    // 获取配置并注册监听器。
    String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener)
            throws NacosException;
    
    // 添加监听
    void addListener(String dataId, String group, Listener listener) throws NacosException;
    
    // 发布配置
    boolean publishConfig(String dataId, String group, String content) throws NacosException;
    
    // 发布配置
    boolean publishConfig(String dataId, String group, String content, String type) throws NacosException;
    
    // 发布配置,附带md5值
    boolean publishConfigCas(String dataId, String group, String content, String casMd5) throws NacosException;
    
    // 发布配置,附带md5值
    boolean publishConfigCas(String dataId, String group, String content, String casMd5, String type)
            throws NacosException;
    
    // 删除配置
    boolean removeConfig(String dataId, String group) throws NacosException;
    
    // 删除监听
    void removeListener(String dataId, String group, Listener listener);
    
    // 获取服务端状态
    String getServerStatus();
    
    // 停机
    void shutDown() throws NacosException;
}

2.2.1 configService.getConfig(String dataId, String group, long timeoutMs)

configService.getConfig(String dataId, String group, long timeoutMs) 用于获取配置,附带请求超时时间。

public class NacosConfigService implements ConfigService {
	@Override
    public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
        return getConfigInner(namespace, dataId, group, timeoutMs);
    }
	
	private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
		// 如果group为空,设置为默认分组 DEFAULT_GROUP
        group = null2defaultGroup(group);
        ParamUtils.checkKeyParam(dataId, group);
        ConfigResponse cr = new ConfigResponse();

        cr.setDataId(dataId);
        cr.setTenant(tenant);
        cr.setGroup(group);
        
        // use local config first
       	// 首先从获取本地配置:读取本地配置文件
        String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
        if (content != null) {
            LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
                dataId, group, tenant, ContentUtils.truncateContent(content));
            cr.setContent(content);
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            return content;
        }
        
        // 本地缓存不存在
        try {
        	// 从nacos服务端获取配置
            content = worker.getServerConfig(dataId, group, tenant, timeoutMs);

            cr.setContent(content);

			// 过滤
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();

            return content;
        } catch (NacosException ioe) {
            if (NacosException.NO_RIGHT == ioe.getErrCode()) {
                throw ioe;
            }
            LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
                agent.getName(), dataId, group, tenant, ioe.toString());
        }
        
        LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
            dataId, group, tenant, ContentUtils.truncateContent(content));
        // 本地文件、远程服务都获取不到,读取快照文件配置
        content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
        cr.setContent(content);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }
}

从本地文件获取配置

String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
public class LocalConfigInfoProcessor {

	public static String getFailover(String serverName, String dataId, String group, String tenant) {
		// 获取本地配置文件:./user.home/nacos/config/serverName_nacos
        File localPath = getFailoverFile(serverName, dataId, group, tenant);

        if (!localPath.exists() || !localPath.isFile()) {
            return null;
        }
        
        try {
        	// 读取文件内容
            return readFile(localPath);
        } catch (IOException ioe) {
            LOGGER.error("[" + serverName + "] get failover error, " + localPath, ioe);
            return null;
        }
    }
    
    // 获取故障转移文件
	static File getFailoverFile(String serverName, String dataId, String group, String tenant) {
        File tmp = new File(LOCAL_SNAPSHOT_PATH, serverName + SUFFIX);
        tmp = new File(tmp, FAILOVER_FILE_CHILD_1);
        if (StringUtils.isBlank(tenant)) {
            tmp = new File(tmp, FAILOVER_FILE_CHILD_2);
        } else {
            tmp = new File(tmp, FAILOVER_FILE_CHILD_3);
            tmp = new File(tmp, tenant);
        }
        return new File(new File(tmp, group), dataId);
    }
}

从Nacos服务器获取配置

ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);

调用 ClientWorker 中的 getServerConfig 方法。

public class ClientWorker implements Closeable {
	
	public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
        throws NacosException {
        if (StringUtils.isBlank(group)) {
        	// 使用默认分组
            group = Constants.DEFAULT_GROUP;
        }

        HttpResult result = null;
        try {
            List<String> params = null;
            if (StringUtils.isBlank(tenant)) {
                params = Arrays.asList("dataId", dataId, "group", group);
            } else {
                params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
            }

			// 发送Get请求:/v1/cs/configs
            result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
        } catch (IOException e) {
            String message = String.format(
                "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(),
                dataId, group, tenant);
            LOGGER.error(message, e);
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }

        switch (result.code) {
            case HttpURLConnection.HTTP_OK:
            	// 更新本地快照信息
                LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
                return result.content;
            case HttpURLConnection.HTTP_NOT_FOUND:
            	// 更新本地快照信息
                LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
                return null;
            case HttpURLConnection.HTTP_CONFLICT: {
                LOGGER.error(
                    "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
                        + "tenant={}", agent.getName(), dataId, group, tenant);
                throw new NacosException(NacosException.CONFLICT,
                    "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
            }
            case HttpURLConnection.HTTP_FORBIDDEN: {
                LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId,
                    group, tenant);
                throw new NacosException(result.code, result.content);
            }
            default: {
                LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId,
                    group, tenant, result.code);
                throw new NacosException(result.code,
                    "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
            }
        }
    }
}

2.2.2 configService.addListener(String dataId, String group, Listener listener)

configService.addListener(String dataId, String group, Listener listener) 用于给当前dataId、group的配置,添加一个事件监听,用于配置发生变更后,进行通知处理。

public class NacosConfigService implements ConfigService {
	@Override
    public void addListener(String dataId, String group, Listener listener) throws NacosException {
        worker.addTenantListeners(dataId, group, Arrays.asList(listener));
    }
}

NacosConfigService 中的 addListener(String dataId, String group, Listener listener) 最终会调用 ClientWorker 中的 addTenantListeners(String dataId, String group, List<? extends Listener> listeners) 方法

public class ClientWorker {
	public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
		// 如果group为空,设置默认分组DEFAULT_GROUP
        group = null2defaultGroup(group);
        
        String tenant = agent.getTenant();
		
		// 如果不存在则添加缓存数据
        CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
        
        for (Listener listener : listeners) {
        	// 添加监听
            cache.addListener(listener);
        }
    }

	// 如果不存在则添加缓存数据
	public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
		// 获取缓存
        CacheData cache = getCache(dataId, group, tenant);
        if (null != cache) {
        	// 不为空,直接return
            return cache;
        }
        String key = GroupKey.getKeyTenant(dataId, group, tenant);
        
        synchronized (cacheMap) {// 加锁,线程安全性
            CacheData cacheFromMap = getCache(dataId, group, tenant);
            // multiple listeners on the same dataid+group and race condition,so
            // double check again
            // other listener thread beat me to set to cacheMap
            // 双重检查机制
            if (null != cacheFromMap) {
                cache = cacheFromMap;
                // reset so that server not hang this check
                cache.setInitializing(true);
            } else {
            	// 不存在,创建缓存对象
                cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
                // fix issue # 1317
                // 是否开启远程同步
                if (enableRemoteSyncConfig) {
                	// 获取配置
                    String content = getServerConfig(dataId, group, tenant, 3000L);
                    cache.setContent(content);
                }
            }

			// 设置缓存
            Map<String, CacheData> copy = new HashMap<String, CacheData>(cacheMap.get());
            copy.put(key, cache);
            cacheMap.set(copy);
        }
        LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);

        MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());

        return cache;
    }
}

2.3 长轮询机制

在探讨Nacos长轮询机制前,先给大家普及一下几个概念:

  • 短轮询:指客户端每隔一段时间向服务器发起一次Http请求,服务端收到请求后,进行处理,然后返回给客户端。
  • 长轮询:指客户端向服务端发起一个带**超时时间(timeout)**的Http请求,并在Http连接超时前,不主动断开连接,需要服务端主动回写数据,否则将一直重复以上过程。

Nacos就是利用了长轮询机制,客户端会开启一个线程,不断向服务端发起一个配置是否存在变更的请求(30s超时),服务端收到请求后,如果配置不存在变更,并不会立即返回,而是当配置发生变更后,主动是否将消息回写给客户端。

客户端会存在两种情况:

  1. 请求超时:无配置变更,开启下一次轮询请求
  2. 服务器返回数据:解析服务器返回数据,通过Nameserver、dataId、group重新回去服务器配置,更新本地缓存,触发事件监听,开启下一次轮询请求。

Nacos长轮询原理,分为了客户端 和 服务端,核心代码如下:

2.3 客户端 LongPollingRunnable

LongPollingRunnable 为 ClientWorker 中的一个内部类,代码如下:

public class ClientWorker {
	
	// 检查配置信息:分批处理,一次最多3000
	public void checkConfigInfo() {
        // 分任务
        int listenerSize = cacheMap.get().size();
        // 向上取整为批数
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
                // i 为当前批次,用于筛选过滤出属于当前批次的cacheData
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
    
	class LongPollingRunnable implements Runnable {
		// 当前批次id,用于筛选过滤出属于当前批次的cacheData
        private int taskId;

        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {

            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            List<String> inInitializingCacheList = new ArrayList<String>();
            try {
                // check failover config
               	// 获取属于当前批次的cacheData
                for (CacheData cacheData : cacheMap.get().values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                        	// 检查本地配置
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) { // 使用本地配置信息
                            	// 检查cacheData和内存缓存文件是否不一致,如果不一致,通知所有Listener
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                            LOGGER.error("get local config info error", e);
                        }
                    }
                }

                // check server config
                // 长轮询:将当前批次的所有cacheData通过Http请求发送给服务端,并附带30s超时时间
                // 1.服务端数据无变化,请求超时,changedGroupKeys = Collections.emptyList()
                // 2.服务端数据存在变更,循环遍历,通过getServerConfig获取并更新本地缓存,触发事件监听
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);

				// 遍历发送变更的groupKey 
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {
                    	// 重新获取服务端配置,本更新本地配置文件缓存内容
                        String content = getServerConfig(dataId, group, tenant, 3000L);
						// 更新本地内存配置
                        CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(content);
                        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
                            agent.getName(), dataId, group, tenant, cache.getMd5(),
                            ContentUtils.truncateContent(content));
                    } catch (NacosException ioe) {
                        String message = String.format(
                            "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                            agent.getName(), dataId, group, tenant);
                        LOGGER.error(message, ioe);
                    }
                }
                
                // 遍历cacheDatas,判断是否需要重新初始化本地文件缓存
                for (CacheData cacheData : cacheDatas) {
                    if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        // 检查cacheData和内存缓存文件是否不一致,如果不一致,通知所有Listener
                        cacheData.checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList.clear();

                executorService.execute(this);

            } catch (Throwable e) {

                // If the rotation training task is abnormal, the next execution time of the task will be punished
                LOGGER.error("longPolling error : ", e);
                // 如果发生异常,延迟taskPenaltyTime后执行当前任务
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }

	/**
     * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
     */
    List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isUseLocalConfigInfo()) {
                sb.append(cacheData.dataId).append(WORD_SEPARATOR);
                sb.append(cacheData.group).append(WORD_SEPARATOR);
                if (StringUtils.isBlank(cacheData.tenant)) {
                    sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
                } else {
                    sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                    sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
                }
                if (cacheData.isInitializing()) {
                    // cacheData 首次出现在cacheMap中&首次check更新
                    inInitializingCacheList
                        .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
                }
            }
        }
        boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
        // 检查更新配置字符串
        return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
    }
    
	/**
     * 从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。
     */
    List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {

        List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);

        List<String> headers = new ArrayList<String>(2);
        headers.add("Long-Pulling-Timeout");
        // 设置超时时间,默认30s
        headers.add("" + timeout);

        // told server do not hang me up if new initializing cacheData added in
        // 是否初始化缓存列表
        if (isInitializingCacheList) {
            headers.add("Long-Pulling-Timeout-No-Hangup");
            headers.add("true");
        }

		// 为空,直接return
        if (StringUtils.isBlank(probeUpdateString)) {
            return Collections.emptyList();
        }

        try {
        	// 发送带超时时间的Http请求,请求路径:/v1/cs/configs/listener
            HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
                agent.getEncode(), timeout);

            if (HttpURLConnection.HTTP_OK == result.code) {
                setHealthServer(true);
                // 解析更新数据 ID 响应
                return parseUpdateDataIdResponse(result.content);
            } else {
                setHealthServer(false);
                LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
            }
        } catch (IOException e) {
            setHealthServer(false);
            LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
            throw e;
        }
        // 超时返回 Collections.emptyList()
        return Collections.emptyList();
    }
    
    // 检查本地配置
	private void checkLocalConfig(CacheData cacheData) {
        final String dataId = cacheData.dataId;
        final String group = cacheData.group;
        final String tenant = cacheData.tenant;
        File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);

        // 没有 -> 有
        if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
            String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
            String md5 = MD5.getInstance().getMD5String(content);
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);

            LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
            return;
        }

        // 有 -> 没有。不通知业务监听器,从server拿到配置后通知。
        if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
            cacheData.setUseLocalConfigInfo(false);
            LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
                dataId, group, tenant);
            return;
        }

        // 有变更
        if (cacheData.isUseLocalConfigInfo() && path.exists()
            && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
            String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
            String md5 = MD5.getInstance().getMD5String(content);
            cacheData.setUseLocalConfigInfo(true);
            cacheData.setLocalConfigInfoVersion(path.lastModified());
            cacheData.setContent(content);
            LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        }
    }
}

2.4 服务端

由上面客户端代码分析可知,客户端会发送一个 /v1/cs/configs/listener 的请求。

// 发送带超时时间的Http请求,请求路径:/v1/cs/configs/listener
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
    agent.getEncode(), timeout);

服务端收到请求后,处理如下:

@Controller
// Constants.CONFIG_CONTROLLER_PATH = /v1/cs/configs
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {

    /**
     * 比较MD5
     */
	@RequestMapping(value = "/listener", method = RequestMethod.POST)
    public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        // 获取需要比较的字符串
        String probeModify = request.getParameter("Listening-Configs");
        if (StringUtils.isBlank(probeModify)) {
            throw new IllegalArgumentException("invalid probeModify");
        }

		// 解码
        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

        // key -> groupKey  value -> md5
        Map<String, String> clientMd5Map;
        try {
        	// 获取客户端传输过来的md5值
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            throw new IllegalArgumentException("invalid probeModify");
        }

        // do long-polling
        // 长轮询
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }
}

2.4.1 ConfigServletInner.doPollingConfig

@Service
public class ConfigServletInner {

	/**
     * 轮询接口
     */
    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
                                  Map<String, String> clientMd5Map, int probeRequestSize)
        throws IOException, ServletException {

        // 长轮询
        if (LongPollingService.isSupportLongPolling(request)) {
        	// 添加长轮询客户端
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }

        // else 兼容短轮询逻辑
        List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);

        // 兼容短轮询result
        String oldResult = MD5Util.compareMd5OldResult(changedGroups);
        String newResult = MD5Util.compareMd5ResultString(changedGroups);

        String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
        if (version == null) {
            version = "2.0.0";
        }
        int versionNum = Protocol.getVersionNumber(version);

        /**
         * 2.0.4版本以前, 返回值放入header中
         */
        if (versionNum < START_LONGPOLLING_VERSION_NUM) {
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
        } else {
            request.setAttribute("content", newResult);
        }

        // 禁用缓存
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        return HttpServletResponse.SC_OK + "";
    }
}

2.4.2 LongPollingService.addLongPollingClient

@Service
public class LongPollingService extends AbstractEventListener {
	
	public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                     int probeRequestSize) {
		// 获取客户端超时时间
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        // 挂断标志
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        // 应用名称
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        
        // 延迟时间,服务端处理时间。0.5s
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        
        /**
         * 提前500ms返回响应,为避免客户端超时
         */
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // do nothing but set fix polling timeout
        } else {
        	// 先检查是否存在变更,如果存在,直接返回
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                    clientMd5Map.size(), probeRequestSize, changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
                return;
            }
        }

		/*************************不存在变更,采用servlet 3.0 异步处理***************************/
        // 客户端ip
        String ip = RequestUtil.getRemoteIp(req);
        // 一定要由HTTP线程调用,否则离开后容器会立即发送响应
        final AsyncContext asyncContext = req.startAsync();
        // AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
        asyncContext.setTimeout(0L);

		// 开启定时任务
		// 其中,timeout = Math.max(10000, Long.parseLong(str) - delayTime) = Math.max(10000, 30000 - 500) = 29.5s
        scheduler.execute(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }
}

2.4.3 ClientLongPolling

ClientLongPolling 为 LongPollingService 的内部类,代码如下:

@Service
public class LongPollingService extends AbstractEventListener {
	class ClientLongPolling implements Runnable {

        @Override
        public void run() {
        	// 开启一个延时线程,timeoutTime=29.5s
            asyncTimeoutFuture = scheduler.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                    	// 获取并设置客户端IP
                        getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                        /**
                         * 删除订阅关系
                         */
                        allSubs.remove(ClientLongPolling.this);

						// 是否固定轮询
                        if (isFixedPolling()) {
                            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                                (System.currentTimeMillis() - createTime),
                                "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                                "polling",
                                clientMd5Map.size(), probeRequestSize);

							// 通过md5值,获取当前所有变更的groups
                            List<String> changedGroups = MD5Util.compareMd5(
                                (HttpServletRequest)asyncContext.getRequest(),
                                (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                            if (changedGroups.size() > 0) {
                            	// 发送数据
                                sendResponse(changedGroups);
                            } else {
                            	// 发送数据
                                sendResponse(null);
                            }
                        } else {
                            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}",
                                (System.currentTimeMillis() - createTime),
                                "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()),
                                "polling",
                                clientMd5Map.size(), probeRequestSize);
                            // 发送数据
                            sendResponse(null);
                        }
                    } catch (Throwable t) {
                        LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause());
                    }

                }

            }, timeoutTime, TimeUnit.MILLISECONDS);

			// 添加订阅关系
            allSubs.add(this);
        }

		void sendResponse(List<String> changedGroups) {
            /**
             *  取消超时任务
             */
            if (null != asyncTimeoutFuture) {
                asyncTimeoutFuture.cancel(false);
            }
            generateResponse(changedGroups);
        }
		
		void generateResponse(List<String> changedGroups) {
            if (null == changedGroups) {
                /**
                 * 告诉容器发送HTTP响应
                 */
                asyncContext.complete();
                return;
            }

            HttpServletResponse response = (HttpServletResponse)asyncContext.getResponse();

            try {
            	// 获取resp 
                String respString = MD5Util.compareMd5ResultString(changedGroups);

                // 禁用缓存
                response.setHeader("Pragma", "no-cache");
                response.setDateHeader("Expires", 0);
                response.setHeader("Cache-Control", "no-cache,no-store");
                response.setStatus(HttpServletResponse.SC_OK);
                // 回写数据
                response.getWriter().println(respString);
                asyncContext.complete();
            } catch (Exception se) {
                pullLog.error(se.toString(), se);
                asyncContext.complete();
            }
        }
    }
    
 }

注意:如果在29.5s内发生变化,那么nacos是怎么处理的呢?

假如,在这29.5s内,你进入了nacos控制台,修改配置内容后,保存发布,那这个时候,nacos服务端会做哪些内容呢??

2.4.4 LocalDataChangeEvent

通过浏览器控制台可发现,当你点击保存后,会调用nacos服务端的 /v1/cs/configs/ 请求,最后发送一个 LocalDataChangeEvent 事件。

@Service
public class LongPollingService extends AbstractEventListener {
    /**
     * 长轮询订阅关系
     */
    final Queue<ClientLongPolling> allSubs;
    
	@Override
    public void onEvent(Event event) {
        if (isFixedPolling()) {
            // ignore
        } else {
            if (event instanceof LocalDataChangeEvent) {
            	// 接收 LocalDataChangeEvent  
                LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
                // 执行 DataChangeTask
                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
            }
        }
    }

	class DataChangeTask implements Runnable {
        @Override
        public void run() {
            try {
                ConfigService.getContentBetaMd5(groupKey);
                // 循环遍历 allSubs Queue<ClientLongPolling> allSubs;
                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPolling clientSub = iter.next();
					
					// 如果当前 ClientLongPolling  中的 clientMd5Map key中存在当前 groupKey,则进行通知
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {
                        // 如果beta发布且不在beta列表直接跳过
                        if (isBeta && !betaIps.contains(clientSub.ip)) {
                            continue;
                        }

                        // 如果tag发布且不在tag列表直接跳过
                        if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                            continue;
                        }

                        getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                        
                        // 删除订阅关系
                        iter.remove();
                        
                        LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                            (System.currentTimeMillis() - changeTime),
                            "in-advance",
                            RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
                            "polling",
                            clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                            
                        // 发送服务配置变更groupKey,完成实时通知
                        clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
            } catch (Throwable t) {
                LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
            }
        }

        DataChangeTask(String groupKey) {
            this(groupKey, false, null);
        }

        DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps) {
            this(groupKey, isBeta, betaIps, null);
        }

        DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps, String tag) {
            this.groupKey = groupKey;
            this.isBeta = isBeta;
            this.betaIps = betaIps;
            this.tag = tag;
        }

        final String groupKey;
        final long changeTime = System.currentTimeMillis();
        final boolean isBeta;
        final List<String> betaIps;
        final String tag;
    }
}

至此,Nacos 配置中心原理分析完成,下面我们回顾一下整体流程。

三、总结

Nacos-配置中心原理解析

版权声明:程序员胖胖胖虎阿 发表于 2022年11月8日 下午8:24。
转载请注明:Nacos-配置中心原理解析 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...