聊聊flink的RestClientConfiguration

2019-03-07  本文已影响5人  go4it

本文主要研究一下 Flink 的 RestClientConfiguration，以及使用该配置的 RestClient。

RestClientConfiguration

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java

/**
 * Immutable holder for the settings of the REST client endpoint.
 *
 * <p>Instances are created exclusively through {@link #fromConfiguration(Configuration)}; the
 * constructor is private. The class carries an optional SSL handler factory, the connection and
 * idleness timeouts, and the maximum content length accepted by the client.
 */
public final class RestClientConfiguration {

    /** Factory for SSL handlers; {@code null} when REST SSL is disabled. */
    @Nullable
    private final SSLHandlerFactory sslHandlerFactory;

    /** Value read from {@link RestOptions#CONNECTION_TIMEOUT}. */
    private final long connectionTimeout;

    /** Value read from {@link RestOptions#IDLENESS_TIMEOUT}. */
    private final long idlenessTimeout;

    /** Value read from {@link RestOptions#CLIENT_MAX_CONTENT_LENGTH}; always positive. */
    private final int maxContentLength;

    /**
     * Creates the configuration after validating {@code maxContentLength}.
     *
     * @param sslHandlerFactory factory for SSL handlers, or {@code null} if SSL is disabled
     * @param connectionTimeout connection timeout as read from the configuration
     * @param idlenessTimeout idleness timeout as read from the configuration
     * @param maxContentLength maximum content length, must be strictly positive
     * @throws IllegalArgumentException if {@code maxContentLength} is not positive
     *         (assuming {@code checkArgument} follows the usual Preconditions contract)
     */
    private RestClientConfiguration(
            @Nullable final SSLHandlerFactory sslHandlerFactory,
            final long connectionTimeout,
            final long idlenessTimeout,
            final int maxContentLength) {
        // Preconditions-style templates only substitute "%s"; a "%d" placeholder would be left
        // verbatim in the message and the offending value would not be interpolated.
        checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %s", maxContentLength);
        this.sslHandlerFactory = sslHandlerFactory;
        this.connectionTimeout = connectionTimeout;
        this.idlenessTimeout = idlenessTimeout;
        this.maxContentLength = maxContentLength;
    }

    /**
     * Returns the {@link SSLHandlerFactory} that the REST client endpoint should use.
     *
     * @return SSL handler factory that the REST client endpoint should use, or null if SSL was disabled
     */
    @Nullable
    public SSLHandlerFactory getSslHandlerFactory() {
        return sslHandlerFactory;
    }

    /**
     * Returns the connection timeout, see {@link RestOptions#CONNECTION_TIMEOUT}.
     *
     * @return configured connection timeout
     */
    public long getConnectionTimeout() {
        return connectionTimeout;
    }

    /**
     * Returns the idleness timeout, see {@link RestOptions#IDLENESS_TIMEOUT}.
     *
     * @return configured idleness timeout
     */
    public long getIdlenessTimeout() {
        return idlenessTimeout;
    }

    /**
     * Returns the max content length that the REST client endpoint could handle.
     *
     * @return max content length that the REST client endpoint could handle
     */
    public int getMaxContentLength() {
        return maxContentLength;
    }

    /**
     * Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
     *
     * <p>An SSL handler factory is only created when {@code SSLUtils.isRestSSLEnabled(config)}
     * reports that REST SSL is enabled; otherwise the factory is {@code null}.
     *
     * @param config configuration from which the REST client endpoint configuration should be created
     * @return REST client endpoint configuration
     * @throws ConfigurationException if SSL was configured incorrectly
     */
    public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
        Preconditions.checkNotNull(config);

        final SSLHandlerFactory sslHandlerFactory;
        if (SSLUtils.isRestSSLEnabled(config)) {
            try {
                sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
            } catch (Exception e) {
                // Keep the cause so that the underlying SSL setup problem stays visible.
                throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
            }
        } else {
            sslHandlerFactory = null;
        }

        final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);

        final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);

        final int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);

        return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
    }
}

RestClient

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java

/**
 * Netty-based client for talking to a REST server endpoint.
 *
 * <p>The constructor builds a Netty {@code Bootstrap} whose channel pipeline is configured from a
 * {@link RestClientConfiguration}. The client can be shut down asynchronously via
 * {@link #closeAsync()} or synchronously via {@link #shutdown(Time)}.
 */
public class RestClient implements AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);

    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();

    // used to open connections to a rest server endpoint
    private final Executor executor;

    private final Bootstrap bootstrap;

    // completed once the event loop group has been shut down
    private final CompletableFuture<Void> terminationFuture;

    // guards against running the shutdown sequence more than once
    private final AtomicBoolean isRunning = new AtomicBoolean(true);

    /**
     * Creates the client and prepares the Netty bootstrap.
     *
     * @param configuration settings for SSL, timeouts and max content length; must not be null
     * @param executor executor stored for later use by the client; must not be null
     * @throws ArithmeticException if the configured connection timeout does not fit into an int
     */
    public RestClient(RestClientConfiguration configuration, Executor executor) {
        Preconditions.checkNotNull(configuration);
        this.executor = Preconditions.checkNotNull(executor);
        this.terminationFuture = new CompletableFuture<>();

        final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
        final int maxContentLength = configuration.getMaxContentLength();
        final long idlenessTimeout = configuration.getIdlenessTimeout();

        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) {
                try {
                    // SSL should be the first handler in the pipeline
                    if (sslHandlerFactory != null) {
                        socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
                    }

                    socketChannel.pipeline()
                        .addLast(new HttpClientCodec())
                        .addLast(new HttpObjectAggregator(maxContentLength))
                        .addLast(new ChunkedWriteHandler()) // required for multipart-requests
                        // the same idleness timeout is used for reader, writer and all-idle detection
                        .addLast(new IdleStateHandler(idlenessTimeout, idlenessTimeout, idlenessTimeout, TimeUnit.MILLISECONDS))
                        .addLast(new ClientHandler());
                } catch (Throwable t) {
                    // Report through the logger instead of printing to stderr, then propagate.
                    LOG.error("Failed to initialize the channel pipeline of the rest client.", t);
                    ExceptionUtils.rethrow(t);
                }
            }
        };
        NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));

        bootstrap = new Bootstrap();
        bootstrap
            // the Netty option is an int number of milliseconds, hence the exact narrowing
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
            .group(group)
            .channel(NioSocketChannel.class)
            .handler(initializer);

        LOG.info("Rest client endpoint started.");
    }

    /**
     * Triggers the shutdown with a 10 second timeout and returns without waiting.
     *
     * @return future that completes when the shutdown has finished
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        return shutdownInternally(Time.seconds(10L));
    }

    /**
     * Shuts the client down and blocks until the shutdown finished or the timeout elapsed.
     *
     * <p>Failures are logged and not rethrown. If the waiting thread is interrupted, its
     * interrupt flag is restored so that callers can still observe the interruption.
     *
     * @param timeout maximum time to wait for the shutdown to complete
     */
    public void shutdown(Time timeout) {
        final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);

        try {
            shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        } catch (InterruptedException e) {
            // do not swallow the interruption
            Thread.currentThread().interrupt();
            LOG.warn("Rest endpoint shutdown failed.", e);
        } catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    /**
     * Runs the shutdown sequence at most once and returns the shared termination future.
     *
     * @param timeout upper bound passed to the graceful shutdown of the event loop group
     * @return future that completes when the event loop group has terminated
     */
    private CompletableFuture<Void> shutdownInternally(Time timeout) {
        if (isRunning.compareAndSet(true, false)) {
            LOG.info("Shutting down rest endpoint.");

            if (bootstrap != null && bootstrap.group() != null) {
                bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
                    .addListener(finished -> {
                        if (finished.isSuccess()) {
                            terminationFuture.complete(null);
                        } else {
                            terminationFuture.completeExceptionally(finished.cause());
                        }
                    });
            } else {
                // Nothing to shut down; complete the future so that callers do not wait forever.
                terminationFuture.complete(null);
            }
        }
        return terminationFuture;
    }

    //......
}

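小结

- RestClientConfiguration 有四个属性，分别是 sslHandlerFactory、connectionTimeout、idlenessTimeout、maxContentLength；其构造器是私有的，只能通过静态方法 fromConfiguration 从 Configuration 创建，其中 maxContentLength 必须大于 0。
- fromConfiguration 在 SSLUtils.isRestSSLEnabled 为 true 时创建 SSLHandlerFactory，否则为 null；connectionTimeout、idlenessTimeout、maxContentLength 分别读取 RestOptions.CONNECTION_TIMEOUT、RestOptions.IDLENESS_TIMEOUT、RestOptions.CLIENT_MAX_CONTENT_LENGTH。
- RestClient 的构造器根据 RestClientConfiguration 创建 netty 的 Bootstrap：getConnectionTimeout 用于设置 ChannelOption.CONNECT_TIMEOUT_MILLIS，getMaxContentLength 用于创建 HttpObjectAggregator，getIdlenessTimeout 用于创建 IdleStateHandler，getSslHandlerFactory 不为 null 时在 pipeline 最前面添加 ssl handler。
- closeAsync 以 10 秒的超时调用 shutdownInternally；shutdown 方法则会阻塞等待关闭完成或超时。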

doc

上一篇下一篇

猜你喜欢

热点阅读