go语言grpc学习笔记

2021-03-18  本文已影响0人  不能吃的坚果j

本文作者:陈进坚
个人博客:https://jian1098.github.io
CSDN博客:https://blog.csdn.net/c_jian
简书:https://www.jianshu.com/u/8ba9ac5706b6
联系方式:jian1098@qq.com

grpc教程


视频:https://www.bilibili.com/video/BV1GE411A7kp

代码:https://github.com/zhuge20100104/grpc-demo

打开go-module


set GO111MODULE=on    //windows
export GO111MODULE=on //linux

编辑器settings-GO-Go Modules勾选

安装protoc


Linux

创建目录

mkdir proto
cd proto

打开https://github.com/protocolbuffers/protobuf/releases下载最新版本的protoc-*-linux-x86_64.zip

wget https://github.com/protocolbuffers/protobuf/releases/download/v3.12.3/protoc-3.12.3-linux-x86_64.zip

解压

unzip protoc-3.12.3-linux-x86_64.zip

添加到path

vim /etc/profile

把你的bin路径加到最后保存

export PATH=$PATH:/root/protoc/bin

刷新配置表

source /etc/profile

查看版本

protoc --version

Windows

打开https://github.com/protocolbuffers/protobuf/releases下载最新版本的protoc-*-win64.zip

新建一个文件夹并加压,然后把bin目录添加到环境变量即可

查看版本

protoc --version

安装protoc-gen-go


执行命令

go get -u github.com/golang/protobuf/protoc-gen-go

然后会在$GOPATH/bin目录下发现protoc-gen-go.exe

安装IDE插件


此步骤可选

在goland插件库中安装Protobuf Support

grpc流程


创建proto文件

创建pbfiles/Prod.proto文件,复制下面的代码保存

syntax="proto3";
option go_package = ".;services";
package services;

message ProdRequest{
    int32 prod_id=1;
}
message ProResponse{
    int32 pro_stock=1;
}

生成.pb.go文件

创建services目录然后在pbfiles目录下执行命令

protoc --go_out=../services Prod.proto

会得到services/Prod.pb.go文件

pbfiles/Prod.proto文件新增服务代码

service ProService{
    rpc GetProStock (ProdRequest) returns (ProResponse);
}

执行下面的命令

protoc --go_out=plugins=grpc:../services Prod.proto

services/Prod.pb.go文件会生成更多的代码

创建业务逻辑文件

在生成的.pb.go文件中找到GetProdStock的接口,然后复制,创建services/ProdService.go,然后实现GetProdStock方法的具体逻辑

package services

import (
    "context"
)

type ProdService struct {
}

func (this *ProdService) GetProdStock(ctx context.Context, request *ProdRequest) (*ProdResponse, error) {
    return &ProdResponse{ProStock:20}, nil
}

创建服务端

创建server/server.go,并写入以下代码

package main

import (
    "google.golang.org/grpc"
    "grpc-test/services"
    "log"
    "net"
)

func main() {
    rpcServer := grpc.NewServer()
    services.RegisterProdServiceServer(rpcServer, new(services.ProdService))
    lis, err := net.Listen("tcp", ":8081")
    if err != nil {
        log.Fatal(err)
    }

    //tcp服务
    err = rpcServer.Serve(lis)
    if err != nil {
        log.Fatal(err)
    }
}

创建客户端

创建client/client.go,并写入以下代码

package main

import (
    "context"
    "fmt"
    "grpc-test/services"
    "log"
    "google.golang.org/grpc"
)

func main() {
    conn, err := grpc.Dial(":8081", grpc.WithInsecure())        //grpc.WithInsecure():不使用证书
    if err != nil {
        log.Fatalf("连接GRPC服务端失败 %v\n", err)
    }

    defer conn.Close()
    prodClient := services.NewProdServiceClient(conn)
    prodRes, err := prodClient.GetProdStock(context.Background(),
        &services.ProdRequest{ProdId: 12})

    if err != nil {
        log.Fatalf("请求GRPC服务端失败 %v\n", err)
    }
    fmt.Println(prodRes.ProStock)
}

启动服务

在命令行执行go run server/server.go,然后在另一个终端执行go run client/client.go即可

同时提供rpc和http服务

时提供rpchttp服务的grpc框架

https://github.com/grpc-ecosystem/grpc-gateway

第三方字段验证库

除了自行对参数字段进行验证,也可以选用第三方库验证字段

github.com/envoyproxy/protoc-gen-validate/validate

流模式


服务端流

User.proto

syntax = "proto3";

package services;

import "Model.proto";

message UserScoreRequest {
    repeated UserInfo users = 1;
}

message UserScoreResponse {
    repeated UserInfo users = 1;
}

service UserService {
    rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse) {}
    rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse) {}  //定义rpc服务
}

服务端UserService.go

package services
import context "context"

type UserService struct{}

func (*UserService) GetUserScore(ctx context.Context, req *UserScoreRequest) (*UserScoreResponse, error) {
    var score int32 = 100
    users := make([]*UserInfo, 0)
    for _, user := range req.Users {
        user.UserScore = score
        score++
        users = append(users, user)
    }
    return &UserScoreResponse{Users: users}, nil
}

func (*UserService) GetUserScoreByServerStream(req *UserScoreRequest,
    stream UserService_GetUserScoreByServerStreamServer) error {
    var score int32 = 100
    users := make([]*UserInfo, 0)
    for index, user := range req.Users {        //分批发送给客户端
        user.UserScore = score
        score++
        users = append(users, user)
        if (index+1)%2 == 0 && index > 0 {
            err := stream.Send(&UserScoreResponse{Users: users})
            if err != nil {
                return err
            }
            users = users[0:0]
        }

    }
    // 发送最后一批
    if len(users) > 0 {
        err := stream.Send(&UserScoreResponse{Users: users})
        if err != nil {
            return err
        }
    }
    return nil
}

客户端

package main

import (
    "context"
    "fmt"
    "io"
    "log"

    "github.com/zhuge20100104/grpc-demo/grpc-13/client/helper"

    "github.com/zhuge20100104/grpc-demo/grpc-13/client/services"

    "google.golang.org/grpc"
)

func main() {

    conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(helper.GetClientCredentials()))
    if err != nil {
        log.Fatalf("连接GRPC服务端失败 %v\n", err)
    }

    defer conn.Close()

    userClient := services.NewUserServiceClient(conn)

    users := make([]*services.UserInfo, 0)
    var i int32 = 0
    for i = 0; i < 6; i++ {
        user := &services.UserInfo{UserId: i + 1}
        users = append(users, user)
    }

    stream, err := userClient.GetUserScoreByServerStream(context.Background(),
        &services.UserScoreRequest{Users: users},
    )

    if err != nil {
        log.Fatalf("请求GRPC服务端失败 %v\n", err)
    }

    for {
        userRes, err := stream.Recv()   //读取流数据
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Printf("读取服务端流失败 err: %v\n", err.Error())
        }
        fmt.Println(userRes.Users)
    }
}

客户端流

服务端

package services

import (
    context "context"
    "io"
)

type UserService struct{}

func (*UserService) GetUserScore(ctx context.Context, req *UserScoreRequest) (*UserScoreResponse, error) {
    var score int32 = 100
    users := make([]*UserInfo, 0)
    for _, user := range req.Users {
        user.UserScore = score
        score++
        users = append(users, user)
    }

    return &UserScoreResponse{Users: users}, nil
}

func (*UserService) GetUserScoreByServerStream(req *UserScoreRequest,
    stream UserService_GetUserScoreByServerStreamServer) error {
    var score int32 = 100
    users := make([]*UserInfo, 0)
    for index, user := range req.Users {
        user.UserScore = score
        score++
        users = append(users, user)
        if (index+1)%2 == 0 && index > 0 {
            err := stream.Send(&UserScoreResponse{Users: users})
            if err != nil {
                return err
            }
            users = users[0:0]
        }

    }
    // 发送最后一批
    if len(users) > 0 {
        err := stream.Send(&UserScoreResponse{Users: users})
        if err != nil {
            return err
        }
    }

    return nil
}

func (*UserService) GetUserScoreByClientStream(stream UserService_GetUserScoreByClientStreamServer) error {
    users := make([]*UserInfo, 0)
    var score int32 = 100
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            err = stream.SendAndClose(&UserScoreResponse{Users: users})
            return err
        }

        if err != nil {
            return err
        }

        for _, user := range req.Users {
            user.UserScore = score
            users = append(users, user)
            score++
        }
    }
}

客户端

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/zhuge20100104/grpc-demo/grpc-14/client/helper"

    "github.com/zhuge20100104/grpc-demo/grpc-14/client/services"

    "google.golang.org/grpc"
)

func main() {

    conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(helper.GetClientCredentials()))
    if err != nil {
        log.Fatalf("连接GRPC服务端失败 %v\n", err)
    }

    defer conn.Close()

    userClient := services.NewUserServiceClient(conn)

    users := make([]*services.UserInfo, 0)
    var i int32 = 0
    for i = 0; i < 6; i++ {
        user := &services.UserInfo{UserId: i + 1}
        users = append(users, user)
    }

    stream, err := userClient.GetUserScoreByClientStream(context.Background())

    if err != nil {
        log.Fatalf("请求GRPC服务端失败 %v\n", err)
    }

    for i := 0; i < 3; i++ {
        req := new(services.UserScoreRequest)
        req.Users = make([]*services.UserInfo, 0)
        var j int32
        for j = 1; j <= 5; j++ {
            req.Users = append(req.Users, &services.UserInfo{UserId: j})
        }
        stream.Send(req)
    }

    res, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("接收服务端请求失败 %v\n", err)
    }

    for _, user := range res.Users {
        fmt.Println(user)
    }

}

双向流

服务端UserService.go

package services

import (
    context "context"
    "io"
)

type UserService struct{}

func (*UserService) GetUserScore(ctx context.Context, req *UserScoreRequest) (*UserScoreResponse, error) {
    var score int32 = 100
    users := make([]*UserInfo, 0)
    for _, user := range req.Users {
        user.UserScore = score
        score++
        users = append(users, user)
    }
    return &UserScoreResponse{Users: users}, nil
}

func (*UserService) GetUserScoreByServerStream(req *UserScoreRequest,
    stream UserService_GetUserScoreByServerStreamServer) error {
    var score int32 = 100
    users := make([]*UserInfo, 0)
    for index, user := range req.Users {
        user.UserScore = score
        score++
        users = append(users, user)
        if (index+1)%2 == 0 && index > 0 {
            err := stream.Send(&UserScoreResponse{Users: users})
            if err != nil {
                return err
            }
            users = users[0:0]
        }
    }
    // 发送最后一批
    if len(users) > 0 {
        err := stream.Send(&UserScoreResponse{Users: users})
        if err != nil {
            return err
        }
    }
    return nil
}

func (*UserService) GetUserScoreByClientStream(stream UserService_GetUserScoreByClientStreamServer) error {
    users := make([]*UserInfo, 0)
    var score int32 = 100
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            err = stream.SendAndClose(&UserScoreResponse{Users: users})
            return err
        }

        if err != nil {
            return err
        }

        for _, user := range req.Users {
            user.UserScore = score
            users = append(users, user)
            score++
        }
    }
}

func (*UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error {
    users := make([]*UserInfo, 0)
    var score int32 = 100
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            return nil
        }

        if err != nil {
            return err
        }

        for _, user := range req.Users {
            user.UserScore = score
            users = append(users, user)
            score++
        }

        stream.Send(&UserScoreResponse{Users: users})
        users = users[0:0]
    }
}

客户端

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "github.com/zhuge20100104/grpc-demo/grpc-15/client/helper"
    "github.com/zhuge20100104/grpc-demo/grpc-15/client/services"
    "google.golang.org/grpc"
)

func main() {
    conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(helper.GetClientCredentials()))
    if err != nil {
        log.Fatalf("连接GRPC服务端失败 %v\n", err)
    }

    defer conn.Close()

    userClient := services.NewUserServiceClient(conn)

    users := make([]*services.UserInfo, 0)
    var i int32 = 0
    for i = 0; i < 6; i++ {
        user := &services.UserInfo{UserId: i + 1}
        users = append(users, user)
    }

    stream, err := userClient.GetUserScoreByTWS(context.Background())

    if err != nil {
        log.Fatalf("请求GRPC服务端失败 %v\n", err)
    }

    for i := 0; i < 3; i++ {
        req := new(services.UserScoreRequest)
        req.Users = make([]*services.UserInfo, 0)
        var j int32
        for j = 1; j <= 5; j++ {
            req.Users = append(req.Users, &services.UserInfo{UserId: j})
        }
        stream.Send(req)

        res, err := stream.Recv()
        if err == io.EOF {
            break
        }

        if err != nil {
            log.Fatalf("接收服务端请求失败 %v\n", err)
        }
        fmt.Println(res.Users)
    }
}
上一篇下一篇

猜你喜欢

热点阅读