Flink is a new big data processing engine that aims to unify the processing of data from different sources.

Writing Data from Flink to Elasticsearch

2019-02-11  it_zzy


Preface

Flink ships with many connectors. Today we will use the Elasticsearch connector as a sink to write data into Elasticsearch (hereafter "ES").
Elasticsearch Connector

ES installation is omitted here; see articles online or my earlier posts.

Adding the dependency

The mapping between Flink connector artifacts and ES versions is as follows:

Maven dependency                      Supported since  Elasticsearch version
flink-connector-elasticsearch_2.11    1.0.0            1.x
flink-connector-elasticsearch2_2.11   1.0.0            2.x
flink-connector-elasticsearch5_2.11   1.3.0            5.x
flink-connector-elasticsearch6_2.11   1.6.0            6 and later versions
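As the table shows, the artifact must match both the Scala version suffix (_2.11 here) and the ES major version, and needs a new enough Flink release. A minimal sketch of the corresponding pom properties; the version numbers below are illustrative assumptions, not taken from the article:

```xml
<!-- Illustrative versions only; pick the ones matching your Flink release
     and ES cluster. The connector artifacts below reference these. -->
<properties>
    <flink.version>1.7.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
```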

Elasticsearch 5.x

Since the ES instance I had used before was 5.6.x, I first added the Maven dependency for 5.x, flink-connector-elasticsearch5_2.11:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

However, running the program that writes to ES failed with the following error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at com.hfjy.bigdata.flink.sink.FlinkSinkToES.main(FlinkSinkToES.java:81)
Caused by: java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes!
    at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:81)
    at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:48)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:296)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

For this error I checked that ES had started normally, verified the host in the code, and stepped through with a debugger, but could not find the root cause. The code is as follows:

public class FlinkSinkToES {
    private static final Logger log = LoggerFactory.getLogger(FlinkSinkToES.class);

    private static final String READ_TOPIC = "student";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "student-group-1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                //this Kafka topic must match the one used by the producer utility class
                READ_TOPIC,
                new SimpleStringSchema(),
                props)).setParallelism(1);
//                .map(string -> JSON.parseObject(string, Student.class)); // parse the string into a Student object with Fastjson
        Map<String, String> config = new HashMap<>();
        config.put("cluster.name", "elasticsearch");
// This instructs the sink to emit after every element, otherwise they would be buffered
        config.put("bulk.flush.max.actions", "1");
//        config.put("auth_user","elastic");
//        config.put("auth_password","changeme");

        List<InetSocketAddress> transportAddresses = new ArrayList<>();
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
//        transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

        student.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
            public IndexRequest createIndexRequest(String element) {
                Map<String, String> json = new HashMap<>();
                json.put("data", element);
                log.info("data:" + element);
                return Requests.indexRequest()
                        .index("my-index-student-0211")
                        .type("my-type")
                        .source(json);
            }

            @Override
            public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }));

        env.execute("flink learning connectors kafka");
    }
}
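One common cause of "Elasticsearch client is not connected to any Elasticsearch nodes" with the 5.x TransportClient is a cluster.name mismatch: the value passed in the sink config must equal the server's cluster.name. You can compare it against what `curl -s http://127.0.0.1:9200` returns; the snippet below only simulates that check against a hypothetical response, since no cluster is assumed to be running:

```shell
# Hypothetical response body from `curl -s http://127.0.0.1:9200`;
# the cluster_name field must match the "cluster.name" entry in the
# sink's config map (here: "elasticsearch", the ES default).
response='{"name":"node-1","cluster_name":"elasticsearch","version":{"number":"5.6.0"}}'
echo "$response" | grep -o '"cluster_name":"[^"]*"'
# -> "cluster_name":"elasticsearch"
```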

Elasticsearch 6.x

Writing to ES 5.x produced the error above and the root cause remained unclear, so I decided to switch to ES 6.

Add the Maven dependency for the Flink ES 6 connector:

<!--es6-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

After installing ES 6, startup failed with the following error:

➜  elasticsearch-6.6.0 ./bin/elasticsearch
[2019-02-11T14:53:02,255][WARN ][o.e.b.ElasticsearchUncaughtExceptionHandler] [unknown] uncaught exception in thread [main]
org.elasticsearch.bootstrap.StartupException: java.lang.IllegalStateException: failed to obtain node locks, tried [[/data/es-6/data]] with lock id [0]; maybe these locations are not writable or multiple nodes were started without increasing [node.max_local_storage_nodes] (was [1])?
    at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:163) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:150) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:124) ~[elasticsearch-cli-6.6.0.jar:6.6.0]
    at org.elasticsearch.cli.Command.main(Command.java:90) ~[elasticsearch-cli-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:116) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:93) ~[elasticsearch-6.6.0.jar:6.6.0]
Caused by: java.lang.IllegalStateException: failed to obtain node locks, tried [[/data/es-6/data]] with lock id [0]; maybe these locations are not writable or multiple nodes were started without increasing [node.max_local_storage_nodes] (was [1])?
    at org.elasticsearch.env.NodeEnvironment.<init>(NodeEnvironment.java:297) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.node.Node.<init>(Node.java:295) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.node.Node.<init>(Node.java:265) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Bootstrap$5.<init>(Bootstrap.java:212) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Bootstrap.setup(Bootstrap.java:212) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Bootstrap.init(Bootstrap.java:333) ~[elasticsearch-6.6.0.jar:6.6.0]
    at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:159) ~[elasticsearch-6.6.0.jar:6.6.0]
    ... 6 more

A quick search shows this happens when an ES instance is already running: starting a second node against the same data path fails to obtain the node lock. Find the existing ES process, kill it, and restart.
Reference: https://blog.csdn.net/qq_38977441/article/details/80406126

➜  elasticsearch-6.6.0 ps -ef | grep elastic
  501 94853 93152   0  2:37PM ttys011    0:35.04 /Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home//bin/java -Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -Des.networkaddress.cache.ttl=60 -Des.networkaddress.cache.negative.ttl=10 -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djna.nosys=true -XX:-OmitStackTraceInFastThrow -Dio.netty.noUnsafe=true -Dio.netty.noKeySetOptimization=true -Dio.netty.recycler.maxCapacityPerThread=0 -Dlog4j.shutdownHookEnabled=false -Dlog4j2.disable.jmx=true -Djava.io.tmpdir=/var/folders/t6/v40m6tfx1x1b1_2lg2078tf80000gn/T/elasticsearch-465341793791663807 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=data -XX:ErrorFile=logs/hs_err_pid%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Xloggc:logs/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=32 -XX:GCLogFileSize=64m -Des.path.home=/Users/zzy/Documents/zzy/software/elasticsearch-6.6.0 -Des.path.conf=/Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/config -Des.distribution.flavor=default -Des.distribution.type=tar -cp /Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/lib/* org.elasticsearch.bootstrap.Elasticsearch
  501 94869 94853   0  2:37PM ttys011    0:00.03 /Users/zzy/Documents/zzy/software/elasticsearch-6.6.0/modules/x-pack-ml/platform/darwin-x86_64/bin/controller
  501 95090 93152   0  2:55PM ttys011    0:00.00 grep --color=auto elastic

After installing Kibana, startup failed with the following error:

➜  kibana-6.6.0-linux-x86_64 ./bin/kibana
./bin/kibana: line 24: /Users/zzy/Documents/zzy/software/kibana-6.6.0-linux-x86_64/bin/../node/bin/node: cannot execute binary file
./bin/kibana: line 24: /Users/zzy/Documents/zzy/software/kibana-6.6.0-linux-x86_64/bin/../node/bin/node: Undefined error: 0

This was a platform mismatch: I had downloaded the Linux build. After switching to the macOS build, Kibana started and logged the following:

➜  kibana-6.6.0-darwin-x86_64 ./bin/kibana
  log   [08:02:11.812] [warning][plugin] Skipping non-plugin directory at /Users/zzy/Documents/zzy/software/kibana-6.6.0-darwin-x86_64/src/legacy/core_plugins/ems_util
  log   [08:02:12.877] [info][status][plugin:kibana@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:12.910] [info][status][plugin:elasticsearch@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.912] [info][status][plugin:xpack_main@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.916] [info][status][plugin:graph@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.931] [info][status][plugin:monitoring@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:12.934] [info][status][plugin:spaces@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.940] [warning][security] Generating a random key for xpack.security.encryptionKey. To prevent sessions from being invalidated on restart, please set xpack.security.encryptionKey in kibana.yml
  log   [08:02:12.944] [warning][security] Session cookies will be transmitted over insecure connections. This is not recommended.
  log   [08:02:12.949] [info][status][plugin:security@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.966] [info][status][plugin:searchprofiler@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.968] [info][status][plugin:ml@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.997] [info][status][plugin:tilemap@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:12.999] [info][status][plugin:watcher@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.007] [info][status][plugin:grokdebugger@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.011] [info][status][plugin:dashboard_mode@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.018] [info][status][plugin:logstash@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.022] [info][status][plugin:beats_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.038] [info][status][plugin:apm@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.173] [info][status][plugin:interpreter@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.180] [info][status][plugin:canvas@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.184] [info][status][plugin:license_management@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.188] [info][status][plugin:index_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.207] [info][status][plugin:console@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.210] [info][status][plugin:console_extensions@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.213] [info][status][plugin:notifications@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.215] [info][status][plugin:index_lifecycle_management@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.252] [info][status][plugin:infra@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.254] [info][status][plugin:rollup@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.263] [info][status][plugin:remote_clusters@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.268] [info][status][plugin:cross_cluster_replication@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:13.274] [info][status][plugin:upgrade_assistant@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.281] [info][status][plugin:metrics@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:13.444] [info][status][plugin:timelion@6.6.0] Status changed from uninitialized to green - Ready
  log   [08:02:14.218] [warning][reporting] Generating a random key for xpack.reporting.encryptionKey. To prevent pending reports from failing on restart, please set xpack.reporting.encryptionKey in kibana.yml
  log   [08:02:14.223] [info][status][plugin:reporting@6.6.0] Status changed from uninitialized to yellow - Waiting for Elasticsearch
  log   [08:02:14.228] [info][status][plugin:elasticsearch@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.352] [info][license][xpack] Imported license information from Elasticsearch for the [data] cluster: mode: basic | status: active
  log   [08:02:14.355] [info][status][plugin:xpack_main@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.356] [info][status][plugin:graph@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.357] [info][status][plugin:searchprofiler@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.357] [info][status][plugin:ml@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.357] [info][status][plugin:tilemap@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.358] [info][status][plugin:watcher@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.358] [info][status][plugin:grokdebugger@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.358] [info][status][plugin:logstash@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.359] [info][status][plugin:beats_management@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.359] [info][status][plugin:index_management@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.359] [info][status][plugin:index_lifecycle_management@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.360] [info][status][plugin:rollup@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.360] [info][status][plugin:remote_clusters@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.360] [info][status][plugin:cross_cluster_replication@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.360] [info][status][plugin:reporting@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.361] [info][kibana-monitoring][monitoring-ui] Starting monitoring stats collection
  log   [08:02:14.369] [info][status][plugin:security@6.6.0] Status changed from yellow to green - Ready
  log   [08:02:14.431] [info][license][xpack] Imported license information from Elasticsearch for the [monitoring] cluster: mode: basic | status: active
  log   [08:02:15.707] [info][migrations] Creating index .kibana_1.
  log   [08:02:16.113] [info][migrations] Pointing alias .kibana to .kibana_1.
  log   [08:02:16.158] [info][migrations] Finished in 451ms.
  log   [08:02:16.159] [info][listening] Server running at http://127.0.0.1:5602
  log   [08:02:16.361] [info][status][plugin:spaces@6.6.0] Status changed from yellow to green - Ready

Check the index information:

GET _cat/indices?v

health status index     uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana_1 8nwhLfRcTmWOeNp5UTQGOQ   1   0          3            0     11.7kb         11.7kb

After running the Flink job that writes to ES 6, the index-student index initially received no data; the following code had to be added:

esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

RetryRejectedExecutionFailureHandler comes from the Flink package org.apache.flink.streaming.connectors.elasticsearch.util.

Note the difference from the official documentation, which configures the client with a lambda:

builder.setRestClientFactory(
  restClientBuilder -> {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
);


public class RestClientFactoryImpl implements RestClientFactory {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        Header[] headers = new BasicHeader[]{new BasicHeader("Content-Type","application/json")};
        restClientBuilder.setDefaultHeaders(headers); // multiple headers can be passed via the array
        restClientBuilder.setMaxRetryTimeoutMillis(90000);
    }
}

Query the indices again via Kibana:

GET _cat/indices?v

health status index         uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   index-student VUUIiS2fQX2p5-JyzxEa7A   5   1        300            0    152.4kb        152.4kb
green  open   .kibana_1     8nwhLfRcTmWOeNp5UTQGOQ   1   0          3            0     11.9kb         11.9kb

The index-student index now exists, which shows that Flink wrote to ES successfully. Query the index data:

GET /index-student/_search?pretty

{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 300,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "3cy_22gBPPMO6TTdKysy",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":123,"id":105,"name":"itzzy105","password":"password105"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "3sy_22gBPPMO6TTdKytx",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":130,"id":112,"name":"itzzy112","password":"password112"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "5sy_22gBPPMO6TTdKyt7",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":138,"id":120,"name":"itzzy120","password":"password120"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "7cy_22gBPPMO6TTdKyuA",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":142,"id":124,"name":"itzzy124","password":"password124"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "9sy_22gBPPMO6TTdKyuK",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":151,"id":133,"name":"itzzy133","password":"password133"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "-My_22gBPPMO6TTdKyuK",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":156,"id":138,"name":"itzzy138","password":"password138"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "-8y_22gBPPMO6TTdKyuP",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":155,"id":137,"name":"itzzy137","password":"password137"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "_sy_22gBPPMO6TTdKyuS",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":162,"id":144,"name":"itzzy144","password":"password144"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "AMy_22gBPPMO6TTdKyyS",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":159,"id":141,"name":"itzzy141","password":"password141"}"""
        }
      },
      {
        "_index" : "index-student",
        "_type" : "student",
        "_id" : "Acy_22gBPPMO6TTdKyyU",
        "_score" : 1.0,
        "_source" : {
          "data" : """{"age":160,"id":142,"name":"itzzy142","password":"password142"}"""
        }
      }
    ]
  }
}

The full code for the Flink-to-ES job:

public class FlinkSinkToES6 {

    private static final Logger log = LoggerFactory.getLogger(FlinkSinkToES6.class);

    private static final String READ_TOPIC = "student-1";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "student-group-1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                //this Kafka topic must match the one used by the producer utility class
                READ_TOPIC,
                new SimpleStringSchema(),
                props)).setParallelism(1);
//                .map(string -> JSON.parseObject(string, Student.class)); // parse the string into a Student object with Fastjson
        student.print();
        log.info("student:" + student);
        List<HttpHost> esHttphost = new ArrayList<>();
        esHttphost.add(new HttpHost("127.0.0.1", 9200, "http"));

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                esHttphost,
                new ElasticsearchSinkFunction<String>() {

                    public IndexRequest createIndexRequest(String element) {
                        Map<String, String> json = new HashMap<>();
                        json.put("data", element);
                        log.info("data:" + element);

                        return Requests.indexRequest()
                                .index("index-student")
                                .type("student")
                                .source(json);
                    }

                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );

        esSinkBuilder.setBulkFlushMaxActions(1);
//        esSinkBuilder.setRestClientFactory(
//                restClientBuilder -> {
//                    restClientBuilder.setDefaultHeaders()
//                }
//        );
        esSinkBuilder.setRestClientFactory(new RestClientFactoryImpl());
        esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());

        student.addSink(esSinkBuilder.build());
        env.execute("flink learning connectors kafka");
    }
}
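setBulkFlushMaxActions(1) flushes after every single element, which is convenient for a demo but slow in practice. For real workloads the builder can batch bulk requests instead; a configuration sketch with illustrative values (these thresholds are assumptions, not taken from the article):

```java
// Illustrative batching thresholds; the sink flushes when any one is hit.
esSinkBuilder.setBulkFlushMaxActions(1000); // flush after 1000 buffered actions
esSinkBuilder.setBulkFlushMaxSizeMb(5);     // or after 5 MB of buffered data
esSinkBuilder.setBulkFlushInterval(5000);   // or every 5 seconds
```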

Kafka producer code:

public class KafkaUtils {
    private static final String broker_list = "localhost:9092";
    private static final String topic = "student-1";  // must be the same topic the Flink job reads

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        KafkaProducer producer = new KafkaProducer<String, String>(props); // the old producer API is deprecated
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

        try {
            for (int i = 1; i <= 100; i++) {
                Student student = new Student(i, "itzzy" + i, "password" + i, 18 + i);
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, JSON.toJSONString(student));
                producer.send(record);
                System.out.println("sent: " + JSON.toJSONString(student));
            }
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace(); // do not swallow producer errors silently
        }

        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {

    private int id;
    private String name;
    private String password;
    private int age;

}
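For reference, each hit in index-student wraps the producer's Fastjson output as a string under the "data" field. The self-contained sketch below hand-rolls the same JSON shape without the Fastjson dependency; the class and helper names are made up for illustration:

```java
// Reproduces the JSON payload shape seen in the index-student hits above,
// without Fastjson (which, by default, serializes POJO fields in
// alphabetical order: age, id, name, password).
public class StudentJsonDemo {
    static String toJson(int id, String name, String password, int age) {
        return String.format(
                "{\"age\":%d,\"id\":%d,\"name\":\"%s\",\"password\":\"%s\"}",
                age, id, name, password);
    }

    public static void main(String[] args) {
        // Matches the first document shown in the search results:
        System.out.println(toJson(105, "itzzy105", "password105", 123));
        // -> {"age":123,"id":105,"name":"itzzy105","password":"password105"}
    }
}
```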
