代码改变世界Golang开发指南IT技术分享

gRPC服务发现与负载均衡

2017-09-20  本文已影响1268人  Jay_Guo

1)简介

gRPC负载平衡的主要实现机制是外部负载平衡,即通过外部负载平衡器来向客户端提供更新后的服务器列表。

gRPC客户端也内置对少量几种负载平衡策略API的支持,其中包括grpclb策略(该策略实现了外部负载平衡),但并不鼓励用户在gRPC中添加更多的策略。新的负载平衡策略应该在外部负载平衡器中实现。

2)工作流

负载平衡策略在名称解析和服务器连接方面与gRPC客户端相适应,以下是其工作方式:

1.首先,gRPC客户端会发出对服务器名称解析的请求,该名称会被解析为一个或多个IP地址,每一个地址会标明它是服务器地址还是负载平衡器地址,并且服务配置中会注明客户端所选用的负载平衡策略(例如,round_robin或grpclb)。

2.客户端实例化负载平衡策略。

注:如果地址解析器返回的任何一个地址是平衡器地址,客户端将会使用grpclb策略,从而直接忽略在服务配置中注明的负载平衡策略。如果没有平衡器地址,客户端将使用在服务配置中注明的负载平衡策略。如果服务配置中没有标明负载策略,那么客户端会默认为选择第一个可用服务器地址的策略。

3.负载平衡策略会创建通向每一个服务器地址的子通道。

l对于除了grpclb以外的其他策略,这意味着每一个解析后的地址都会有一个子通道。注:这些策略会忽略任何解析后的平衡器地址。

l对于grpclb策略,工作流如下:

a)该策略创建一个通向平衡器地址的流。它会向平衡器请求客户端最初请求服务器名称的服务器地址。

注:目前grpclb策略会忽略所有的非平衡器解析地址。在未来,这一机制可能会改变,这些非平衡器解析地址会成为备选方案,以防没有关联到任何平衡器。

b)被负载平衡器定向到客户端的gRPC服务器集群会向负载平衡器通报负载情况。

c)负载平衡器会向gRPC客户端的grpclb策略返回服务器列表。然后grpclb策略会创建通向列表中服务器的子通道。

4.对于每一次RPC发送,负载平衡策略会决定RPC应该传向哪个子通道。

对于grpclb策略,客户端会以负载平衡器返回的服务器列表顺序向服务器发送请求,如果服务器列表为空,请求会阻塞直到回去非空服务器列表。

3)其他负载均衡方式简介

1、集中式LBProxy Model


在服务消费者和服务提供者之间有一个独立的LB,通常是专门的硬件设备如F5,或者基于软件如LVS,HAproxy等实现。LB上有所有服务的地址映射表,通常由运维配置注册,当服务消费方调用某个目标服务时,它向LB发起请求,由LB以某种策略,比如轮询(Round-Robin)做负载均衡后将请求转发到目标服务。LB一般具备健康检查能力,能自动摘除不健康的服务实例。该方案主要问题:

1.单点问题,所有服务调用流量都经过LB,当服务数量和调用量大的时候,LB容易成为瓶颈,且一旦LB发生故障影响整个系统;

2.服务消费方、提供方之间增加了一级,有一定性能开销。

2、进程内LBBalancing-awareClient)

针对第一个方案的不足,此方案将LB的功能集成到服务消费方进程里,也被称为软负载或者客户端负载方案。服务提供方启动时,首先将服务地址注册到服务注册表,同时定期报心跳到服务注册表以表明服务的存活状态,相当于健康检查,服务消费方要访问某个服务时,它通过内置的LB组件向服务注册表查询,同时缓存并定期刷新目标服务地址列表,然后以某种负载均衡策略选择一个目标服务地址,最后向目标服务发起请求。LB和服务发现能力被分散到每一个服务消费者的进程内部,同时服务消费方和服务提供方之间是直接调用,没有额外开销,性能比较好。该方案主要问题:

1.开发成本,该方案将服务调用方集成到客户端的进程里头,如果有多种不同的语言栈,就要配合开发多种不同的客户端,有一定的研发和维护成本;

2.另外生产环境中,后续如果要对客户库进行升级,势必要求服务调用方修改代码并重新发布,升级较复杂。

4)gRPC结合ETCD负载均衡实现思路(仅供参考)

ETCD的介绍在文字ETCD实现技术总结中已经有了详细的介绍,gRPC结合ETCD实现外部负载均衡主要分以下四步:

1.服务注册

以服务名称作为Key,一个或多个服务器IP作为Value,并附带租期Put入etcd集群;

2.服务请求

gRPC客户端向etcd服务端发送服务名;

3.服务响应

etcd通过Get接口获取服务对应的服务器集群IP,通过服务器负载报告信息对服务器集群IP进行排序,将排序结果列表返回客户端;

4.gRPC服务请求与响应

gRPC客户端通过etcd服务端返回的IP列表连接gRPC服务端,实现gRPC服务。

5)基于进程内LB方案实现

根据gRPC官方提供的设计思路,基于进程内LB方案(即第2个案,阿里开源的服务框架 Dubbo 也是采用类似机制),结合分布式一致的组件(如Zookeeper、Consul、Etcd),可找到gRPC服务发现和负载均衡的可行解决方案。接下来以GO语言为例,简单介绍下基于Etcd3的关键代码实现:

1)命名解析实现:resolver.go

package etcdv3

import (

"errors"

"fmt"

"strings"

etcd3 "github.com/coreos/etcd/clientv3"

"google.golang.org/grpc/naming"

)

// resolver is the implementaion of grpc.naming.Resolver

type resolver struct {

serviceName string // service name to resolve

}

// NewResolver return resolver with service name

func NewResolver(serviceName string) *resolver {

return &resolver{serviceName: serviceName}

}

// Resolve to resolve the service from etcd, target is the dial address of etcd

// target example: "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"

func (re *resolver) Resolve(target string) (naming.Watcher, error) {

if re.serviceName == "" {

return nil, errors.New("grpclb: no service name provided")

}

// generate etcd client

client, err := etcd3.New(etcd3.Config{

Endpoints: strings.Split(target, ","),

})

if err != nil {

return nil, fmt.Errorf("grpclb: creat etcd3 client failed: %s", err.Error())

}

// Return watcher

return &watcher{re: re, client: *client}, nil

}

2)服务发现实现:watcher.go

package etcdv3

import (

"errors"

"fmt"

"strings"

etcd3 "github.com/coreos/etcd/clientv3"

"google.golang.org/grpc/naming"

)

// resolver is the implementaion of grpc.naming.Resolver

type resolver struct {

serviceName string // service name to resolve

}

// NewResolver return resolver with service name

func NewResolver(serviceName string) *resolver {

return &resolver{serviceName: serviceName}

}

// Resolve to resolve the service from etcd, target is the dial address of etcd

// target example: "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"

func (re *resolver) Resolve(target string) (naming.Watcher, error) {

if re.serviceName == "" {

return nil, errors.New("grpclb: no service name provided")

}

// generate etcd client

client, err := etcd3.New(etcd3.Config{

Endpoints: strings.Split(target, ","),

})

if err != nil {

return nil, fmt.Errorf("grpclb: creat etcd3 client failed: %s", err.Error())

}

// Return watcher

return &watcher{re: re, client: *client}, nil

}

3)服务注册实现:register.go

package etcdv3

import (

"fmt"

"log"

"strings"

"time"

etcd3 "github.com/coreos/etcd/clientv3"

"golang.org/x/net/context"

"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"

)

// Prefix should start and end with no slash

var Prefix = "etcd3_naming"

var client etcd3.Client

var serviceKey string

var stopSignal = make(chan bool, 1)

// Register

func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {

serviceValue := fmt.Sprintf("%s:%d", host, port)

serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)

// get endpoints for register dial address

var err error

client, err := etcd3.New(etcd3.Config{

Endpoints: strings.Split(target, ","),

})

if err != nil {

return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)

}

go func() {

// invoke self-register with ticker

ticker := time.NewTicker(interval)

for {

// minimum lease TTL is ttl-second

resp, _ := client.Grant(context.TODO(), int64(ttl))

// should get first, if not exist, set it

_, err := client.Get(context.Background(), serviceKey)

if err != nil {

if err == rpctypes.ErrKeyNotFound {

if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {

log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())

}

} else {

log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())

}

} else {

// refresh set to true for not notifying the watcher

if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {

log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())

}

}

select {

case <-stopSignal:

return

case <-ticker.C:

}

}

}()

return nil

}

// UnRegister delete registered service from etcd

func UnRegister() error {

stopSignal <- true

stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock

var err error;

if _, err := client.Delete(context.Background(), serviceKey); err != nil {

log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())

} else {

log.Printf("grpclb: deregister '%s' ok.", serviceKey)

}

return err

}

4)接口描述文件:helloworld.proto

syntax = "proto3";

option java_multiple_files = true;

option java_package = "com.midea.jr.test.grpc";

option java_outer_classname = "HelloWorldProto";

option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.

service Greeter {

//  Sends a greeting

rpc SayHello (HelloRequest) returns (HelloReply) {

}

}

// The request message containing the user's name.

message HelloRequest {

string name = 1;

}

// The response message containing the greetings

message HelloReply {

string message = 1;

}

5)实现服务端接口:helloworldserver.go

package main

import (

"flag"

"fmt"

"log"

"net"

"os"

"os/signal"

"syscall"

"time"

"golang.org/x/net/context"

"google.golang.org/grpc"

grpclb "com.midea/jr/grpclb/naming/etcd/v3"

"com.midea/jr/grpclb/example/pb"

)

var (

serv = flag.String("service", "hello_service", "service name")

port = flag.Int("port", 50001, "listening port")

reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")

)

func main() {

flag.Parse()

lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))

if err != nil {

panic(err)

}

err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)

if err != nil {

panic(err)

}

ch := make(chan os.Signal, 1)

signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)

go func() {

s := <-ch

log.Printf("receive signal '%v'", s)

grpclb.UnRegister()

os.Exit(1)

}()

log.Printf("starting hello service at %d", *port)

s := grpc.NewServer()

pb.RegisterGreeterServer(s, &server{})

s.Serve(lis)

}

// server is used to implement helloworld.GreeterServer.

type server struct{}

// SayHello implements helloworld.GreeterServer

func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {

fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)

return &pb.HelloReply{Message: "Hello " + in.Name}, nil

}

6)实现客户端接口:helloworldclient.go

package main

import (

"flag"

"fmt"

"time"

grpclb "com.midea/jr/grpclb/naming/etcd/v3"

"com.midea/jr/grpclb/example/pb"

"golang.org/x/net/context"

"google.golang.org/grpc"

"strconv"

)

var (

serv = flag.String("service", "hello_service", "service name")

reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")

)

func main() {

flag.Parse()

r := grpclb.NewResolver(*serv)

b := grpc.RoundRobin(r)

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)

conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))

if err != nil {

panic(err)

}

ticker := time.NewTicker(1 * time.Second)

for t := range ticker.C {

client := pb.NewGreeterClient(conn)

resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})

if err == nil {

fmt.Printf("%v: Reply is %s\n", t, resp.Message)

}

}

}

7)运行测试

1.运行3个服务端S1、S2、S3,1个客户端C,观察各服务端接收的请求数是否相等?

2.关闭1个服务端S1,观察请求是否会转移到另外2个服务端?

3.重新启动S1服务端,观察另外2个服务端请求是否会平均分配到S1?

4.关闭Etcd3服务器,观察客户端与服务端通信是否正常?

关闭通信仍然正常,但新服务端不会注册进来,服务端掉线了也无法摘除掉。

5.重新启动Etcd3服务器,服务端上下线可自动恢复正常。

6.关闭所有服务端,客户端请求将被阻塞。

参考:

http://www.grpc.io/docs/

https://github.com/grpc/grpc/blob/master/doc/load-balancing.md

上一篇下一篇

猜你喜欢

热点阅读