微服务架构和实践

consul

2017-11-09  本文已影响0人  auguszou

consulgolang开发的一个高可用的分布式服务注册系统,有service discoverykey/value,健康检查, 节点选举,多数据中心等功能,与zookeeperetcd等相似。微服务框架go-micro默认使用consul作服务发现。

安装

go get github.com/hashicorp/consul

运行

安装好consul后, 我们需要运行agentagent可以以server mode或者client mode方式运行,通常一个数据中心需要3个或者5个server mode, 其他的运行client modeclient agent是一个轻量级的进程,完成健康检查和转发请求到相应的server agent, 可参考consul cluster

consul agent -dev
# or
consul agent -dev -advertise=127.0.0.1

服务运行后, 我们可以在浏览器中打开consul的控制界面了,访问http://localhost:8500,界面如下图:

consul web ui

服务发现

consul 提供服务注册和服务查询的功能。服务指定的我们提供某个需求实现的接口,在consul中,我们的服务看起来如下所示:

{
  "service": {
    "name": "redis",
    "tags": ["primary"],
    "address": "",
    "port": 8000,
    "enable_tag_override": false,
    "checks": [
      {
        "script": "/usr/local/bin/check_redis.py",
        "interval": "10s"
      }
    ]
  }
}

上面就算是定义了一个服务name=redis, 我们给出了该服务的tags, 以及服务的addressportchecks指定我们对该服务需要做定时健康检查。

consul服务注册有两种方式:服务定义和HTTP API调用。通常我们使用服务定义,下面我们说明如何来注册服务。

{
  "service": {
    "name": "web",
    "tags": ["rails"],
    "address": "127.0.0.1",
    "port": 8000
  }
}

启动consul agent

consul agent -dev -advertise=127.0.0.1 -config-dir=/etc/consul.d

consul控制界面,可以发现已经成功注册了我们的web服务,如下图所示:

consul web ui
package main

import (
    "fmt"
    consulapi "github.com/hashicorp/consul/api"
    "log"
    "time"
)

// start consul
// consul agent -dev -enable-script-checks --advertise=127.0.0.1

// to listen localhost:8080
// nc -lp 8000

// consul web
// http://127.0.0.1:8500/ui

const Id = "1234567890"

func testRegister() {

    fmt.Println("test begin .")
    config := consulapi.DefaultConfig()
    //config.Address = "localhost"
    fmt.Println("defautl config : ", config)
    client, err := consulapi.NewClient(config)
    if err != nil {
        log.Fatal("consul client error : ", err)
    }
    //创建一个新服务。
    registration := new(consulapi.AgentServiceRegistration)
    registration.ID = Id
    registration.Name = "web"
    registration.Port = 8000
    registration.Tags = []string{"rails"}
    registration.Address = "127.0.0.1"

    //  //增加check。
    //  check := new(consulapi.AgentServiceCheck)
    //  check.HTTP = fmt.Sprintf("http://%s:%d%s", registration.Address, registration.Port, "/check")
    //  //设置超时 5s。
    //  check.Timeout = "5s"
    //  //设置间隔 5s。
    //  check.Interval = "5s"
    //  //注册check服务。
    //  registration.Check = check
    //  log.Println("get check.HTTP:", check)
    //
    //  err = client.Agent().ServiceRegister(registration)
    //
    //  if err != nil {
    //      log.Fatal("register server error : ", err)
    //  }

    //增加check。
    check := new(consulapi.AgentServiceCheck)

    check.Args = []string{"sh", "-c", "sleep 1 && exit 0"}
    //设置超时 5s。
    check.Timeout = "5s"
    //设置间隔 5s。
    check.Interval = "5s"
    //注册check服务。
    registration.Check = check

    err = client.Agent().ServiceRegister(registration)

    if err != nil {
        log.Fatal("register server error : ", err)
    }
}

func testDeregister() {
    fmt.Println("test begin .")
    config := consulapi.DefaultConfig()
    //config.Address = "localhost"
    fmt.Println("defautl config : ", config)
    client, err := consulapi.NewClient(config)
    if err != nil {
        log.Fatal("consul client error : ", err)
    }

    err = client.Agent().ServiceDeregister(Id)
    if err != nil {
        log.Fatal("register server error : ", err)
    }
}

func main() {
    log.Println("ready to register service")
    testRegister()

    time.Sleep(1 * time.Minute)

    log.Println("ready to deregister service")
    testDeregister()
}

服务查询, 服务消费者为了或者某个名字的服务提供者的具体信息(服务地址,端口等等),需要从consul agent进行查询。

key/value

consul提供用于存储分布式环境需要的配置以及服务信息的key/value数据库。当然我们也可以根据需要存储我们自己的数据,示例演示了基本的key/value操作,代码如下所示:

package main

import (
    "github.com/hashicorp/consul/api"
    "log"
)

func main() {
    config := api.DefaultConfig()
    client, err := api.NewClient(config)
    if err != nil {
        log.Fatal(err)
    }
    kv := client.KV()

    pair := &api.KVPair{Key: "key-1", Value: []byte("10")}

    if _, err := kv.Put(pair, nil); err != nil {
        log.Fatal(err)
    }

    if p, _, err := kv.Get(pair.Key, nil); err != nil {
        log.Fatal(err)
    } else {
        log.Println(p.Key, "=>", string(p.Value))
    }

    if ps, _, err := kv.List(pair.Key, nil); err != nil {
        log.Fatal(err)
    } else {
        for _, p := range ps {
            log.Println(p.Key, "=>", string(p.Value))
        }
    }

    if ks, _, err := kv.Keys(pair.Key, "/", nil); err != nil {
        log.Fatal(err)
    } else {
        for _, k := range ks {
            log.Println(k)
        }
    }

    if _, err := kv.Delete(pair.Key, nil); err != nil {
        log.Fatal(err)
    } else {
        log.Println("delete key '", pair.Key, "' successfully")
    }
}

健康检查

consul在每个server上都运行了一个轻量级的agent,因此健康检查(health check)可以从两个层次来进行,一个是针对service的应用级别检查,另一个是针对系统级别的检查。跟service一样,我们同样可以采用两种方式来定义检查: 检查定义和HTTP APIconsul提供五种方式的健康检查:

这种方式即使采用脚本定时(默认30s)的去检查,脚本的exit code和输出内容(最大长度4K,多余的将被截取)作为检查的结果。consul agent启动时,需设置enable_script_checks选项true

定时(默认30s)发送HTTP请求,根据请求返回的状态码来确定服务的检查结果,2xx表示服务正常, 429表示请求过多(警告状态, warning), 其他状态表示服务不正常(failure)。通常采用curl或者某种HTTP工具来实现。

定时(默认30s)连接指定的hostname/ip(默认locahost)和port,如果连接成功,表示服务正常,否则不正常。

由服务定时(指定的TTL时长)去维护某个检查状态,如果服务无法正常去更新检查状态,则该状态会被标记为失败(failure)。

我们的服务采用docker方式来部署并使用Docker Exec API来运行时,需设置enable_script_checks选项true

如何定义健康检查可参考前文所述服务注册#HTTP API

分布式锁的实现

参考文章
分布式锁的实现方式目前有redisSETNXetcd。下面使用consul来实现。
consul官方关于信号量的说明

package main

import (
    "fmt"
    consulapi "github.com/hashicorp/consul/api"
    "log"
    "sync"
    "time"
)

var n int = 0

func print(wg *sync.WaitGroup, char string) {
    go func() {
        config := consulapi.DefaultConfig()
        client, err := consulapi.NewClient(config)

        lock, err := client.LockKey("lockthis")
        if err != nil {
            log.Fatal(err)
        }

        defer wg.Done()
        for {
            _, err := lock.Lock(nil)
            if err != nil {
                log.Fatal(err)
            }
            if n >= 20 {
                if err := lock.Unlock(); err != nil {
                    log.Fatal(err)
                }
                break
            }
            n++
            fmt.Println(char, "->", n)
            time.Sleep(1 * time.Second)
            if err := lock.Unlock(); err != nil {
                log.Fatal(err)
            }
        }
    }()
}

func main() {
    wg := &sync.WaitGroup{}
    wg.Add(2)

    print(wg, "A")
    print(wg, "B")

    wg.Wait()
    fmt.Println("Done")
}

执行该代码会依次打印出字母AB

上一篇下一篇

猜你喜欢

热点阅读