2018-05-15 建立一个基本的分布式系统

2018-05-16  本文已影响0人  fairy冉冉

下面,我们用golang来实现一个基本的分布式系统,它有如下功能:
1、能够发送/接收请求和响应
2、能够连接到集群
3、如果无法连接到集群,(如果它是第一个节点),则可以作为主节点启动
4、每个节点有唯一的标识
5、能够在节点之间交互json数据包
6、接受命令行参数中的所有信息(将来在我们系统升级时有很大作用)

闲话少表,开始。
打开GoLand,新建一个项目BaseDApp,记住这个项目名字,后边会用到。
新建main.go文件,编写代码:

package main

import (
    "fmt"
    "strconv"
    "time"
    "math/rand"
    "net"
    "flag" //系统标准库,flag包实现了命令行参数的解析,具体用法,自行google
    "strings"
    "encoding/json"
)

type NodeInfo struct {
    // 节点ID,通过随机数生成
    NodeId int `json:"nodeId"`
    
    //节点IP地址
    NodeIpAddr string `json:"nodeIpAddr"`

    //节点端口
    Port string `json:"port"`
}

// 将节点数据信息格式化输出
// NodeInfo:{nodeId: 89423,nodeIpAddr: 127.0.0.1/8,port: 8001}
func (node *NodeInfo) String() string{
    return "NodeInfo:{ nodeId:" + strconv.Itoa(node.NodeId) + ", nodeIpAddr:" + node.NodeIpAddr + ", port:" + node.Port + " }"
}

// 添加一个节点到集群的一个请求或者响应的标准格式
type AddToClusterMessage struct {
    //源节点
    Source NodeInfo `json:"source"`

    //目的节点
    Dest NodeInfo `json:"dest"`

    //两个节点连接时发送的消息
    Message string `json:"message"`
}

// Request/Response 信息格式化输出
func (req AddToClusterMessage) String() string {
    return "AddToClusterMessage:{\n source:" + req.Source.String() + ",\n dest: " + req.Dest.String() + ",\n message:" + req.Message + " }"
}

func main() {

    // 后边,我们将使用命令行,来动态地设置节点的ip地址、端口等信息,所以需要用到解析命令行参数的 flag 包

    // 将命令行中的参数 makeMasterOnError 所 对应的bool值,赋给 makeMasterOnError 变量,如果命令行中不设置该参数,则默认值为false
    makeMasterOnError := flag.Bool("makeMasterOnError", false, "如果IP地址没有连接到集群中,我们将其作为Master节点。")

    // 将命令行中的参数 clusterip 所 对应的string值,赋给 clusterip 变量,如果命令行中不设置该参数,则默认值为127.0.0.1:8001
    clusterip := flag.String("clusterip", "127.0.0.1:8001", "任何的节点连接都连接这个IP")

    // 将命令行中的参数 myport 所 对应的string值,赋给 myport 变量,如果命令行中不设置该参数,则默认值为8001
    myport := flag.String("myport", "8001", "ip address to run this node on. default is 8001")

    // 开始解析命令行参数
    flag.Parse()

    // 打印命令行参数
    fmt.Println("当前命令行参数 makeMasterOnError : ", *makeMasterOnError)
    fmt.Println("当前命令行参数 clusterip : ", *clusterip)
    fmt.Println("当前命令行参数 myport : ", *myport)

    // 为节点生成ID
    rand.Seed(time.Now().UTC().UnixNano())     //生成随机数之前,要先设置种子
    myid := rand.Intn(99999999)            //随机数
    fmt.Println("随机生成 myid : ", myid)

    // 获取本机 ip 地址
    myip, _ := net.InterfaceAddrs()
    fmt.Println("获取本机 myip : ", myip[0])

    // 创建 NodeInfo 结构体对象 me,即本机节点
    me := NodeInfo{myid,myip[0].String(),*myport}
    fmt.Println("创建 me 节点: ",me.String())

    //根据命令行中设置的集群ip地址参数 cluserip,来创建一个目标节点
    dest := NodeInfo{-1,strings.Split(*clusterip,":")[0],strings.Split(*clusterip,":")[1]}
    fmt.Println("创建 dest 节点: ",dest.String())

    fmt.Println("尝试将 me 连接到集群 dest")
    // 尝试将 me 连接到集群 dest,在已连接的情况下并且向集群发送请求
    ableToConnect := connectToCluster(me, dest)

    // 监听其他节点将要加入到集群的请求
    // ableToConnect 连接成功
    // (!ableToConnect && *makeMasterOnError) 如果me是集群中第一个启动的节点,me想去连接dest,是肯定会失败的,所以将me作为master节点。
    if ableToConnect || (!ableToConnect && *makeMasterOnError) {
        if *makeMasterOnError {
            fmt.Println("me 符合成为master节点的条件,集群正式建立")
        }

        //果然 me 成功地连接到集群dest,则监听me上的端口,看是否有消息
        listenOnPort(me)
    }else {
        fmt.Printf("me 连接 dest 失败,请在命令行中设置 makeMasterOnError 参数 为 true ,以便将 me 节点 id = %v 设为 master 节点.\n",myid)

    }

}


//  这是发送请求时格式化json包用的工具,这是非常重要的,如果不经过数据格式化,你最终发送的将是空白消息
//  利用节点的具体信息,生成一个 AddToClusterMessage 结构体对象
func getAddToClusterMessage(source NodeInfo, dest NodeInfo, message string) (AddToClusterMessage) {
    return AddToClusterMessage{
        Source : NodeInfo{
            NodeId : source.NodeId,
            NodeIpAddr : source.NodeIpAddr,
            Port : source.Port,
        },
        Dest : NodeInfo{
            NodeId : dest.NodeId,
            NodeIpAddr : dest.NodeIpAddr,
            Port : dest.Port,
        },
        Message : message,
    }
}


/*
    encoder与decoder像是在writer外面封装了一层。会根据指定的数据结构的格式进行读写。如果文件中的json格式与指定的数据结构的格式不一致会出现error。
    在decoder的过程中,用一个for{}不停的读文件,直到出现error,代表文件结束。在for{}中,每次都要申请一个新的空间,存放从文件中读取出来的数据。
 */
//  尝试将 me 连接到集群 dest
func connectToCluster(me NodeInfo, dest NodeInfo) (bool)  {

    // 建立 TCP Socket 连接,此处使用超时机制的DialTimeout 方法
    connOut,err := net.DialTimeout("tcp", dest.NodeIpAddr + ":" + dest.Port, time.Duration(10) * time.Second)

    if err != nil {
        if _, ok := err.(net.Error); ok {
            fmt.Printf("节点 me = %v 未连接到集群. %v\n", me.NodeId,err)
        }
    }else {
        fmt.Printf("节点 me = %v 成功连接到集群,发送消息到节点.\n", me.NodeId)
        text := "Hi Fairy.. 请添加我到集群中..."
        requestMessage := getAddToClusterMessage(me,dest,text)
        json.NewEncoder(connOut).Encode(&requestMessage) //将 结构体AddToClusterMessage对象 转化成 json 字符串,写入connOut中

        //下面这四行,跟节点之间的信息交流无关,可省略,这里只是想把json再次转化为结构体AddToClusterMessage对象,从而格式化输出
        decoder := json.NewDecoder(connOut)
        var responseMessage AddToClusterMessage
        decoder.Decode(&responseMessage)
        fmt.Println("得到数据响应:\n" + responseMessage.String())

        return true
    }

    return false
}


/*
    encoder与decoder像是在writer外面封装了一层。会根据指定的数据结构的格式进行读写。如果文件中的json格式与指定的数据结构的格式不一致会出现error。
    在decoder的过程中,用一个for{}不停的读文件,直到出现error,代表文件结束。在for{}中,每次都要申请一个新的空间,存放从文件中读取出来的数据。
 */
func listenOnPort(me NodeInfo)  {
    // 监听即将到来的消息
    ln, _ := net.Listen("tcp", fmt.Sprint(":" + me.Port))

    // 接受连接
    for {
        connIn,err := ln.Accept()
        if err != nil {
             if _,ok := err.(net.Error); ok {
                fmt.Println("Error received while listening", me.NodeId)
             }
        }else {
            // 将接收到的 json 数据,转化为 AddToClusterMessage 结构体对象
            //下面这三行,跟节点之间的信息交流无关,可省略,这里只是想把json再次转化为结构体AddToClusterMessage对象,从而格式化输出
            var requestMessage AddToClusterMessage
            json.NewDecoder(connIn).Decode(&requestMessage)
            fmt.Println("Got request:\n" + requestMessage.String())

            // 回复
            text := "OK God.. 我马上把你加入集群中.."
            responseMessage := getAddToClusterMessage(me,requestMessage.Source, text)
            json.NewEncoder(connIn).Encode(&responseMessage)
            connIn.Close()
        }
    }

}

代码写好后,该运行看看效果了,这里我们使用命令行工具。
1、如下图,点击Terminal,启动命令行。


1.png

2、启动成功。


2.png

3、执行命令,安装BaseDApp(输入 go install BaseDApp) ,并运行BaseDApp(输入 BaseDApp)


3.png

4、重新运行BaseDApp,并设置makeMasterOnError参数为true。


4.png

5、从命令行界面得知,此时程序并没有结束,光标一闪一闪的,me节点正在监听。

6、打开Mac自带的终端,因为我们之前已经在GoLand自带的Terminal中,执行go install BaseDApp命令,安装了BaseDApp,所以此时在系统终端中,可以直接运行BaseDApp项目,输入BaseDApp -clusterip 127.0.0.1:8001 -myport 8002 ,解释一下,clusterip指的是,目的节点的ip地址,即代码中的dest,myport指的是,me节点用来跟其他节点通信的端口,makeMasterOnError没写,使用默认值false(代码中规定了)。


5.png

7、由图5可知,系统终端代表的节点,成功的连接到了GoLand自带的Terminal代表的节点。

8、再次打开一个Mac自带的终端,输入BaseDApp -clusterip 127.0.0.1:8002 -myport 8003,让第二个系统终端代表的节点,去连接第一个系统终端代表的节点。


6.png

9、总之,你可以通过修改命令行参数,连接不同的节点。

上一篇下一篇

猜你喜欢

热点阅读