ZK springboot整合zookeeper

2021-03-30  本文已影响0人  小P聊技术

1 资源

资源信息 版本号 备注
zookeeper 3.4.10 IP: 192.168.51.4
springboot 2.1.5.RELEASE
prettyZoo 2.0 zookeeper可视化工具

zookeeper可视化工具 下载

springboot-zookeeper-demo 源码 下载

2 zookeeper安装

需要安装zookeeper,如果未安装,可参考博文:

ZK zookeeper单机安装与配置

3 springboot整合

3.1 pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
        <!--        <version>2.3.2.RELEASE</version>-->
        <relativePath />
    </parent>

    <groupId>com.auksat.demo</groupId>
    <artifactId>springboot-zookeeper-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <zookeeper.version>3.4.10</zookeeper.version>
        <curator.version>2.11.1</curator.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>${zookeeper.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.apache.curator</groupId>-->
<!--            <artifactId>curator-framework</artifactId>-->
<!--            <version>${curator.version}</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.curator</groupId>-->
<!--            <artifactId>curator-recipes</artifactId>-->
<!--            <version>${curator.version}</version>-->
<!--        </dependency>-->
<!--        <dependency>-->
<!--            <groupId>org.apache.curator</groupId>-->
<!--            <artifactId>curator-client</artifactId>-->
<!--            <version>${curator.version}</version>-->
<!--        </dependency>-->
    </dependencies>

</project>

3.2 配置信息

3.2.1 application.yml

zookeeper:
  address: 192.168.51.4:2181
  timeout: 4000

3.2.2 配置类

package com.auskat.demo.zookeeper.config;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * 类文件: ZookeeperConfig
 * <p>
 * <p>
 * 类描述:
 * <p>
 * 作     者: AusKa_T
 * <p>
 * 日     期: 2021/3/24 0024
 * <p>
 * 时     间: 9:23
 * <p>
 */
@Configuration
public class ZookeeperConfig {

    private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfig.class);

    @Value("${zookeeper.address}")
    private String connectString;

    @Value("${zookeeper.timeout}")
    private int sessionTimeout;

    public String getConnectString() {
        return connectString;
    }

    public void setConnectString(String connectString) {
        this.connectString = connectString;
    }

    public int getSessionTimeout() {
        return sessionTimeout;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    @Bean(name = "zkClient")
    public ZooKeeper zkClient() {
        ZooKeeper zooKeeper = null;
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // 如果收到了服务端的响应事件,说明连接成功
                    if (Event.KeeperState.SyncConnected == event.getState()) {
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();
            logger.info("  初始化ZooKeeper连接状态: {}",zooKeeper.getState());
        } catch (Exception e) {
            logger.error(" 初始化Zookeeper连接状态异常: {}",e.getMessage());
        }
        return  zooKeeper;
    }


}

3.3 自定义监听

package com.auskat.demo.zookeeper.watch;

import com.auskat.demo.zookeeper.config.ZookeeperConfig;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 类文件: WatcherApi
 * <p>
 * <p>
 * 类描述:
 * <p>
 * 作     者: AusKa_T
 * <p>
 * 日     期: 2021/3/24 0024
 * <p>
 * 时     间: 9:40
 * <p>
 */
public class CustomWatcher implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(CustomWatcher.class);

    @Override
    public void process(WatchedEvent event) {
        logger.info("监听事件的状态: {}",event.getState());
        logger.info("监听事件的路径: {}",event.getPath());
        logger.info("监听事件的类型: {}",event.getType());
    }

}

3.4 自定义工具类

package com.auskat.demo.zookeeper.utils;

import com.auskat.demo.zookeeper.config.ZookeeperConfig;
import com.auskat.demo.zookeeper.watch.CustomWatcher;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.List;

/**
 * 类文件: ZkUtil
 * <p>
 * <p>
 * 类描述:
 * <p>
 * 作     者: AusKa_T
 * <p>
 * 日     期: 2021/3/24 0024
 * <p>
 * 时     间: 9:35
 * <p>
 */
@Component
public class ZkUtil {

    private static final Logger logger = LoggerFactory.getLogger(ZkUtil.class);

    @Autowired
    private ZooKeeper zkClient;

    @Autowired
    private ZookeeperConfig ZooKeeper;



    /**
     * 创建持久化节点
     * -- 客户端断开连接后,节点数据持久化在磁盘上,不会被删除。
     *
     * @param path 路径
     * @param data 数据
     */
    public boolean createPerNode(String path, String data) {
        try {
            // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
            zkClient.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return true;
        } catch (Exception e) {
            logger.error("创建持久化节点异常,路径: {}, 数据: {}, 异常: {}", path, data, e);
            return false;
        }
    }

    /**
     * 创建临时节点
     * -- 客户端断开连接后,节点将被删除。
     *
     * @param path 路径
     * @param data 数据
     */
    public boolean createTmpNode(String path, String data) {
        try {
            // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
            zkClient.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;
        } catch (Exception e) {
            logger.error("创建临时节点异常,路径: {}, 数据: {}, 异常: {}", path, data, e);
            return false;
        }
    }

    /**
     * 创建自定义节点
     *
     * @param path       路径
     * @param data       数据
     * @param acl        节点权限
     * @param createMode 节点类型
     */
    public boolean createNode(String path, String data, List<ACL> acl, CreateMode createMode) {
        try {
            // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
            zkClient.create(path, data.getBytes(), acl, createMode);
            return true;
        } catch (Exception e) {
            logger.error("创建节点异常,路径: {}, 数据: {}, 异常: {}", path, data, e);
            return false;
        }
    }


    /**
     * 修改节点
     *
     * @param path 路径
     * @param data 数据
     */
    public boolean updateNode(String path, String data) {
        try {
            // zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1.
            // version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
            zkClient.setData(path, data.getBytes(), -1);
            return true;
        } catch (Exception e) {
            logger.error("修改节点异常,路径: {}, 数据: {}, 异常: {}", path, data, e);
            return false;
        }
    }

    /**
     * 删除节点
     *
     * @param path 路径
     */
    public boolean deleteNode(String path) {
        try {
            // version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
            zkClient.delete(path, -1);
            return true;
        } catch (Exception e) {
            logger.error("删除节点异常,路径: {}, 异常: {}", path, e);
            return false;
        }
    }

    /**
     * 判断指定节点是否存在
     *
     * @param path      路径
     * @param needWatch 指定是否复用zookeeper中默认的Watcher
     * @return 结果
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return zkClient.exists(path, needWatch);
        } catch (Exception e) {
            logger.error("判断指定节点是否存在异常,路径: {}, 异常: {}", path, e);
            return null;
        }
    }

    /**
     * 检测结点是否存在 并设置监听事件
     * 三种监听类型: 创建,删除,更新
     *
     * @param path    路径
     * @param watcher 传入指定的监听类
     */
    public Stat exists(String path, Watcher watcher) {
        try {
            return zkClient.exists(path, watcher);
        } catch (Exception e) {
            logger.error("判断指定节点是否存在异常,路径: {}, 异常: {}", path, e);
            return null;
        }
    }


    /**
     * 获取当前节点的子节点(不包含孙子节点)
     *
     * @param path 父节点path
     */
    public List<String> getChildren(String path) throws KeeperException, InterruptedException {
        List<String> list = zkClient.getChildren(path, false);
        return list;
    }

    /**
     * 获取指定节点的值
     *
     * @param path 路径
     */
    public String getData(String path, Watcher watcher) {
        try {
            Stat stat = new Stat();
            byte[] bytes = zkClient.getData(path, watcher, stat);
            return new String(bytes);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

//    /**
//     * 注册监听
//     * @param watcher 监听类
//     */
//    public void registerWatch(Watcher watcher) throws IOException {
//        ZooKeeper zooKeeper = new ZooKeeper(ZooKeeper.getConnectString(), ZooKeeper.getSessionTimeout(), watcher);
//    }


}

4 功能测试

4.1 Idea调试

package com.auskat.demo.zookeeper;

import com.auskat.demo.zookeeper.utils.ZkUtil;
import com.auskat.demo.zookeeper.watch.CustomWatcher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * 类文件: ApplicationTest
 * <p>
 * <p>
 * 类描述:
 * <p>
 * 作     者: AusKa_T
 * <p>
 * 日     期: 2021/3/24 0024
 * <p>
 * 时     间: 9:59
 * <p>
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTest {

    @Autowired
    private ZkUtil zkUtil;

    /**
     * 新增节点
     */
    @Test
    public void testCreateNode() {
        zkUtil.createPerNode("/demo", "auskat");
    }

    /**
     * 修改节点
     */
    @Test
    public void testUpdateNode() {
        zkUtil.updateNode("/demo", "auskat-2");
    }

    /**
     * 获取节点是否存在
     * 自定义监听
     */
    @Test
    public void exists() {
        zkUtil.exists("/demo", new CustomWatcher());
    }

    /**
     * 获取节点数据
     * 自定义监听
     */
    @Test
    public void getData() throws InterruptedException {
        String data = zkUtil.getData("/demo", new CustomWatcher());
        System.out.println(data);
        zkUtil.updateNode("/demo", "auskat-3");
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 删除节点
     */
    @Test
    public void testDeleteNode() {
        zkUtil.deleteNode("/demo");
    }


}

4.2 可视化工具查看

使用 prettyZoo zookeeper可视化工具查看节点信息

4.2.1 建立连接

在这里插入图片描述 在这里插入图片描述

4.2.2 查看数据

在这里插入图片描述

5 相关信息

上一篇下一篇

猜你喜欢

热点阅读