Chapter 2 Data Processing Using
Connectors
Apache Flink supports various connectors that allow data read/writes across various technologies. Let's learn more about this
Apache Flink提供各种连接器,这些连接器允许通过各种不同的技术读写数据。下面我们学习这些连接器的更多细节。
Kafka connector
Kafka is a publish-subscribe, distributed, message queuing system that allows users to publish messages to a certain topic; this is then distributed to the subscribers of the topic. Flink provides options to define a Kafka consumer as a data source in Flink Streaming. In order to use the Flink Kafka connector, we need to use a specific JAR file.
Kafka 是一个发布/订阅,分布式的消息队列系统,它允许用户发现消息给特定的topic
;然后将消息分发给topics
的订阅者。在Flink Streaming
中,Flink提供定义Kafka消息者作为source
的选项。为了使用 Flink kafka connector
,我 们需要使用指定的JAR
包。
The following diagram shows how the Flink Kafka conector works:
(下面显示了Flink Kafka
连接器的工作原理)
We need to use the following Maven dependency to use the connector. I have been using Kafka version 0.9 so I will be adding the following dependency in pom.xml:
我们需要使用下面的Maven
依赖来使用connector
。我们已经使用了0.9
版本的Kafka,所以我们需要在pom.xml
文件中加下以下依赖
<denendency>
<groupId>org.apache.flink<groupid>
<cartifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>1.1.4</version>
</dependency>
Now let's try to understand how to use the Kafka consumer as the Kafka source:
现在,我们试着去理解怎么使用Kafka
的消费端作为Kafka
的源。
In Java:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhpat: 9092");
properties.setProperty ("group.id", "test");
DataStream input = env.addSource (new
FlinKafkaConsumer09<String> ("mytopic", new SimpleStringSchema ().properties));
In Scala:
val properties = new Properties ();
properties.setProperty ("bootstrap.servers", "localhoat: 9092");
//only required for Kafka 0.8
properties.setProperty("zookeeper.connect". "localhoat:2181");
propertiea.setProperty ("group.id", "test");
stream = env
.addSource (new FlinkKafkaConsumer09 [String] ("mytopic",
new SimpleStringSchema(), properties)) .print
In the preceding code, we first set the properties of the Kafka host and the zookeeper host and port. Next we need to specify the topic name, in this case mytopic. So if any messages get published to the mytopic topic, they will be processed by the Flink streams
在上面的代码中,我们首先设置了Kafka
的host
和zookeeper
的host
和port
。下面我们需要指定topic
名,在这个场景中,topic
名为mytopic
。所以任何发送到mytopic
主题的消息都会被Flink stream
处理。
If you get data in a different format, then you can also specify your custom schema for deserialization. By default, Flink supports string and JSON deserializers.
如果你以不同的格式获取数据,那么你可以为反序列化指定自定义的schema
。默认情况下,Flink 支持String
和JSON
的反序列化器。
In order to enable fault tolerance, we need to enable checkpointing in Flink. Flink is keen on taking snapshots of the state in a periodic manner. In the case of failure, it will restore to the last checkpoint and then restart the processing.
在 Flink为了启用容错功能,我们需要启动checkpointing
。Flink热衷于周期性的对状态进行拍照。在失败的情况下,它会从最新的checkpointing
恢复,然后重新处理。
We can also define the Kafka producer as a sink. This will write the data to a Kafka topic.The following is a way to write data to a Kafka topic
我们也可以定义Kafka
生产端作为sink
,这样,数据会写入到Kafka的主题。下面是写数据到Kafka 主题的方式。
In Scala
stream.addSink (new FlinkKafkaproducer09 ("localhoat: 9092",
"mytopic", new SimpleStringSchema()));
In Java:
stream.addSink (new FlinkKafkaProducer09[string] ("localhoat:9092",
"mytopic", new SimpleStringSchema())).
Twitter connector
These days, it is very important to have the ability to fetch data from Twitter and process it. Many companies use Twitter data for doing sentiment analytics for various products,services, movies,reviews, and so on. Flink provides the Twitter connector as one data source. To use the connector, you need to have a Twitter account. Once you have a Twitter account, you need to create a Twitter application and generate authentication keys to be used by the connector. Here is a link that will help you to generate tokens: httpa://dev.twitter.com/oauth/overview/application-owner-acceaa-tokens.
The Twitter connector can be used through the Java or Scala API:
如今,能够从Twitter获取并处理数据非常重要。许多公司使用Twitter的数据对各种产品、服务、电影、评论等进行情绪分析。Flink将Twitter conmector作为一个数据源提供。要使用连接器,您需要有一个Twitter帐户。有了Twitter帐户之后,您需要创建一个Twitter应用程序并生成连接器使用的身份验证密钥。下面的链接将帮助您生成token
Twitter连接器可以通过Java或Scala API使用:
Once tokens are generated, we can start writing the program to fetch data from Twitter.First we need to add a Maven dependency:
一 旦我们生成了token
,我们就可以开始写程序从twiter获取数据。首先,我们需要加入下边的Maven
依赖。
<dependeney>
<groupId>org. apache.flink</groupId>
<artifactId>flink-connector-twitter_2.11</artifactId>
<version>1.1.4</veraion>
</dependency>
Next we add Twitter as a data source. The following is the sample code。
下面我们加入Twitter
作为数据源,下面是示例代码。
In Java
Propertiea props = new properties ();
props.setproperty (TwitterSource.CONSUMER_KEY, "");
props.setproperty (TwitterSource.CONSUMER_SECRET,"")
props.setProperty (TwitterSource. TOKEN.");
props.aetProperty (TwitterSource. TOKEN_SECRET,"");
DataStream<string> streamSource = env.addSource (new TwitterSource (props));
In Scala:
val prope = new Properties ();
props.setProperty (TwitterSource.CONSUMER_KEY. "");
props.setProperty (TwitterSource.CONSUMER_SECRET.,"");
props.setProperty (TwitterSource. TOKEN,"");
props.setProperty (TwitterSource. TOKEN_SECRET,"");
DataStream<string> streamSource = env.addSource (new TwitterSource (props) );
In the preceding code, we first set properties for the token we got. And then we add the TwitterSource. If the given information is correct then you will start fetching the data from Twitter. TwitterSource emits the data in a JSON string format. A sample Twitter JSON looks like the following:
上面的代码,我们首先设置了token
,然后我们加入了TwitterSource
。如果我们指定的信息是正确的,那么我们会从Twitter
获取到数据。 TwitterSource
会将数据在JSON
的格式发出去。下面是Twitter JSON
的示例:
{
..."text": " ‘Toyalty 3.0: How to Revolutionize customer &
Employee Engagement with Big data & #Gamification’ can be ordered here:http://t.co/1XhqyaNjuR ",
"geo: null,
"retweeted": false,
"in_reply_to_screen_name": null,
"possibly_senaitive": false,
"truncated": false,
"lang": "en",
"hashtags":[{
"text": "Gamification",
"indices": [90,
103]
}],
}
"in_reply_to_statua_id_str": null,
"id": 330094515484508160
...
}
TwitterSource
provides various endpoints. By default it uses StatuseaSampleEndpoint
, which returns a set of random tweets. If you need to add some filters and don't want to use the default endpoint, you can implement the TwitterSource.EndpointInitinlizer interface
TwitterSource
提供了很多种endpoints
,默认的是StatuseaSampleEndpoint
,这个endpoint
返回一组推文。如果你加入过滤器,并且也不想使用默认的endpoint
,你可以实现TwitterSource.EndpointInitinlizer
接品。
Now that we know how to fetch data from Twitter, we can then decide what to do with this data depending upon our use case. We can process, store, or analyze the data.
现在我们知道了如何从Twitter获取数据,接下来我们可以根据用例决定如何处理这些数据。我们可以处理、 存储或分析数据。
RabbitMQ connector
RabbitMQ is a widely used distributed, high-performance, message queuing system. It is used as a message delivery system for high throughput operations. It allows you to create a distributed message queue and include publishers and subscribers in the queue. More reading on RabbitMQ can be done at following link httpa://www.rabbitmq.com/
(RabbitMQ是一种广泛使用的分布式、高性能的消息队列系统。它被用作高吞吐量操作的消息传递系统。它允许您创建分布式消息队列,并在队列中包含发布者和订阅者。更多关于RabbitMQ的阅读可以通过以下链接https://www.rabbitmq.com/)
Flink supports fetching and publishing data to and from RabbitMQ. It provides a connector that can act as a data source of data streams.For the RabbitMQ connector to work, we need to provide following information:
(Flink支持从RabbitMQ
获取和发布数据。它提供了一个连接器,可以作为数据流的数据源。要使RabbitMQ连接器正常工作,我们需要提供以下信息)
-
RabbitMQ configurations such as host, port, user credentials, and so on.
RabbitMQ配置,如主机、端口、用户凭据等 -
Queue, the RabbitMQ queue name which you wish to subscribe
队列,你希望订阅的RabbitMQ
的队列名。 -
Correlation IDs is a RabbitMQ feature used for correlating the request and response by a unique in a distributed world. The Flink RabbitMQ connector provides an interface to set this to true or false depending on whether you are using it or not.
关联id是一个RabbitMQ特性,用于在分布式世界中通过惟一的关联请求和响应。Flink RabbitMQ
连接器提供了一个接口,根据您是否使用它来将其设置为true或false。 -
Deserialization schema-RabbitMQ stores and transports the data in a serialized manner to avoid network traffic. So when the message is received, the subscriber should know how to deserialize the message. The Flink connector provides us with some default deserializers such as the string deserializer.
反序列化模式 rabbitmq以序列化的方式存储和传输数据,以避免网络流量。因此,当接收到消息时,订阅者应该知道如何反序列化消息。Flink连接器为我们提供了一些默认的反序列化器,比如字符串反序列化器
RabbitMQ source provides us with the following options on stream deliveries:
RabbitMQ source
提供给我们以下几个选项关于流的交付:
- Exactly once: Using RabbitMQ correlation IDs and the Flink check-pointing mechanism with RabbitMQ transactions
(通过使用关联IDs
和Flink check-pointing
机制支持RabbitMQ
事务。) - At-least once: When Flink checkpointing is enabled but RabbitMQ correlation IDs are not set
(当Flink checking pointing
被启用但RabbitMQ
的关联ID未被设置) - No strong delivery guarantees with the RabbitMQ auto-commit made
在RabbitMQ
自动提交模式下,不提供强投递保证。
Here is a diagram to help you understand the RabbitMQ connector in better marner:
(下面帮助我们更好地理解RabbitMQ
连接器)
Now let's look at how to write code to get this connector working. Like other connectors, we need to add a Maven dependency to the code:
(下面,让我们看一下怎么编写代码让conector
运行起来。象其他的连接器一样,我们需要加入下面的Maven
依赖。)
<dependency>
<groupId>org. apache.flink</groupId>
<artifactId>flink-connector-rabbitmg_2.11</artifactId>
<version>1.1.4</version>
</dependency>
The following snippet shows how to use the RabbitMQ connector
In Java:
//configurations
RMQconnectionconfiq connectionconfig = new RMQconnectionconfig.Builder() .
setHoat (<host>).setPort (<port>) .setUaerName (..)
.setPassword(..).setVirtualHoat ("/").build();
//Get Data stream without correlation ids
DataStream<String> streamWo= env. addSource (new RMQSource<string> (connectionconfig, "my-queue", new SimpleStringSchema()))
.print
//Get Data stream without correlation ids
DataStream<string> stream = env.addSource (new RMQSource<string> (connectionconfig, "my-queue", true,new SimpleStringSchema ()))
.print
Similarly, in Scala the code can written as follows:
相应的scala
代码如下:
val connectionConfig = new RMQConnectioConfiq.Builder ()
.setHost () .setPort ().setUserName (..) .setPaasword(..).setVirtualHoat ("/") .build()
streamsWoIds=
env.addSource (new RMQSource [stringl (connectionConfig, "my-queue", new SimpleStringSchema)).print.
streamaWIds = env.
addSource (new RMQSource [string] (connectionConfig, "my-queue", true,
new simpleStringSchema)).print
We can also use the RabbitMQ connector as a Flink sink. If you want to send processes back to some different RabbitMQ queue, the following is a way to do so. We need to provide three important configurations:
我们还可以使用RabbitMQ连接器作为Flink接收器。如果您想将进程发送回某个不同的RabbitMQ队列,下面是一种方法。我们需要提供三个重要的配置
- RabbitMQ configurations
- Queue name-Where to send back the processed data
- Serialization schema Schema for RabbitMQ to convert the data into bytes
The following is sample code in Java to show how to use this connector as a Flink sink:
RMQconnectionConfig connectionConfig = new RMQConnectionConfig.Builder ()
.setHost (<host>) .setPort (<port>).setUeerName (..)
.setPassword(..).seVirtualHoat ("/") .build();
stream. addSink (new RMQSink<String> (connestionConfig,"target-queue",
new StringToByteSerializer()));
The same can be done in Scala:
同样的用Scala
也可以实现:
val connectionConfig = new RMQConnectionconfig.Ruilder().
setHost (<host>).setPort (<port>).setUserName (..)
.setPaaaword(..).setVirtualHost("/").build()
stream.addSink (new RMQSink [string] (connectionConfig, "target-queue", new
StringToByteSerializer.
ElasticSearch connector
ElasticSearch is a distributed, low-latency, full text search engine system that allows us to index documents of our choice and then allows us to do a full text search over the set of documents.
More on ElasticSearch can be read here at httpa://www.elaatic.co/.
ElasticSearch是一个分布式的、低延迟的全文搜索引擎系统,它允许我们索引自己选择的文档,然后允许我们对文档集进行全文搜索。更多关于ElasticSearch的信息可以在httpa://www.elaatic.co/上阅读。
In many use cases, you may want to process data using Flink and then store it in ElasticSearch. To enable this, Flink supports the ElasticSearch connector. So far,ElasticSearch has had two major releases. Flink supports them both.For ElasticSearch 1.X, the following Maven dependency needs to be added:
在一些场景中,您可能希望使用Flink处理数据,然后将其存储在ElasticSearch中。为此,Flink支持ElasticSearch连接器。到目前为止,ElasticSearch已经发布了两个主要版本。Flink支持这两者。ElasticSearch 1.x,需要添加以下Maven依赖项
<derendency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch_2.11</artifactId> <varsion>1.1.4/version>
</derendency>
The Flink connector provides a sink to write data to ElasticSearch. It uses two methods to connect to ElasticSearch:
Flink连接器提供了一个接收器,用于将数据写入ElasticSearch。它使用两种方法连接到ElasticSearch
- Embedded node
- Transport client
The following diagram illustrates this.(如下图所示)
image.pngEmbedded node mode
In the embedded node mode, the sink uses BulkProcessor to send the documents to ElasticSearch. We can configure how many requests to buffer before sending documents to ElasticSearch.
在嵌入式节点模式下,sink使用BulkProcessor
将文档发送到ElasticSearch。我们可以在发送文档到ElasticSearch之前配置缓冲多少请求。
The following is the code snippet:
(代码如下)
DataStream<String> input=...;
Map<String,String> config =Maps.newHashMap();
config.put ("bulk.flush.max. actions", -1");
config.put ("cluster.name", "cluster-name");
input. addSink (new ElasticSearchSink<> (config, new IndexRequeatBuilder<String> () {
@Override
public IndexRequest createIndexReguest (String element,RuntimeContext ctx)
{
Map<String, Object> json =new HashMap<>();
json.put ("data". element);
return Requeats.indexReguest ()
.index ("my-index")
.type ("my-type")
.source (json);
}
}));
In the preceding code snippet, we create a hash map with configurations such as the cluster name and how many documents to buffer before sending the request. Then we add the sink to the stream, specifying the index, type, and the document to store. Similar code in Scala follows:
(在前面的代码片段中,我们创建了一个HashMap
,其中包含一些配置,比如集群名和在发送请求之前需要缓冲的文档数量。然后将sink添加到流中,指定要存储的索引、类型和文档。)
下面是Scala中的类似代码
val input: DataStream[string] = ...
val config = new util. HashMap [string, string]
config.put ("bulk. flush.max.action". "1").
config.put ("cluster.name". "cluster-name")
text. addSink (nes ElasticSearchSink (config. new IndexRequeatBuilder [string]
override def createIndexRequeat (element: string,ctx:RuntimeContext):
IndexRequest ={
val json =new util. HashMap [string. AnyRef]
json.put ("data", element)
Requst.indexRequest.index ("my-index"). type ("my-type").source (jaon)
}
}))
Transport client mode
ElasticSearch allows connections through the transport client on port 9300. Flink supports connecting using those through its connector. The only thing we need to mention here is all the ElasticSearch nodes present in the cluster in configurations.
ElasticSearch允许通过端口9300上的transport client
进行连接。Flink支持通过其连接器进行连接。这里我们唯一需要提到的是集群中所有的ElasticSearch节点的配置。
The following is the snippet in Java:
DataStreamsString> input =...;
Map<string, string> config = Maps.newHashMap();
config.put ("bulk.flush.max.action", "1");
config.put ("cluster.name", "cluster-name");
List <TransportAddress> transports = new ArrayList <string>():
transports. add (new InetsocketTranaportAddress ("es-node-1". 9300)):
transports. add (new InetsocketTranaportAddress ("ea-node-2", 9300));
transports.add (new InetsocketTranaportAddress ("ea-node-3", 9300));
input. addSink (new ElasticSearchSink<> (config, transports, new IndexRequstBuilder<string>() {
@override
public IndexRequeat creaceIndexRequest (String element, RuntimeContext ctx) {
Map<String,Object> json = new HashMap<>();
json.put ("data". element);
return Requests.indexRequest ()
.index ("my-index")
.type ("my-type")
.source (json):
}
}));
Here as well we provide the details about the cluster name, nodes, ports, maximum requests to send in bulk, and so on. Similar code in Scala can be written as follows:
在这里我们提供了集群名
,节点,端口,批量发送的最大请求等待。与java代码相似,对应的scale 代码如下:
val input: DataStream[string] =...
val config = new util.HashMap[string,string]
config.put ("bulk. flush.max.actions",1")
config.put ("cluster.name", "cluster-name")
val transports = new ArrayList [string]
transports.add (new InetSocketTransportAddress("ea-node-1", 9300));
transports.add (new InetSockeTranaportAddress("ea-node-2", 9300))
transports.add (new InetSocketTranaportAddreaa ("ea-node-3". 9300))
text. addSink(new ElasticsearchSink (config. transports. new IndexRequestBuilder [string] {
override def createIndexRequest (element: string, ctx: RuntimeContext):
IndexRequest={
val json = new util.HashMap[String, AnyRef]
json.put ("data", element)
Requests.indexRequest. index ("my-index").
.type. ("my-type") .
source (json)
}
}));
Cassandra connector
Cassandra is a distributed, low latency, NoSQL database. It is a key value-based database.Many high throughput applications use Cassandra as their primary database. Cassandra works with a distributed cluster mode. where there is no master-slave architecture. Reads and writes can be felicitated by any node. More on Cassandra can be found at: http://cassandra. apache.org/
Cassandra
是一个分布式的,低延迟的。 NoSQL
数据库,它是一个key value 的数据库。很多高吞吐的应用程序用Cassandra
作为主要的数据库。Cassandra
运行在分布式集群模式。这种集群模式并不是master-slave
架构,每个节点都可以提供读和写操作。更多关于Cassandra
请参见:
Apache Flink provides a connector which can write data to Cassandra. In many applications, people may want to store streaming data from Flink into Cassandra. The following diagram shows a simple design of the Cassandra sink:
Apache Flinkk
提供一个链接器,这个链接器可以将数据写到Cassandra
。在一些应用中,人们也想存储流数据到Cassandra
。下图显示了Cassandra sink
的简单设计。
Like other connectors, to get this we need to add it as a maven dependency:
和其它的connections一样,我们也需要加入以下maven 依赖。
<dependency>
<qroupId>org. apache. flink</qroupid>
<artifactId>flink-connector-cassandra_2.11</artifacttd>
<version>1.1.4</version>
</dependency>
Once the dependency is added, we just need to add the Cassandra sink with its configurations, as follows
In Java:
maven依赖添加之后,我们需要加入Cassandra sink
和它对应的配置,如下:
cassandraSink.addSink (input)
.setQuery ("INSERI INTO cep.events (id, message) values (?,?);").
.setClusterBuilder (new ClusteBuilder(){
@override
public Cluater buildCluster(Cluster.Builder builder) {
return builder.addContactPoint (127.0.0.1").build();
}})
.build()
The preceding code writes stream of data into a table called events. The table expects an event ID and a message. Similarly in Scala:
前面的代码将数据流写入名为events
的表中。该表需要一个事件ID和一条消息
CassandraSink.addSink: (input)
.setQuery ("INSERI INTO cep.evente (id, mesaage) values (?,?):")
.setClusterBuilder (new ClusterBuilder(
@override
public Cluster buildCluster (Cluster.Builder builder) {
return builder.addContactPoint ("127.0.0.1").build();
}
})
.build();
Use case - sensor data analytics
Now that we have looked at various aspects of DataStream API, let's try to use these concepts to solve a real world use case. Consider a machine which has sensor installed on it and we wish to collect data from these sensors and calculate average temperature per sensor every five minutes
Following would be the architecture:
现在我们已经了解了DataStream API的各个方面,让我们尝试使用这些概念来解决实际的问题。考虑一台安装了传感器的机器,我们希望从这些传感器收集数据,并计算每个传感器每五分钟的平均温度
In this scenario, we assume that sensors are sending information to Kafka topic called temp
.with information as (timestamp, temperature, sensor-ID). Now we need to write code to read data from Kafka topics and processing it using Flink transformation.
在这个场景中,我们假设传感器正在向Kafka主题“temp”发送信息。信息为(时间戳、温度、传感器id)。现在我们需要编写代码来从Kafka主题读取数据并使用Flink转换来处理它
Here important thing to consider is as we already have timestamp values coming from sensor, we can use Event Time computations for time factors. This means we would be able to take care of events even if they reach out of order.
这里需要考虑的重要一点是,由于我们已经有了来自传感器的时间戳,我们可以对时间因子使用事件时间计算。这意味着我们将能够处理事件,即使它们乱序。
We start with simple streaming execution environment which will be reading data from Kafka. Since we have timestamps in events, we will be writing a custom timestamp and watermark extractor to read the timestamp values and do window processing based on that. Here is code snippet for the same.
我们从简单的流执行环境开始,该环境将从Kafka读取数据。由于我们在事件中有时间戳,因此我们将编写自定义时间戳和水印提取器来读取时间戳值并基于此进行窗口处理。下面是对应的代码片段。
// set up the streaming execution environment
final StreanExecutionEnvironment env=
StreamExecutionEnvironment.getExecurionEnvironment ():
// env.enableCheckpointing (5000);
nv.setStreamTimeCharacteriatic(TimeCharacterietic.EventTime);
Properties properties = new Properties();
properties.setProperty ("bootstrap.servers", "localhoat:9092");
properties.setProperty ("zookeeper.connect", "localhoat:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer09<string> myConsumer = new FlinkKafkaConsumer09<>("temp", new SimpleStringSchema (),properties);
myConsumer.assignTimestampAndWatermarks (new CustomWatermarkEmitter());
Here we assume that we receive events in Kafka topics as strings and in the format:
在这里,我们假设我们从Kafka主题
收到的字符串格式如下
Timeatamp,Temperature. sensor-Id
The following an example code to extract timestamp from record
下面的代码是从记录中抽取时间戮
public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String>{
private static final long serialveraionID = 1L;
@override
public long extractTimestamp (string arg0, long arg1){
if (null != arg0 &arg0.contains(",")){
String parts[] = arg0.split(",");
return Long.parseLong (parts[0]);
}
return 0:
}
@override
public Watermark checkAndGetNextWatermark (string arg0. long arg1){
if (null != arg0 &arg0.contains (",")){
string parts[] =arg0.split (",");
return new Watermark (Long.parseLong (parts [0]));
}
return null;
}
}
Now we simply created keyed data stream and perform average calculation on temperature values as shown in the following code snippet.
现在我们简单地创建了有键的数据流,并对温度值执行平均计算,如下面的代码片段所示。
DataStreams<Tuple2<String,Double>> keyedStream =
env.addsource (myConsumer) .flatMap (new Splitter().keyBy(0)
.timeWindow (Time.seconde (300))
.apply (new WindowFunetion<Tuple2<String,Double>, Tuple2<String,Double>,Tuple,TimeWindow>(){
@override
public void apply (Tuple key, TimeWindow window,
Iterable<Tuple2<String,Double>> input,Collector<Tuple2<String,Double>> out) throws Exception {
double sum = 0;
int count =0;
for (Tuple2<String,Double> record : input){
sum -= record.f1;
count--;
}
Tuple2<String,Double> result = input.iterator().next ();
result.f1 = (sum/count);
out.collect(result);
}
});
When execute the preceding given code, and if proper sensor events are published on Kafka topics then we will get the average temperature per sensor every five minutes
当执行前面给出的代码时,如果Kafka主题上发布了合适的传感器事件,那么我们将每五分钟获得每个传感器的平均温度
The complete code is available on GitHub at
完整的代码在github 上:
https://github.com/deshpandetanmay/mastering-flink/tree/master/chapter02/flink-streaming.
Summary
In this chapter, we started with Flink's most powerful API: DataStream API We looked at how data sources, transformations, and sinks work together. Then we looked at various technology connectors such as ElasticSearch, Cassandra, Kafka, RabbitMQ, and so on.
在这一节,我们开始学习了Flink
最有力的 API:DateStream API,我们也看到了data source
,transformations
和sink
如何一起工作的。然后,我 们也看到了基于各种技术的connectors
比如ElasticSearch, Cassandra, Kafka, RabbitMQ
等待。
At the end, we also tried to apply our learning to solve a real-world sensor data analytics use case.
最后,我们还尝试应用我们的学习来解决一个真实的传感器数据分析用例。
In the next chapter, we are going to learn about another very important API from Flink's ecosystem point of view the DataSet API.
在下一节,我们会继续学习关于flink生态的另一套非常重要的API--DataSet API