js css html

GRPC Interceptor的使用

2022-06-19  本文已影响0人  申_9a33

Interceptor 使用

上一篇我们介绍了metadata的使用方法,但是我们在每个方法内部都需要设置相同重复的metadata,比如调用时间戳,调用链等;能不能把这些相同的重复性设置,统一放在一个地方,方便后面修改和维护,答案就是拦截器-Interceptor.

1.普通调用 Interceptor的使用

1.1 服务端修改后代码

func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    fmt.Println("---unaryInterceptor---")

    // 解析请求携带的信息
    str, _ := json.Marshal(req)
    fmt.Printf("req: %s\n", str)
    fmt.Printf("Method: %s\n", info.FullMethod)

    defer func() {
        trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        grpc.SetTrailer(ctx, trailer)
    }()

    // 解析请求的metadata
    md, ok := metadata.FromIncomingContext(ctx)

    if !ok {
        return nil, status.Errorf(codes.DataLoss, "无法获取元数据")
    }

    if t, ok := md["timestamp"]; ok {
        fmt.Println("timestamp from metadata:")
        for i, e := range t {
            fmt.Printf("%d.%s\n", i, e)
        }
    }

    // 创建携带metadata的Header
    header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
    grpc.SendHeader(ctx, header)

    // 方法调用
    m, err := handler(ctx, req)
    if err != nil {
        fmt.Printf("RPC failed with error %v", err)
    }
    return m, err
}
func (s *server) UnaryEcho(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    fmt.Println("---UnaryEcho---")

    fmt.Printf("已接受到的请求:%v,发送响应\n", in)

    return &pb.HelloReply{Message: "Hello again " + in.GetName()}, nil
}

1.2客户端修改后代码

// unaryInterceptor is an example unary interceptor.
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    fmt.Printf("---unaryInterceptor---\n")

    // 创建metadata到context中.
    md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
    ctx = metadata.NewOutgoingContext(ctx, md)

    reqStr, _ := json.Marshal(req)
    fmt.Printf("RPC: %s,req:%s\n", method, reqStr)

    var header, trailer metadata.MD
    opts = append(opts, grpc.Header(&header), grpc.Trailer(&trailer))

    err := invoker(ctx, method, req, reply, cc, opts...)

    if t, ok := header["timestamp"]; ok {
        fmt.Printf("timestamp from header:\n")
        for i, e := range t {
            fmt.Printf("%d.%s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }

    if l, ok := header["location"]; ok {
        fmt.Printf("location from header:\n")
        for i, e := range l {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要location,但是header中不存在location")
    }

    if t, ok := trailer["timestamp"]; ok {
        fmt.Printf("timestamp from trailer:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }

    return err
}
func unaryCallWithMetadata(c pb.GreeterClient) {
    fmt.Println("--- unaryCall ---")

    // 使用metadata的上下文创建RPC

    r, err := c.UnaryEcho(context.Background(), &pb.HelloRequest{Name: "unaryCall"})
    if err != nil {
        log.Fatalf("调用UnaryEcho失败:%v", err)
    }

    fmt.Println("response:")
    fmt.Printf(" - %s\n", r.Message)
}

总结


2.stream调用 Interceptor的使用

2.1 服务端修改后代码

拦截器代码

func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    fmt.Printf("--- streamInterceptor ---\n")

    // 调用完成时设置SetTrailer
    defer func() {
        trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        ss.SetTrailer(trailer)
    }()

    // 从Stream的Context中解析出metadata
    md, ok := metadata.FromIncomingContext(ss.Context())
    if !ok {
        return status.Errorf(codes.DataLoss, "ServerStreamingEcho: 无法获取metadata")
    }
    if t, ok := md["timestamp"]; ok {
        fmt.Printf("timestamp from metadata:\n")
        for i, e := range t {
            fmt.Printf("%d.%s\n", i, e)
        }
    }

    // 设置Header里面的metadata
    header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
    ss.SendHeader(header)

    err := handler(srv, ss)
    if err != nil {
        fmt.Printf("RPC failed with error %v", err)
    }
    return err
}

服务端双向流Handle代码

func (s *server) BidirectionalStreamingEcho(stream pb.Greeter_BidirectionalStreamingEchoServer) error {
    fmt.Printf("--- BidirectionalStreamingEcho ---\n")

    // Read requests and send responses.
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        fmt.Printf("request received %v, sending echo\n", in)
        if err := stream.Send(&pb.HelloReply{Message: "Hello again " + in.GetName()}); err != nil {
            return err
        }
    }
}

2.2 客户端修改后代码

客户端拦截器代码

func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    fmt.Printf("---streamInterceptor---\n")

    // 创建metadata到context中.
    md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
    ctx = metadata.NewOutgoingContext(ctx, md)

    // 执行具体业务
    s, err := streamer(ctx, desc, cc, method, opts...)
    if err != nil {
        return nil, err
    }

    // 解析 header
    header, err := s.Header()
    if err != nil {
        log.Fatalf("无法从stream中获取header: %v", err)
    }

    if t, ok := header["timestamp"]; ok {
        fmt.Printf("timestamp from header:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }
    if l, ok := header["location"]; ok {
        fmt.Printf("location from header:\n")
        for i, e := range l {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要location,但是header中不存在location")
    }

    return s, nil
}

客户端双向流Handle代码

func bidirectionalWithMetadata(c pb.GreeterClient) {
    fmt.Printf("--- bidirectional ---\n")

    // Make RPC using the context with the metadata.
    stream, err := c.BidirectionalStreamingEcho(context.Background())
    if err != nil {
        log.Fatalf("failed to call BidirectionalStreamingEcho: %v\n", err)
    }

    go func() {
        // Send all requests to the server.
        for i := 0; i < 10; i++ {
            if err := stream.Send(&pb.HelloRequest{Name: "clientStreamWithMetadata"}); err != nil {
                log.Fatalf("failed to send streaming: %v\n", err)
            }
        }
        stream.CloseSend()
    }()

    // Read all the responses.
    var rpcStatus error
    fmt.Printf("response:\n")
    for {
        r, err := stream.Recv()
        if err != nil {
            rpcStatus = err
            break
        }
        fmt.Printf(" - %s\n", r.Message)
    }
    if rpcStatus != io.EOF {
        log.Fatalf("failed to finish server streaming: %v", rpcStatus)
    }

    // 解析trailer
    trailer := stream.Trailer()

    if t, ok := trailer["timestamp"]; ok {
        fmt.Printf("timestamp from trailer:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }
}

源码

上一篇下一篇

猜你喜欢

热点阅读