我爱编程

利用zookeeper实现缓存更新功能

2017-12-29  本文已影响0人  VIPSHOP_FCS
  最近在用了堆外缓存之后,由于存在着缓存不更新的风险,有时候需要对缓存进行处理。该域用的是公司开发服务平台框架,有点类似Tomcat运用。
  费劲脑汁,暂时想到两种方案:
  1. 开发一个专门用于运维的接口,每次需要运维时,指定ip进行(若不指定ip,由于有多台部署机器,请求路由到完全不可知的机器上)

  2. 采用公司分布式配置依赖最多的,功能也相对强大的zookeeper框架。运用zk节点的内容变化时的及时通知机制。

两种方案的比较

优点 缺点
方案一 实施起来简单 没有挑战性,需要知道所有ip,其次ip改变就比较难维护了
方案二 修改节点值,客户端能及时监听到,从而能做出相应的操作,比如在本例中可以进行缓存的更新,而不需要知道客户端的ip 需要有操作zk的操作权限

经过上面的比较,决定采用方法二(假定已申请到zk的操作权限)

动手前准备

1.下载zookeeper-3.3.6,解压后看到

image

2. 进入conf目录下,新建一个zoo.cfg文件,并写入

# The number of milliseconds of each tick  心跳间隔 毫秒每次
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting anacknowledgement
syncLimit=5
# the directory where the snapshot isstored.  //镜像数据位置
dataDir=E:\\develop-tools\\zookeeper-3.3.6\\data
#日志位置
dataLogDir=E:\\develop-tools\\zookeeper-3.3.6\\logs
# the port at which the clients willconnect  客户端连接的端口
clientPort=2181

server.0=127.0.0.1:8880:7770
#server.1=127.0.0.1:8881:7771
#server.2=127.0.0.1:8882:7772

3. 双击bin目录下的 zkServer.cmd,即可在Windows环境下启动一个zk服务端。(zkCli的用法,大家可以自行学习)

开始动手

为了代码优雅,由于ZooKeeper原生客户端的各类操作方法比较繁琐,用户体验不好,下面我们用Curator+Spring实现demo:

1.封装获取zooKeeper客户端连接的工厂类:ZookeeperFactory

package com.vip.fcs.ps.zk.demo;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.StringUtils;

/**
 * 获取zookeeper客户端链接工厂类
 */
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {

    private String zkHosts;
    // session超时
    private int sessionTimeout = 30000;
    private int connectionTimeout = 30000;

    // 共享一个zk链接
    private boolean singleton = true;

    // 全局path前缀,常用来区分不同的应用
    private String namespace;

    private final static String ROOT = "vip";

    private CuratorFramework zkClient;

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setSingleton(boolean singleton) {
        this.singleton = singleton;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    @Override
    public CuratorFramework getObject() throws Exception {
        if (singleton) {
            if (zkClient == null) {
                zkClient = create();
                zkClient.start();
            }
            return zkClient;
        }
        return create();
    }

    @Override
    public Class<?> getObjectType() {
        return CuratorFramework.class;
    }

    @Override
    public boolean isSingleton() {
        return singleton;
    }

    public CuratorFramework create() throws Exception {
        if (StringUtils.isEmpty(namespace)) {
            namespace = ROOT;
        } else {
            namespace = ROOT + "/" + namespace;
        }
        return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
    }

    public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout,
            String namespace) {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
                .canBeReadOnly(true).namespace(namespace)
                .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE)).defaultData(null).build();
    }

    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}

2.zookeeper的操作类:ZkHandler

package com.vip.fcs.ps.zk.demo;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.InitializingBean;

/**
 * 结合spring,封装了zk的相关操作
 */
public class ZkHandler implements InitializingBean {
    private CuratorFramework zkClient;
    private String service = "ps";
    private String version = "1.0";

    private TreeCache treeCache;

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        // 如果zk尚未启动,则启动
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        buildTreeCache(zkClient, getServicePath());
        // 开始监听
        treeCache.start();

    }

    /**
     * 监听的节点
     * @return
     */
    private String getServicePath() {
        return "/" + service + "/" + version;
    }

    private void buildTreeCache(final CuratorFramework zkClient, String path) {
        // 设置节点的cache
        treeCache = new TreeCache(zkClient, path);
        // 设置监听器和处理过程
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                ChildData data = event.getData();
                String log = ("NODE_ADDED : " + data.getPath() + " --->DATA:" + new String(data.getData()));
                if (data != null) {
                    switch (event.getType()) {
                    case NODE_ADDED:
                        System.out.println(log);
                        // do sth
                        break;
                    case NODE_REMOVED:
                        System.out.println(log);
                        // do sth
                        break;
                    case NODE_UPDATED:
                        System.out.println(log);
                        // do sth
                        break;
                    default:
                        break;
                    }
                } else {
                    System.out.println("data is NULL" + event.getType());
                }
            }
        });
    }

    public void createPersistentNode() throws Exception {
        if (zkClient.checkExists().forPath(getServicePath()) == null) {
            zkClient.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT)
                    .forPath(getServicePath());
        }
    }

    public void changeNodeData(String nodeData) throws Exception {
        if (zkClient.checkExists().forPath(getServicePath()) != null) {
            zkClient.setData().forPath(getServicePath(), nodeData.getBytes());
        }
    }

}
  1. 下面我们再创建3个客户端(2个watcher+一个用来改变节点值的client)

3.1 ZkWatcherOne

package com.vip.fcs.ps.zk.demo;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ZkWatcherOne {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
        while (true) {
            Thread.sleep(2000L);
        }
    }
}

3.2 ZkWatcherTwo

package com.vip.fcs.ps.zk.demo;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ZkWatcherTwo {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
        while (true) {
            Thread.sleep(2000L);
        }
    }
}

3.3 ZkSetClient

package com.vip.fcs.ps.zk.demo;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ZkSetClient {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        ZkHandler zkHandler = (ZkHandler) context.getBean("zkHandler");
        zkHandler.createPersistentNode();
        for (int i = 0; i < 10; i++) {
            zkHandler.changeNodeData("time" + i);
            Thread.sleep(2000L);
        }
    }
}

4.开始运行相关代码

4.1 启动ZkWatcherOne, 控制台打印日志

NODE_ADDED : /ps/1.0 --->DATA:

4.2 启动ZkWatcherTwo, 控制台打印日志

NODE_ADDED : /ps/1.0 --->DATA:

4.3 这时候启动ZkSetClient,在ZkWatcherOne和ZkWatcherTwo的控制台最终打印出如下的日志

image

由此可见,当一个节点值改变时,其他节点都能接收到监听事件,并能够做出相应的操作。在监听事件中,我们可以对本机缓存进行更新,由此达到了我们的目的。
4.4 相关配置文件 applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd"
       default-lazy-init="false">

    <!-- zookeeper -->
    <bean id="zookeeper" class="com.vip.fcs.ps.zk.demo.ZookeeperFactory"
          destroy-method="close">
        <property name="zkHosts"
                  value="127.0.0.1:2181"/>
        <property name="namespace" value="ps.api"/>
        <property name="connectionTimeout" value="3000"/>
        <property name="sessionTimeout" value="3000"/>
        <property name="singleton" value="true"/>
    </bean>

    <!-- zookeeper -->
    <bean id="zkHandler" class="com.vip.fcs.ps.zk.demo.ZkHandler">
        <property name="zkClient"
                  ref="zookeeper"/>
    </bean>
</beans>

4.4 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>

   <groupId>com.fcs.ps.demo</groupId>
   <artifactId>zk-demo</artifactId>
   <version>1.0-SNAPSHOT</version>
   <properties>
       <spring.version>4.0.7.RELEASE</spring.version>
   </properties>
   <dependencies>
       <dependency>
           <groupId>commons-pool</groupId>
           <artifactId>commons-pool</artifactId>
           <version>1.6</version>
       </dependency>
       <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-context</artifactId>
           <version>4.0.9.RELEASE</version>
       </dependency>

       <dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
           <version>3.4.6</version>
       </dependency>
       <dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
           <version>3.4.9</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-framework</artifactId>
           <version>4.0.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-test</artifactId>
           <version>4.0.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-recipes</artifactId>
           <version>4.0.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-x-discovery</artifactId>
           <version>4.0.0</version>
       </dependency>

   </dependencies>

</project>

拓展 :zookeeper如此强大,我们进一步了解一下其优缺点,已经相关的应用场景。

zookeeper 特点及优点 :

1. zookeeper是一个精简的文件系统。这点它和hadoop有点像,但是zookeeper这个文件系统是管理小文件的,而hadoop是管理超大文件的。

2. zookeeper提供了丰富的“构件”,这些构件可以实现很多协调数据结构和协议的操作。例如:分布式队列、分布式锁以及一组同级节点的“领导者选举”算法。

3. zookeeper是高可用的,它本身的稳定性是相当之好,分布式集群完全可以依赖zookeeper集群的管理,利用zookeeper避免分布式系统的单点故障的问题。

4. zookeeper采用了松耦合的交互模式。这点在zookeeper提供分布式锁上表现最为明显,zookeeper可以被用作一个约会机制,让参入的进程不在了解其他进程的(或网络)的情况下能够彼此发现并进行交互,参入的各方甚至不必同时存在,只要在zookeeper留下一条消息,在该进程结束后,另外一个进程还可以读取这条信息,从而解耦了各个节点之间的关系。

5. zookeeper为集群提供了一个共享存储库,集群可以从这里集中读写共享的信息,避免了每个节点的共享操作编程,减轻了分布式系统的开发难度。

6. zookeeper的设计采用的是观察者的设计模式,zookeeper主要是负责存储和管理大家关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应,从而实现集群中类似 Master/Slave 管理模式。

个人认为,zookeeper最有价值的东西也许是内容变化能够及时通知到监听者,结合上面zookeeper的特点,目前zookeeper的使用场景有:

1.数据发布与订阅(配置中心)(适用于)

2.负载均衡

3.命名服务(Naming Service)(服务注册)

4.分布式通知/协调

5.集群管理与Master选举(hbase用到)

6.分布式锁

7.分布式队列

但是zookeeper也有很多缺点:

  1. zookeeper不是为高可用性设计的

  2. zookeeper的选举过程速度很慢。网络实际上常常是会出现隔离等不完整状态的,而zookeeper对那种情况非常敏感。一旦出现网络隔离,zookeeper就要发起选举流程。zookeeper的选举流程通常耗时30到120秒,期间zookeeper由于没有master,都是不可用的。对于网络里面偶尔出现的,比如半秒一秒的网络隔离,zookeeper会由于选举过程,而把不可用时间放大几十倍

  3. zookeeper的性能是有限的

  4. zookeeper的权限控制非常薄弱

zookeeper其实就是一个高可用的服务协调框架,不能作为存储,每次写入都必须集群n/2+1的集群写入完成之后才算完成,性能自己就可以想象了,而且其每个节点只能存储1M的数据,同时其节点目录也不宜过多。

有兴趣的童鞋,可以参考深入研究zookeeper,充分利用起优点,开发出更多的应用场景来。

        作者:王文雅     唯品会java后端开发工程师
上一篇下一篇

猜你喜欢

热点阅读