2018-05-14 zookeeper实现分布式系统

2018-05-17  本文已影响0人  fairy冉冉

关于zookeeper的介绍,请大家自行google。本文仅介绍,如何使用go语言版本的zk包来实现一个简单的分布式server。
本文主要分为两部分,第一部分,安装zookeeper服务;第二部分,使用zk包实现分布式server。

第一部分

首先,要在电脑上安装zookeeper,并启动它,本文以Mac电脑为例,使用brew来安装zookeeper。

1、打开终端,输入命令 :brew info zookeeper,查看zookeeper的相关信息。 1.png 2、输入命令:brew install zookeeper,自动安装。 2.png

3、安装成功后,输入命令:cd /usr/local/etc/zookeeper,前往zookeeper文件夹,这个路径,是使用brew 安装 zookeeper
所自动生成的。在这个文件夹里,有如下文件:

3.png
其中,zoo.cfg 文件,就是缺省配置文件。使用cat 命令查看该文件:
4.png
其中,dataDir顾名思义就是Zookeeper保存数据的目录,默认情况下Zookeeper将写数据的日志文件也保存在这个目录里。但我们一般会设置一个dataLogDir目录,用来保存日志,这将极大的提升ZK性能。
而clientPort这个端口就是客户端(应用程序)连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求。
更多的参数,请参考:http://www.cnblogs.com/xiohao/p/5541093.html

4、在我们这个简单的例子中,配置文件不需要额外的修改,保持原状即可。

5、此时,zookeeper已经安装成功,输入:zkServer,可查看zookeeper提供的操作命令: 5.png
6、输入:zkServer status 查看当前zookeeper的运行状态,发现zookeeper服务尚未启动: 6.png
7、输入:zkServer start 启动服务,再次输入 zkServer status 查看状态: 8.png

8、至此,zookeeper服务成功启动。此外,zookeeper还提供了一个客户端命令行工具zkCli ,供开发人员使用。
9、输入:zkCli ,连接zookeeper server:

9.png
10、zkCli 连接成功后,输入:h,查看 zkCli 所提供的各类操作: 10.png
11、输入:ls / 查看根目录下的节点: 11.png
12、zookeeper中,节点分为四种类型:持久节点(PERSISTENT)、持久顺序节点(PERSISTENT_SEQUENTIAL)、临时节点(EPHEMERAL)、临时顺序节点(EPHEMERAL_SEQUENTIAL): 12.png 由图10可知,新建节点的命令格式为:create [-s] [-e] path data acl。解释一下,-s代表顺序,-e代表临时,这两个参数是可选的,不写的话,默认为持久节点。path是节点路径,路径必须以根目录/开头。data是节点所关联的数据。acl是节点的操作权限。
现在我们新建一个持久节点fairy。输入:create /fairy,发现没有创建成功,尝试输入:create /fairy 123,关联了数据,创建成功。 13.png
接着我们创建一个持久顺序节点ranran,发现系统自动地在ranran后面加上了一长串数字: 14.png
注意!!!后面用代码实现分布式server时,添加节点,使用了ip地址加port端口,作为节点的名字。这样命名的节点,不能是顺序节点,否则就会出现类似127.0.0.1:88990000000006这样的节点,客户端连接这样的节点的时候,就会提示端口号无效。

第二部分

直接上代码,分为3个文件,zkutil、server、client。

zkutil,工具类,封装了第三方zk包,提供接口供 server 和 client 使用
package example

import (
    "fmt"
    zookeeper "github.com/samuel/go-zookeeper/zk"  //为了方便,给第三方包定义了一个别名 zookeeper
    "time"
)


//创建zkutil文件,通过调用第三方zk包的某些方法,从而为我们自己本身的项目MyZookeeper提供一些基本的方法
//可以理解为zkutil 把 第三方 zk 给封装了一层

//const 定义常量
const (
    timeOut = 20  //20s还未响应,则认为请求超时
)

//服务器ip列表
var hosts []string = []string{"127.0.0.1:2181"} // the zk server list


//连接服务器
func GetConnect() (conn *zookeeper.Conn, err error) {

    //调用第三方zk包的Connect方法,判断连接服务器是否成功,成功的话,返回一个连接Conn
    conn, _, err = zookeeper.Connect(hosts, timeOut*time.Second)
    if err != nil {
        fmt.Println("服务器连接失败",err)
    }
    return
}

//注册一个节点
func RegistServer(conn *zookeeper.Conn, host string) (err error) {

    //调用第三方zk包的Create方法,往zookeeper的 fairy 节点下(咱们在第一部分的时候所创建的节点),新增一个服务器节点,本例中,用的host
    //作为节点名字,
    _, err = conn.Create("/fairy/"+host, nil, 0, zookeeper.WorldACL(zookeeper.PermAll))
    return
}

//获取fairy节点下,所包含的服务器节点列表
func GetServerList(conn *zookeeper.Conn) (list []string, err error) {
    list, _, err = conn.Children("/fairy")
    return
}

server,服务器端
package main

import (
    "fmt"
    "MyZookeeper/example"
    "net"
    "os"
    "time"
)

func main() {

    //在本机上,开辟三个不同端口,模拟三台服务器设备,然后将这三台服务器加入到zookeeper中,作为服务器节点
    go starServer("127.0.0.1:4444")
    go starServer("127.0.0.1:5555")
    go starServer("127.0.0.1:6666")

    a := make(chan bool, 1)
    <-a
}

//启动服务器
func starServer(port string) {

    // 创建一个TCP服务端
    tcpAddr, err := net.ResolveTCPAddr("tcp4", port)
    checkError(err,"新建一个tcp连接")
    fmt.Println("TCP : ",tcpAddr)

    // 监听端口
    listener, err := net.ListenTCP("tcp", tcpAddr)
    checkError(err,"监听端口")

    // 连接 zookeeper 服务,只有连接成功了,才能注册
    conn, err := example.GetConnect()
    if err != nil {
        fmt.Printf("连接 zookeeper 失败: %s\n",err)
    }else{
        fmt.Println("连接 zookeeper 成功")
    }


    defer conn.Close()

    err = example.RegistServer(conn, port)
    if err != nil {
        fmt.Printf("%s 注册节点失败: %s \n",port,err)
    }else {
        fmt.Printf("%s 注册节点成功 \n",port)
    }

    // 死循环的处理客户端请求
    for {

        // 等待客户的连接,注意这里是无法并发处理多个请求的
        conn, err := listener.Accept()

        // 如果有错误直接跳过
        if err != nil {
            fmt.Fprintf(os.Stderr, "本次监听,失败: %s", err)
            continue
        }


        // 开启一个协程,向客户端发送数据,并关闭连接
        go handleCient(conn, port)
    }

}

func handleCient(conn net.Conn, port string) {
    defer conn.Close()

    // 读取接收到的 client 信息
    bs := make([] byte,512)
    m,_ := conn.Read(bs)
    fmt.Println("收到 client 的 :",string(bs[:m]))

    daytime := time.Now().String()
    conn.Write([]byte("我是节点 " + port + ",此时为:" + daytime))
}

func checkError(err error, desc string) {
    if err != nil {
        fmt.Fprintf(os.Stderr, "%s 失败: %s \n",desc, err)
        os.Exit(1)
    }
    fmt.Println(desc,"成功")
}

client,客户端
package main

import (
    "errors"
    "fmt"
    "MyZookeeper/example"
    "io/ioutil"
    "math/rand"
    "net"
    "os"
    "time"
)

func main() {
    for i := 0; i < 10; i++ {
        fmt.Println("i :",i)
        startClient()
        time.Sleep(1 * time.Second)
    }
}

func startClient() {

    // 客户端,先去获取一个服务器地址,然后才能与之进行tcp通讯
    serverHost, err := getServerHost()
    if err != nil {
        fmt.Printf("获取某个服务器ip地址,失败: %s \n", err)
        return
    }

    fmt.Println("随机选择出来的服务器为 : " + serverHost)

    //模拟 client 请求 serverHost 服务器
    //ResolveTCPAddr用于获取一个TCPAddr
    tcpAddr, err := net.ResolveTCPAddr("tcp4", serverHost)
    checkError(err,"获取一个TCPAddr")

    // 建立一个TCP连接,连接到刚选择出来的服务器
    conn, err := net.DialTCP("tcp", nil, tcpAddr)
    checkError(err,"client建立一个TCP连接,连接到服务器")

    defer conn.Close()

    // 向tcpconn中写入数据
    _, err = conn.Write([]byte("hello,I'm a client ..."))
    checkError(err,"向tcpconn中写入数据")

    // 读取tcpconn中的数据
    result, err := ioutil.ReadAll(conn)
    checkError(err,"读取tcpconn中的数据")
    fmt.Println("tcpconn中的数据 : ",string(result))

    return
}

func getServerHost() (host string, err error) {

    // 连接 zookeeper 服务,只有连接成功了,才能获取服务器列表
    conn, err := example.GetConnect()
    if err != nil {
        fmt.Printf("连接 zookeeper 失败: %s\n",err)
        return
    }else{
        fmt.Println("连接 zookeeper 成功")
    }


    defer conn.Close()

    //获取fairy节点下,所包含的服务器节点列表
    serverList, err := example.GetServerList(conn)
    if err != nil {
        fmt.Printf("获取fairy节点下,所包含的服务器节点列表,失败: %s \n", err)
        return
    }else{
        fmt.Println("获取fairy节点下,所包含的服务器节点列表,成功")
    }

    count := len(serverList)
    if count == 0 {
        err = errors.New("fairy节点下,所包含的服务器节点列表是空的 \n")
        return
    }

    //随机选中一个服务器返回
    fmt.Println("从获取到的服务器节点列表中,随机选择一个,去响应客户端")
    r := rand.New(rand.NewSource(time.Now().UnixNano()))
    host = serverList[r.Intn(3)]
    return
}

func checkError(err error, desc string) {
    if err != nil {
        fmt.Fprintf(os.Stderr, "%s 失败: %s \n",desc, err)
        os.Exit(1)
    }
    fmt.Println(desc,"成功")
}

代码写好后,我们先运行server端代码,注册节点;接着再运行client端代码,查看连接情况:

1、运行server端: 14.png 2、运行client端: 15.png 3、回到server终端界面查看: 16.png
上一篇下一篇

猜你喜欢

热点阅读