Storm Study Notes, Day 3: Integrating Storm with Other Components

2017-08-11  小王同学加油

How to write data from Storm into Elasticsearch

Method 1: the example provided by Storm

https://github.com/apache/storm/tree/master/external/storm-elasticsearch

Code:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-elasticsearch</artifactId>
    <version>1.1.0</version>
</dependency>

EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
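
The three lines above only build the bolt; they do not show what the incoming tuples must look like. According to the storm-elasticsearch README, DefaultEsTupleMapper reads four string fields from each tuple: "source", "index", "type" and "id". The following is a minimal plain-Java sketch of that tuple shape. It deliberately avoids the Storm classes so it runs without any dependency; the class name EsTupleShape and the sample index, type and id values are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class EsTupleShape {
    // Field names that DefaultEsTupleMapper looks up on each incoming tuple.
    public static final List<String> FIELDS =
            Arrays.asList("source", "index", "type", "id");

    // Builds the values of one tuple, in the same order as FIELDS.
    // jsonSource is the document body as a JSON string.
    public static List<Object> toValues(String jsonSource, String index,
                                        String type, String id) {
        return Arrays.<Object>asList(jsonSource, index, type, id);
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        List<Object> values =
                toValues("{\"user\":\"kimchy\"}", "storm-demo", "doc", "1");
        for (int i = 0; i < FIELDS.size(); i++) {
            System.out.println(FIELDS.get(i) + " = " + values.get(i));
        }
    }
}
```

In a real topology the upstream spout or bolt would presumably declare these names with new Fields("source", "index", "type", "id") and emit the matching new Values(...), so that EsIndexBolt can resolve each field by name.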

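Problem: this module depends on an old version of Elasticsearch, and I was not able to resolve that.
The latest source code has already fixed this, but no jar has been published for it.
I would have had to rebuild the Storm 1.1.0 code myself, so I gave up on this route and used the approach below.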

Method 2: elasticsearch-hadoop

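Question:
Does it only support Hadoop? Does it support Storm? Storm is what I need.

ES-Hadoop connects Elasticsearch and Hadoop, two excellent frameworks: data in HDFS can be imported into Elasticsearch for analysis, and Elasticsearch data can be exported to HDFS for backup and archiving. Notably, ES-Hadoop also fully supports the Spark framework (marked with a star, top middle of the diagram). It also includes a dedicated Storm integration, published as the elasticsearch-storm artifact.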

image.png

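Question:
To use this jar, do I have to pull in a whole series of related jars?

After testing: the Hadoop jars are not needed, but the JSON and HTTP libraries do have to be added.
Sorting this out was a real hassle.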

<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

<dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.10</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>5.5.1</version>
</dependency>

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.3</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-mapper-asl -->
<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-mapper-asl</artifactId>
    <version>1.8.8</version>
</dependency>

<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-core-asl</artifactId>
    <version>1.8.8</version>
</dependency>

<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-jaxrs</artifactId>
    <version>1.8.8</version>
</dependency>

<dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-xc</artifactId>
    <version>1.8.8</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.0</version>
</dependency>

<!--
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-elasticsearch</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>5.5.1</version>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-storm</artifactId>
    <version>5.5.1</version>
</dependency>
-->

<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.5.1</version>
</dependency>

Implementation

Configuration file:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>5.5.1</version>
</dependency>

Method 3: the example provided officially by Elasticsearch

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-storm</artifactId>
    <version>5.5.1</version>
</dependency>
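
The dependency alone does not show how the bolt is configured. In the ES-Hadoop documentation the Storm bolt takes a target such as "index/type" plus a plain Map of es.* settings. The sketch below only builds such a map with JDK classes, so it runs without Storm or Elasticsearch on the classpath. The class name EsStormSettings and the sample host, port and flush size are hypothetical; the keys are the ones I understand the ES-Hadoop configuration reference to define.

```java
import java.util.Map;
import java.util.TreeMap;

public class EsStormSettings {
    // Builds the settings map that would be handed to the Elasticsearch bolt.
    public static Map<String, Object> build(String nodes, int port, int flushSize) {
        Map<String, Object> conf = new TreeMap<>();
        // ES-Hadoop talks to the cluster over HTTP, so this is the REST port
        // (9200 by default), not the transport port 9300.
        conf.put("es.nodes", nodes);
        conf.put("es.port", String.valueOf(port));
        // Tuples already carry a JSON string, so it is passed through as-is.
        conf.put("es.input.json", "true");
        // Ack a tuple only after Elasticsearch has accepted the write.
        conf.put("es.storm.bolt.write.ack", "true");
        // Number of entries buffered before a bulk flush.
        conf.put("es.storm.bolt.flush.entries.size", String.valueOf(flushSize));
        return conf;
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        Map<String, Object> conf = build("localhost", 9200, 500);
        for (Map.Entry<String, Object> e : conf.entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```

With elasticsearch-storm on the classpath, the map would then be passed along the lines of builder.setBolt("es-bolt", new EsBolt("storm/docs", conf)), where "storm/docs" is a placeholder index/type target.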

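Reading the code:
Key class: TransportClient

Elasticsearch uses standard RESTful APIs and JSON.
// PreBuiltTransportClient ships in the org.elasticsearch.client:transport artifact.
// Port 9300 is the transport port, not the HTTP/REST port (9200).
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
// Match documents whose "message" field contains "myProduct",
// and bucket the hits by the 10 most frequent "state" values.
SearchResponse sr = client.prepareSearch()
    .setQuery(QueryBuilders.matchQuery("message", "myProduct"))
    .addAggregation(AggregationBuilders.terms("top_10_states")
        .field("state").size(10))
    .execute().actionGet();
client.close();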

Elasticsearch from Storm

http://blog.csdn.net/sunnyyoona/article/details/52860861

References:
https://www.elastic.co/guide/en/elasticsearch/guide/current/dynamic-mapping.html
https://www.elastic.co/guide/en/elasticsearch/reference/2.4/dynamic-field-mapping.html#date-detection

Connecting Storm to Kafka

// Key point
How Storm wraps the Kafka API
Class: DynamicPartitionConnections
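
As far as I understand it, the idea behind DynamicPartitionConnections is to keep one consumer connection per Kafka broker, share it among all partitions read from that broker, and close it once the last partition is unregistered. The following is a simplified model of that bookkeeping, not the storm-kafka source: the class PartitionConnectionCache, the stand-in Connection type and the broker and topic names are all hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PartitionConnectionCache {
    // Hypothetical stand-in for a real Kafka consumer connection.
    public static class Connection {
        public final String broker;
        public boolean closed = false;
        Connection(String broker) { this.broker = broker; }
        void close() { closed = true; }
    }

    // One shared connection plus the partitions currently using it.
    private static class Entry {
        final Connection connection;
        final Set<String> partitions = new HashSet<>();
        Entry(Connection connection) { this.connection = connection; }
    }

    private final Map<String, Entry> connections = new HashMap<>();

    // Returns the shared connection for the broker, creating it on first use.
    public Connection register(String broker, String topic, int partition) {
        Entry entry = connections.get(broker);
        if (entry == null) {
            entry = new Entry(new Connection(broker));
            connections.put(broker, entry);
        }
        entry.partitions.add(topic + ":" + partition);
        return entry.connection;
    }

    // Drops one partition; closes the connection when none are left.
    public void unregister(String broker, String topic, int partition) {
        Entry entry = connections.get(broker);
        if (entry == null) {
            return;
        }
        entry.partitions.remove(topic + ":" + partition);
        if (entry.partitions.isEmpty()) {
            entry.connection.close();
            connections.remove(broker);
        }
    }

    public int openConnections() {
        return connections.size();
    }

    public static void main(String[] args) {
        PartitionConnectionCache cache = new PartitionConnectionCache();
        Connection first = cache.register("kafka1:9092", "events", 0);
        Connection second = cache.register("kafka1:9092", "events", 1);
        System.out.println("shared: " + (first == second));
        cache.unregister("kafka1:9092", "events", 0);
        cache.unregister("kafka1:9092", "events", 1);
        System.out.println("closed: " + first.closed);
    }
}
```

Keying the cache by broker rather than by partition keeps the number of open sockets proportional to the number of brokers, which matters when a spout task is assigned many partitions.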

[How to output Storm data to Elasticsearch]