如何设计分布式系统开关
背景
在分布式系统中为什么要使用开关?例如双十一电商平台需要做促销活动,此时订单量暴增,在下单环节,可能需要调用A、B、C三个接口来完成,但是其实A和B是必须的,C只是附加的功能(例如在下单的时候获取用户常用地址,或者发个推送消息之类的),可有可无,在平时系统没有压力,在容量充足的情况下,调用下没问题,但是在特殊节日的大促环节,系统已经满负荷了,这时候其实完全可以不去调用C接口,怎么实现这个呢?改代码重新发布?no,这样不太敏捷,于是开关诞生了,开发人员只要简单执行一下命令或者点一下页面,就可以关掉对于C接口的调用,在请求高峰过去之后,再把开关恢复回去即可。类似的使用场景还有A/B Test、灰度发布和数据的不停服切换等。
整体设计
高可用分布式开关设计?
一、 需求分析
开关服务的核心需求主要有以下几点:
1. 支持开关的分布式化管理
• 开关统一管理,发布、更新等操作只需在中心服务器上进行,一次操作,处处可用。
• 开关更新自动化,当开关发生变更,订阅该开关的客户端会自动发现变更,进而同步新值。
2. 具有容灾机制,保证服务的高可用
• 服务集群的高可用,当集群中的一台server不可用了,client发现后可以自动切换到其他server上进行访问。
• 客户端具备容灾机制,当开关中心完全不可用,可以在客户端对开关进行操作。
二、方案设计
针对此需求分析,我们抽象成了两大块分别是配置中心和SDK,整体架构如下:
各个系统模块介绍:
配置中心:
配置中心在此处提供开关的统一管理
zookeeper:
分布式开关统一注册中心,主要提供变更通知服务,客户端通过订阅开关节点,实时获取开关变更信息,从而同步更新到本地缓存
SDK:
client端获取开关,以及监听开关的变化从而更新本地缓存。
配置中心设计
配置中心在此处的作用有如下几点:
提供开关统一管理,包含开关发布、更新、查询等基本服务
操作开关的日志,比如谁在某一时刻将开关从关闭状态修改为打开状态。
系统权限控制,只有拥有相关权限的人才能操作开关。
配置中心整体设计如下:
其中db主要保存了开关信息,以及日志信息。
zk主要用来创建开关和监听开关的变化。
配置中心的设计看似简单,其实也需要注意以下几点:
1. 开关的命名重复问题
在设计系统的时候,开关是否要共享给所有系统,还是其中某一个系统,如果共享给所有系统,那么有权限的人对开关命名的时候难免会重复,针对此,我们设计了appid的概念,一个系统对应一个appid,在一个appid内开关名称不允许有重复,只有该appid的owner才有权限对该appid的下的开关做操作。
2. 开关的分类
我们可以将开关分为三大类,分别是功能开关、降级开关、灰度开关:
功能开关
针对某一个功能是否打开,例如在订单下单的时候需要获取下单用户的历史换绑手机号信息,但是由于B系统只是提供了接口定义,实际业务还未开发完成,A系统可以先提前开发并上线,待B系统上线之后,A系统将该功能开关打开。
降级开关
典型的应用场景是电商做促销的时候,比如双十一电商做促销,用户下单的时候获取用户历史常用地址,因为双十一系统已经达到负荷,为了系统性能,将该业务逻辑降级。或者A系统调用B系统,由于B系统整体宕机,为了不影响A系统继续运行,可以手动将B系统降级等。
灰度开关
针对某一功能做灰度,例如我们需要针对刷单用户在下单过程中做拦截,为此我们在下单阶段做了一套黑白名单处理,但是我们也无法知晓该套黑白名单的正确率多少,为了避免造成误拦,我们需要对该功能做灰度采样,以便及时调整我们的黑白名单逻辑。通常的灰度策略为 1% 灰度,10%灰度,30%灰度,50%。。。
3. zk中开关设计
zk中的设计结构为路径格式,我们将/appid 设置为根路径,例如appid为order的根路径为/order,则在该appid下设置的user_open开关的路径则为:
/order/user_open ,所以我们设计的路径公式如下:
/appid/switch
部分页面效果如下:
SDK设计
SDK主要是以jar的形式嵌入在client端的,它的作用主要是在client端获取开关,以及监听开关的变化从而更新本地缓存。SDK整体设计如下所示:
我们使用Curator来操作zk,因为它相比原生的zk 客户端确实好用不少,这里不做过多展开,为了提高系统性能我们将开关信息缓存在本地内存,这样做的目的是提升系统的性能,所以获取开关的流程图如下:
1. 监听开关变化
如果开关发生改变,我们需要将开关变化的信息载入到本地,监听代码如下:
1 private void nodeListener(final String key) {
2 final NodeCache nodeCache = new NodeCache(client, basePath + "/" + key);
3 try {
4 nodeCache.start();
5 nodeCache.getListenable().addListener(new NodeCacheListener() {
6
7 public void nodeChanged() throws Exception {
8 String msg = new String(nodeCache.getCurrentData().getData());
9 System.out.println("监听事件触发");
10 System.out.println("重新获得节点内容为:" + msg);
11 //加入到本地缓存
12 dataMap.put(key, msg);
13 }
14 });
15 } catch (Exception e) {
16 e.printStackTrace();
17 }
18
19
20 }
2. 降级开关
降级开关和功能开关在底层实现上是一样的,就是从zk获取value为true的时候,是打开状态的,代码如下:
1 /**
2 * 获取开关,默认是打开的
3 *
4 * @param switchKey
5 * @return
6 */
7 public boolean getSwitch(String switchKey) {
8 try {
9 String dataMsg = getDataMsg(switchKey);
10 if (isEmpty(dataMsg)) {
11 return true;
12 }
13
14 return Boolean.parseBoolean(dataMsg);
15 } catch (Exception e) {
16 e.printStackTrace();
17 }
18
19 return true;
20 }
其中getDataMsg方法封装了本地缓存的调用,具体代码如下:
1 private String getDataMsg(String key) {
2 byte[] data;
3 try {
4 //先从本地缓存中找
5 String msg = dataMap.get(key);
6 if (!isEmpty(msg)) {
7 return msg;
8 }
9
10 //本地缓存没有,则从zk中去查找
11 data = dataBuilder.forPath(basePath + "/" + key);
12 if (data != null) {
13 String dataMsg = new String(data);
14 //重新塞入缓存
15 dataMap.put(key, dataMsg);
16 nodeListener(key);
17 return dataMsg;
18 }
19 } catch (Exception e) {
20 e.printStackTrace();
21 }
22
23 return null;
24 }
降级开关和功能开关的代码完成后,接下来是一段测试demo,测试开关是否可以正常使用,代码如下:
1public class SwitchDemo {
2
3 public static void main(String[] args)throws Exception {
4
5 //user_open 开关 打开
6 if(SwitchHandler.config().getSwitch("user_open")){
7 System.out.println("exe user open switch 1");
8 }
9
10 Thread.sleep(10000);
11
12
13 //user_open 开关 关闭
14 if (SwitchHandler.config().getSwitch("user_open")){
15 System.out.println("exe user open switch 2");
16 }else{
17
18 System.out.println("exe user not open");
19 }
20
21
22 Thread.sleep(1000000000);
23 }
24}
接下来在本地启动一个zk单机服务,进入到zk的安装目录 ,启动命令如下:
1./zkServer.sh start
启动一个客户端,创建一个开关user_open value为true,假设我这个服务的appid叫sky,那么我应该先创建/sky 这个路径,接着创建,/sky/user_open这个路径,命令如下:
1create /sky 1
2create /sky/user_open true
接下来我们启动SwitchDemo测试类,在代码走到第一次sleep阶段,我们立马将user_open 这个值修改为false,修改zk的命令为:
1set /sky/user_open false
最终打印结果如下:
从结果可以看出,第一次执行的时候,由于user_open的value为true,所以
日志 exe user open switch 1 打印出来了,其次监听的日志也打印出来了,当代码执行到第十行的时候,我们将user_open的value修改为false,此时监听的日志监听到开关发生了变化,并将本地内存的开关地址修改了false,最后执行第14行代码的时候,由于开关是关闭状态,所以走到了第18行的逻辑。
3. 灰度开关设计
灰度开关主要针对某一个功能来进行灰度,那么就需要有一个灰度策略的概念,比如设置的是灰度10%,此时有1000个请求进来,应该只有100个左右的请求是命中这段逻辑,在微服务架构中,服务与服务之间的调用都会透传一个requestId(请求id),因此将requestId 当做灰度的主体是最适合不过了,简单的灰度算法可以将requestId 进行hash 取模100 然后跟设置的灰度值进行比较即可。代码如下:
1 /**
2 * 灰度开关
3 *
4 * @param switchKey
5 * @param strategyId 灰度策略,可以传入requestId,手机号来进行灰度
6 * @return
7 */
8 public boolean getGrayscaleSwitch(String switchKey, String strategyId) {
9
10 int value = getInt(switchKey, 100);
11
12 int hash = strategyId.hashCode();
13
14 return Math.abs(hash) % 100 <= value;
15 }
灰度不是一个精确值,请求量越大灰度的越精确,因此接下来我们的测试demo,会模拟10000条请求,如果命中了大约1000条左右,那么说明我们的灰度算法没啥问题,我们将当前时间戳当做requestId(当然实际不要这么做,应该用微服务之间透传的requestId,这里只是为了测试)
首先设置一个灰度开关user_gary,value为10(代表灰度10%,最大为100)
1create /sky/user_gary 10
测试代码如下:
1 //灰度10%开关
2 int grayCount=0;//
3 for (int i=0;i<10000;i++){
4 String requestId = System.currentTimeMillis()+"-"+i;
5 if (SwitchHandler.config().getGrayscaleSwitch("user_gary",requestId)){
6 grayCount++;
7 }
8 }
9
10 System.out.println("进入灰度开关的次数为:"+grayCount);
运行结果:
我们看到10000次请求,命中了1176次,大约灰度10%,说明灰度起作用了。
SDK完整代码
pom依赖:
1 <dependency>
2 <groupId>org.apache.curator</groupId>
3 <artifactId>curator-recipes</artifactId>
4 <version>4.0.1</version>
5 </dependency>
SDK完整代码:
1package com.wuzy.myswitch;
2
3import org.apache.curator.RetryPolicy;
4import org.apache.curator.framework.CuratorFramework;
5import org.apache.curator.framework.CuratorFrameworkFactory;
6import org.apache.curator.framework.api.GetDataBuilder;
7import org.apache.curator.framework.recipes.cache.NodeCache;
8import org.apache.curator.framework.recipes.cache.NodeCacheListener;
9import org.apache.curator.retry.ExponentialBackoffRetry;
10
11import java.util.Map;
12import java.util.concurrent.ConcurrentHashMap;
13
14public class SwitchHandler {
15
16 private String basePath = "/sky";
17 private GetDataBuilder dataBuilder;
18
19 private Map<String, String> dataMap = new ConcurrentHashMap<String, String>();
20
21 private SwitchHandler() {
22 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
23 client = CuratorFrameworkFactory.builder()
24 .connectString("127.0.0.1:2181")
25 .retryPolicy(retryPolicy)
26 .sessionTimeoutMs(6000)
27 .connectionTimeoutMs(3000)
28 .build();
29 client.start();
30
31 dataBuilder = client.getData();
32
33 //TODO 后期这里最好将数据库中的开关加载出来,从zk中找出放入本地缓存中,加快查询速度
34 }
35
36 private static class SwitchHandlerHolder {
37 private static final SwitchHandler switchHandler = new SwitchHandler();
38 }
39
40 public static SwitchHandler config() {
41 return SwitchHandlerHolder.switchHandler;
42 }
43
44 private CuratorFramework client;
45
46
47 /**
48 * 获取开关,默认是打开的
49 *
50 * @param switchKey
51 * @return
52 */
53 public boolean getSwitch(String switchKey) {
54 try {
55 String dataMsg = getDataMsg(switchKey);
56 if (isEmpty(dataMsg)) {
57 return true;
58 }
59
60 return Boolean.parseBoolean(dataMsg);
61 } catch (Exception e) {
62 e.printStackTrace();
63 }
64
65 return true;
66 }
67
68 /**
69 * 灰度开关
70 *
71 * @param switchKey
72 * @param strategyId 灰度策略,可以传入requestId,手机号来进行灰度
73 * @return
74 */
75 public boolean getGrayscaleSwitch(String switchKey, String strategyId) {
76
77 int value = getInt(switchKey, 100);
78
79 int hash = strategyId.hashCode();
80
81 return Math.abs(hash) % 100 <= value;
82 }
83
84
85 public int getInt(String key, int defaultValue) {
86 try {
87 String dataMsg = getDataMsg(key);
88 if (isEmpty(dataMsg)) {
89 return defaultValue;
90 }
91 return Integer.parseInt(dataMsg);
92
93 } catch (Exception e) {
94 e.printStackTrace();
95 }
96
97 return defaultValue;
98 }
99
100
101 private boolean isEmpty(String msg) {
102 return msg == null || msg.trim().equals("");
103 }
104
105 private String getDataMsg(String key) {
106 byte[] data;
107 try {
108 //先从本地缓存中找
109 String msg = dataMap.get(key);
110 if (!isEmpty(msg)) {
111 return msg;
112 }
113
114 //本地缓存没有,则从zk中去查找
115 data = dataBuilder.forPath(basePath + "/" + key);
116 if (data != null) {
117 String dataMsg = new String(data);
118 //重新塞入缓存
119 dataMap.put(key, dataMsg);
120 nodeListener(key);
121 return dataMsg;
122 }
123 } catch (Exception e) {
124 e.printStackTrace();
125 }
126
127 return null;
128 }
129
130
131 private void nodeListener(final String key) {
132 final NodeCache nodeCache = new NodeCache(client, basePath + "/" + key);
133 try {
134 nodeCache.start();
135 nodeCache.getListenable().addListener(new NodeCacheListener() {
136
137 public void nodeChanged() throws Exception {
138 String msg = new String(nodeCache.getCurrentData().getData());
139 System.out.println("监听事件触发");
140 System.out.println("重新获得节点内容为:" + msg);
141 //加入到本地缓存
142 dataMap.put(key, msg);
143 }
144 });
145 } catch (Exception e) {
146 e.printStackTrace();
147 }
148
149
150 }
151}
如果你想学好JAVA这门技术,也想在IT行业拿高薪,可以参加我们的训练营课程,选择最适合自己的课程学习,技术大牛亲授,8个月后,进入名企拿高薪。我们的课程内容有:Java工程化、高性能及分布式、高性能、深入浅出。高架构。性能调优、Spring,MyBatis,Netty源码分析和大数据等多个知识点。如果你想拿高薪的,想学习的,想就业前景好的,想跟别人竞争能取得优势的,想进阿里面试但担心面试不过的,你都可以来,q群号为:180705916 进群免费领取学习资料。