配置中心Apollo

Apollo(3) Portal、Config、Admin Se

2022-12-09  本文已影响0人  Oliver_Li
一、数据库表:
  1. portal DB表:
  1. config db表:
二、一些细节:
  1. cluster不创建就会用default表示,界面上不会显示,但并不是没有
  2. 每创建一个app默认会创建一个叫application的namespace,对应spring项目的application.properties
  3. openApi可以通过在portal界面注册三方用户,用HTTP + Token的方式直接调用portal相关接口,不用登陆portal手动点击配置发布等等操作
  4. namespace公有、私有、关联
  1. 关联公有namespace实际是创建了一条新的namespace给相应app
  2. apollo通过appId + clusterName + namespaceName定位一个具体namespace,类似于maven的groupId + artifactId,源码里也叫watchKey
三、修改、发布:
  1. 配置修改时,item表记录配置项,commit表增加一条修改记录,到这里修改部分就结束了,主要看发布
  2. 配置发布概述,portal向admin service请求是同步,admin service到config service是异步,为了不引入额外组件,admin service写入数据库,config service轮询数据库推送消息的方式代替了消息队列,通过最新数据的ID作为版本号确认是否有配置更新,就是releaseMessage表
  3. 发布时item和commit表没变化,release、releaseHistory和releaseMessage会增加一条记录
  1. 发送消息时,releaseMessage的ID会存到一个BlockingQueue里,config service会每5s取一次BlockingQueue的数据取完为止,因为只记录定位,同一namespace多次发布releaseMessage表里会有重复数据,需要定时清理,只保留最新一条数据
  2. config service每秒扫描是否有更新,更新时取出遍历所有listener推送Client,这个监听器里触发更新后的一些处理,包括清缓存、通知Client等等,最后更新缓存最大ID记录
  @Override
  public void afterPropertiesSet() throws Exception {
    databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
    maxIdScanned = loadLargestMessageId();
    executorService.scheduleWithFixedDelay(() -> {
      Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
      try {
        scanMissingMessages();
        scanMessages();
        transaction.setStatus(Transaction.SUCCESS);
      } catch (Throwable ex) {
        transaction.setStatus(ex);
        logger.error("Scan and send message failed", ex);
      } finally {
        transaction.complete();
      }
    }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);

  }

private boolean scanAndSendMessages() {
    // 一次扫500,多了下次再扫
    List<ReleaseMessage> releaseMessages =
        releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
    if (CollectionUtils.isEmpty(releaseMessages)) {
      return false;
    }
    // 遍历所有注册上来的监听器,通知client
    fireMessageScanned(releaseMessages);
    int messageScanned = releaseMessages.size();
    long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
    // check id gaps, possible reasons are release message not committed yet or already rolled back
    if (newMaxIdScanned - maxIdScanned > messageScanned) {
      recordMissingReleaseMessageIds(releaseMessages, maxIdScanned);
    }
    maxIdScanned = newMaxIdScanned;
    return messageScanned == 500;
  }

/**
   * 遍历所有listener推送message
   * @param messages
   */
  private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
    for (ReleaseMessage message : messages) {
      for (ReleaseMessageListener listener : listeners) {
        try {
          listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
        } catch (Throwable ex) {
          Tracer.logError(ex);
          logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
        }
      }
    }
  }
    @Bean
    public ReleaseMessageScanner releaseMessageScanner() {
      ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
      //0\. handle release message cache
      releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
      //1\. handle gray release rule
      releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
      //2\. handle server cache
      releaseMessageScanner.addMessageListener(configService);
      releaseMessageScanner.addMessageListener(configFileController);
      //3\. notify clients
      releaseMessageScanner.addMessageListener(notificationControllerV2);
      releaseMessageScanner.addMessageListener(notificationController);
      return releaseMessageScanner;
    }
  1. 关联Client章节关于RemoteConfigLongPollService的描述,RemoteConfigLongPollService长轮训/notifications/v2这个接口,触发更新时返回http200,超时返回http304,NotificationControllerV2这个对象触发DeferredResult响应的逻辑就包括在上面提到的监听回调中
  2. NotificationControllerV2里面两个主要方法:
  1. pollNotification:
四、服务注册、发现:
  1. 回顾:Meta Server = config service + meta service + eureka在同一进程
  2. 启动eureka:
  1. 注册到eureka:
  1. 发现:
四、ServerConfig基础配置:
  1. RefreshablePropertySource(抽象):继承MapPropertySource,提供抽象refresh(),能看出它是一个能刷新的kv数据源
  2. PortalDBPropertySource、BizDBPropertySource继承RefreshablePropertySource,refresh()时会从数据库抓取所有配置,并放入PropertySource,分别给portal和admin、config service使用
  3. RefreshableConfig(抽象):上面是两个代表数据源,这个是代表具体数据配置,对应也有两个实现,这里每60s会刷新数据源
@PostConstruct
  public void setup() {
    // 获取数据源,因为要区分portal和admin config,所以在实现类提供
    propertySources = getRefreshablePropertySources();
    if (CollectionUtils.isEmpty(propertySources)) {
      throw new IllegalStateException("Property sources can not be empty.");
    }

    // 刷新数据源并置入environment
    for (RefreshablePropertySource propertySource : propertySources) {
      propertySource.refresh();
      environment.getPropertySources().addLast(propertySource);
    }

    // 定时刷新
    ScheduledExecutorService
        executorService =
        Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("ConfigRefresher", true));

    executorService
        .scheduleWithFixedDelay(() -> {
          try {
            propertySources.forEach(RefreshablePropertySource::refresh);
          } catch (Throwable t) {
            logger.error("Refresh configs failed.", t);
            Tracer.logError("Refresh configs failed.", t);
          }
        }, CONFIG_REFRESH_INTERVAL, CONFIG_REFRESH_INTERVAL, TimeUnit.SECONDS);
  }
  1. RefreshableConfig对应的两个实现比较简单,根据key获取各种配置
  2. portal service在页面上还提供了对ServerConfig表配置更新的操作,略
上一篇下一篇

猜你喜欢

热点阅读