squbsScala In Action

squbs-5.Akka HTTP客户端上的类固醇(Steroi

2017-01-23  本文已影响193人  吕亦行

原文地址:Accessing Other Services using HTTP or HTTPS

概貌

squbs-httpclient 项目在保持Akka Http API的同时,向Akka HTTP Host-Level Client-Side API添加了可操作化方面,以下是它添加的功能的列表

依赖

在你的build.sbt或scala构建文件中加入如下依赖:

"org.squbs" %% "squbs-httpclient" % squbsVersion

用法

squbs-httpclient项目坚持 Akka HTTP API。唯一的例外时在创建主机连接池期间。代替 Http().cachedHostConnectionPool,它使用一组参数定义 ClientFlow(和一些可选参数)。

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// construct a pool client flow with context type `Int`
val poolClientFlow = ClientFlow[Int]("sample") // Only this line is specific to squbs

val responseFuture: Future[(Try[HttpResponse], Int)] =
  Source.single(HttpRequest(uri = "/") -> 42)
    .via(poolClientFlow)
    .runWith(Sink.head)

同时,类似在Akka HTTP Host-Level Client-Side API中的例子,ClientFlow在JAVA中的使用如下:

final ActorSystem system = ActorSystem.create();
final ActorMaterializer mat = ActorMaterializer.create(system);

final Flow<Pair<HttpRequest, Integer>, Pair<Try<HttpResponse>, Integer>, HostConnectionPool>
    clientFlow = ClientFlow.create("sample", system, mat);

CompletionStage<Pair<Try<HttpResponse>, Integer>> =
    Source
        .single(Pair.create(request, 42))
        .via(clientFlow)
        .runWith(Sink.head(), mat);

HTTP 模型

Scala

以下是一个HttpRequest Scala的创建例子,更多请查阅 HTTP Model Scala documentation

import HttpProtocols._
import MediaTypes._
import HttpCharsets._
val userData = ByteString("abc")
val authorization = headers.Authorization(BasicHttpCredentials("user", "pass"))

HttpRequest(
  PUT,
  uri = "/user",
  entity = HttpEntity(`text/plain` withCharset `UTF-8`, userData),
  headers = List(authorization),
  protocol = `HTTP/1.0`)
Java

以下是一个HttpRequest Java的创建例子,更多请查看Http Model Java documentation :

import HttpProtocols.*;
import MediaTypes.*;

Authorization authorization = Authorization.basic("user", "pass");
HttpRequest complexRequest =
    HttpRequest.PUT("/user")
        .withEntity(HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, "abc"))
        .addHeader(authorization)
        .withProtocol(HttpProtocols.HTTP_1_0);

服务发现链

在创建池的时候,squbs-httpclient不需要提供主机名/端口的组合。取而代之的是,它允许注册服务发现链,通过注册服务发现机制,允许通过string识别符来解析端点(endpoint)。举个例子,在上面的例子中, "sample"是客户端想要访问的服务的逻辑名称,配置的服务发现链会将它解析成一个包含主机名和端口的端点(endpoint),例如: http://akka.io:80.。

这里有两个不同的注册端点解释器如下。闭包的风格允许更加紧凑和可读的代码。然而,这个子类拥有保持状态和基于这个状态做解决决定的能力。

Scala

注册函数类型 (String, Env) => Option[Endpoint]:

EndpointResolverRegistry(system).register("SampleEndpointResolver", { (svcName, env) =>
  svcName match {
    case "sample" => Some(Endpoint("http://akka.io:80"))
    case "google" => Some(Endpoint("http://www.google.com:80"))
    case _ => None
})

注册类继承于 EndpointResolver:

class SampleEndpointResolver extends EndpointResolver {
  override def name: String = "SampleEndpointResolver"

  override def resolve(svcName: String, env: Environment): Option[Endpoint] = svcName match {
    case "sample" => Some(Endpoint("http://akka.io:80"))
    case "google" => Some(Endpoint("http://www.google.com:80"))
    case _ => None
  }
}

// Register EndpointResolver
EndpointResolverRegistry(system).register(new SampleEndpointResolver)
Java

注册BiFunction<String, Env, Optional<Endpoint>>:

EndpointResolverRegistry.get(system).register("SampleEndpointResolver", (svcName, env) -> {
    if ("sample".equals(svcName)) {
        return Optional.of(Endpoint.create("http://akka.io:80"));
    } else if ("google".equals(svcName))
        return Optional.of(Endpoint.create("http://www.google.com:80"));
    } else {
        return Optional.empty();
    }
});

注册类继承AbstractEndpointResolver:

class SampleEndpointResolver extends AbstractEndpointResolver {
    String name() {
        return "SampleEndpointResolver";
    }

    Optional<Endpoint> resolve(svcName: String, env: Environment) { 
        if ("sample".equals(svcName)) {
        return Optional.of(Endpoint.create("http://akka.io:80"));
    } else if ("google".equals(svcName))
        return Optional.of(Endpoint.create("http://www.google.com:80"));
    } else {
        return Optional.empty();
    }
}

// Register EndpointResolver
EndpointResolverRegistry.get(system).register(new SampleEndpointResolver());

你可以注册多个EndpointResolver。这个链是根据注册的逆序执行。如果一个解析器返回None,这就意味着它无法解释,下一个解释器尝试解释。
如果解释的端点(endpoint)是安全的,例如(https),可以将SSLContext传递给Endpoint

每个客户端的配置(Per Client Configuration)

Akka HTTP Configuration 定义了配置的默认值。你可以在application.conf中重写这些默认值;然而这样会影响所有的客户端。做一个特定客户端的重写,在HostConnectionPool流创建时,Akka HTTP允许在HostConnectionPool流创建期间传递ConnectionPoolSettings。这也是由squbs支持。

除了以上之外,squbs还允许客户端特定重写 application.conf。你仅需要通过客户端名称提出一个有 type = squbs.httpclient的配置部分。然后,你可以在这个部分提出任何的客户端配置。例如,如果我们只想要重写上面"sample"客户端中的 max-connections 设置,我们可以如下做:

sample {
  type = squbs.httpclient
  
  akka.http.host-connection-pool {
    max-connections = 10
  }
}

管道(Pipeline)

我们通常需要公用的基础功能或不同客户端的组织标准。这些基础设施包括而不限于,日志、指标手机、请求跟踪、认证\授权,跟踪,cookie管理,A/B测试等等。正因squbs促成关注分离,这样的逻辑将属于基础设置,而非客户端实现。 squbs pipeline 允许基础设施提供组件,安装在客户端,客户端所有者不需要担心这些方面。更多详细可查看 squbs pipeline

一般来说,一个pipeline是一个双向流,构建了squbs客户端和Akka HTTP 层的桥梁。squbs-httpclient允许为所有或个别的客户端注册全局双向流(默认 pipeline)。注册一个特定客户端pipeline,设置 pipeline配置。你可以通过defaultPipeline设置开启/关闭默认pipeline(如果未指定,则设置为on)。

sample {
  type = squbs.httpclient
  pipeline = metricsFlow
  defaultPipeline = on
}

请参照 squbs pipeline来查看如何创建pipeline和配置默认pipeline

指标(Metrics)

squbs带有预编译pipeline指标集,squbs激活模板将这些设为默认。因此,不需要任何代码变更或配置,每个squbs http客户端可以收集Codahale Metrics即开即用。默认情况下,JMX提供以下指标:

JMX Beans

故障排除问题时,可视化系统配置是非常重要的。squbs-httpclient为每个客户端注册了JMX bean。JMX bean发布所有的配置,例如端点(endpoint),主机连接池设置等。bean的名称设置如 org.squbs.configuration.${system.name}:type=squbs.httpclient,name=$name。所以,如果actor系统名称是 squbs ,客户端名称为 sample,那么JMX bean的名称为org.squbs.configuration.squbs:type=squbs.httpclient,name=sample

Circuit Breaker

// TODO In progress

上一篇下一篇

猜你喜欢

热点阅读