关于Grpc在项目中的使用

2018-05-03  本文已影响0人  xiaduobao

1 首先定义ClientInterceptor

public HeaderClientInterceptor(String clientId, String accessToken) {
        this.clientId = clientId;
        this.accessToken = accessToken;

    }

    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {

        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

            public void start(Listener<RespT> responseListener, Metadata headers) {
                headers.put(CLIENT_ID, clientId);

                if (StringUtils.isNotEmpty(accessToken)) {
                    headers.put(ACCESS_TOKEN, accessToken);
                }
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {

                    /**
                     * {@inheritDoc}
                     * @see io.grpc.ForwardingClientCallListener#onClose(Status, Metadata)
                     */
                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        CLOG.debug("关闭原因:{}", status.getCode().toString());
                        CLOG.debug("获取请求返回头部信息,{}", trailers.toString());
                        
                        String isexpire = trailers.get(IS_EXPIRE);
                        if (Boolean.getBoolean(isexpire)) {
                            OAuthService oAuthService = SpringContextUtils.getBean(OAuthService.class);
                            String accessToken = oAuthService.oauth();
                            CLOG.debug("refresh Token,{}", accessToken);
                        }
                        
                        super.onClose(status, trailers);
                    }

                    @Override
                    public void onHeaders(Metadata headers) {
                        CLOG.debug("获取请求返回头部信息,{}", headers.toString());
                        super.onHeaders(headers);
                    }

                }, headers);
            }

        };

    }

2 Auth,项目中做成了切面。

   public String verifying() {
        ManagedChannel channel = channelPool.getManagedChannel();
        OAuthBlockingStub blockingStub = OAuthGrpc.newBlockingStub(channel)
                .withInterceptors(new HeaderClientInterceptor(clientProp.getClientId(), null));

        OAuthRequest oAuthRequest = OAuthRequest.newBuilder().setClientId(clientProp.getClientId())
                .setClientSecret(clientProp.getSecret()).setGrantType(clientProp.getGrant()).build();
        CLOG.info("OAuthRequest: " + JsonUtils.toJson(oAuthRequest), tagMap);
        OAuthResponse oauthResponse = null;
        try {
            oauthResponse = blockingStub.getAccessToken(oAuthRequest);
            RedisHelper.setHNATokenAndEffectTime(oauthResponse.getAccessToken(), new Date());
        } catch (StatusRuntimeException e) {
            CLOG.info(e, tagMap);
            return "";
        } finally {
            if (null != channel) {
                channelPool.returnObject(channel);
            }
        }
        return oauthResponse.getAccessToken();
    }

3 具体clientStub调用

  public AirLowFareSearchRS airLowFareSearch(AirLowFareSearchRQ airLowFareSearchRQ)  {

        ManagedChannel channel = channelPool.getManagedChannel();

        Map<String, String> tagMap = this.tagMap(airLowFareSearchRQ);

        AirLowFareSearchBlockingStub blockingStub = AirLowFareSearchGrpc.newBlockingStub(channel)
                .withInterceptors(new HeaderClientInterceptor(clientProp.getClientId(), RedisHelper.getToken()));

        CLOG.info("AirLowFareSearchRQ :" + JsonUtils.toJson(airLowFareSearchRQ), tagMap);
        AirLowFareSearchRS airLowFareSearchRS = null;
        try {
            airLowFareSearchRS = blockingStub.airLowFareSearch(airLowFareSearchRQ);
            CLOG.info("AirLowFareSearchRS :" + JsonUtils.toJson(airLowFareSearchRS), tagMap);
        } catch (Exception e) {
            CLOG.info(e, tagMap);
            ExceptionHandler.tokenUselessToEmpty(e);
        } finally {
            if (null != channel) {
                channelPool.returnObject(channel);
            }
        }

        return airLowFareSearchRS;
    }

4 深入源码 (调了半天有点晕,下次再续)

/**
   * Executes a unary call and returns a {@link ListenableFuture} to the response.
   *
   * @return a future for the single response message.
   */
  public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
      ClientCall<ReqT, RespT> call,
      ReqT param) {
    GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call);
    asyncUnaryRequestCall(call, param, new UnaryStreamToFuture<RespT>(responseFuture), false);
    return responseFuture;
  }
上一篇 下一篇

猜你喜欢

热点阅读