Using ES as a secondary index for HBase (observer coprocessor)

2020-03-10  风一样的存在

Development environment
sqoop 1.4.7
hbase 1.3.1
elasticsearch 7.1.1

Observer implementation

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.*;

public class HbaseDataSyncEsObserver extends BaseRegionObserver {
    private String esAddress;
    private String index;
    private String family;

    /**
     * Read the coprocessor arguments (es_address, es_index, family)
     * @param env coprocessor environment
     */
    private void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        esAddress = conf.get("es_address");
        index = conf.get("es_index");
        family = conf.get("family");

    }

    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        readConfiguration(e);
        ElasticSearchConfig.restHighLevelClient(esAddress);
    }

    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        ElasticSearchConfig.client.close();
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) {
        String indexId = Bytes.toString(put.getRow());
        NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
        Map<String,Map<String,Object>> esData = new HashMap<>();
        Map<String, Object> data = new HashMap<>();
        for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()){
            if(Bytes.toString(entry.getKey()).equals(family)){
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    data.put(key, value);
                }
                break;
            }
        }
        esData.put(indexId,data);
        ElasticSearchUtil.saveEsDataWithBulk(esData,index);
    }

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        String indexId = Bytes.toString(delete.getRow());
        ElasticSearchUtil.deleteEsDataWithBulk(Arrays.asList(indexId), index);
    }
}

Creating the ES connection

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestClient.FailureListener;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticSearchConfig {
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchConfig.class);
    public static RestHighLevelClient client;


    public static RestHighLevelClient restHighLevelClient(String address) {
        String[] hosts = address.split(",");
        if (hosts.length > 0) {
            HttpHost[] httpHosts = new HttpHost[hosts.length];
            for (int i = 0; i < hosts.length; i++) {
                httpHosts[i] = new HttpHost(hosts[i], 9200, "http");
            }

            RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).setRequestConfigCallback(new RequestConfigCallback() {
                public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                    requestConfigBuilder.setConnectTimeout(0);
                    requestConfigBuilder.setSocketTimeout(300000);
                    requestConfigBuilder.setConnectionRequestTimeout(2000);
                    return requestConfigBuilder;
                }
            });
            restClientBuilder.setFailureListener(new FailureListener() {
                public void onFailure(Node node) {
                    ElasticSearchConfig.log.error("************************ es failure listener triggered, node: {}", node.getName());
                }
            });
            restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
            client = new RestHighLevelClient(restClientBuilder);
            log.info("ElasticSearch client init success ....");
        }

        return client;
    }
}

ES upsert and delete operations

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

public class ElasticSearchUtil {
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchUtil.class);

    private static RestHighLevelClient getClient() {
        return ElasticSearchConfig.client;
    }

    /**
     * Bulk-upsert documents into ES
     * @param esBulkData documents to write, keyed by document id (the HBase row key)
     * @param index      target index
     */
    public static void saveEsDataWithBulk(Map<String, Map<String, Object>> esBulkData, String index) {
        BulkRequest bulkRequest = new BulkRequest();
        for (Map.Entry<String, Map<String, Object>> oneEsData : esBulkData.entrySet()) {
            UpdateRequest request = new UpdateRequest(index, oneEsData.getKey());
            request.doc(oneEsData.getValue());
            // insert the document if it does not exist yet, otherwise partial-update it
            request.docAsUpsert(true);
            bulkRequest.add(request);
        }

        try {
            getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("save esData error:", e);
        }
    }

    public static void deleteEsDataWithBulk(List<String> ids, String index) {
        BulkRequest bulkRequest = new BulkRequest();
        ids.forEach(id -> {
            DeleteRequest request = new DeleteRequest(index);
            request.id(id);
            bulkRequest.add(request);
        });
        try {
            getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("delete esData error:", e);
        }
    }

    private ElasticSearchUtil() {
    }
}
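
A minimal, hypothetical usage sketch of the two helpers outside the coprocessor; the address matches the es_address used in the table configuration further down, while the row key and field values are purely illustrative:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ElasticSearchUtilDemo {
    public static void main(String[] args) throws Exception {
        // assumption: same ES address as the one passed to the coprocessor
        ElasticSearchConfig.restHighLevelClient("10.1.1.235");

        // upsert one document, keyed by the HBase row key
        Map<String, Map<String, Object>> esData = new HashMap<>();
        Map<String, Object> doc = new HashMap<>();
        doc.put("card_number", "1234567890");
        doc.put("legal_person", "test");
        esData.put("row-0001", doc);
        ElasticSearchUtil.saveEsDataWithBulk(esData, "blacklist");

        // delete the same document by id
        ElasticSearchUtil.deleteEsDataWithBulk(Collections.singletonList("row-0001"), "blacklist");

        ElasticSearchConfig.client.close();
    }
}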

The project's pom.xml

    <properties>
        <hbase.version>1.3.1</hbase.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <elasticsearch.version>7.1.1</elasticsearch.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- es -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.10</version>
        </dependency>
    </dependencies>

When packaging, all dependencies must be bundled into the jar; the maven-assembly-plugin is used here to build a jar-with-dependencies.

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

alter "blacklist", METHOD => 'table_att', 'coprocessor'=>'hdfs:///coprocessor/hbase-observer-elasticsearch-1.0.0-SNAPSHOT.jar|com.qjdchina.hbase.observer.HbaseDataSyncEsObserver|1001|es_address=10.1.1.235,es_index=blacklist,family=info'

配置
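Once the coprocessor is registered, one quick way to check that postPut and postDelete fire is to write and then delete a row through the HBase 1.x client API. This is a minimal sketch, not from the original post; the ZooKeeper quorum and the row key are placeholders, while the table and column family match the alter command above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class ObserverSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // assumption: ZooKeeper quorum of the test cluster
        conf.set("hbase.zookeeper.quorum", "10.1.1.235");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("blacklist"))) {

            // put a row -> postPut should upsert a document with id "row-0001" into the blacklist index
            Put put = new Put(Bytes.toBytes("row-0001"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("card_number"), Bytes.toBytes("1234567890"));
            table.put(put);

            // delete the row -> postDelete should remove the document again
            table.delete(new Delete(Bytes.toBytes("row-0001")));
        }
    }
}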

Problems encountered:

  • 1. Testing put and deleteall through the hbase shell showed the ES data staying in sync, but strangely, importing data with sqoop triggered both postPut and postDelete on the documents in ES.
    Import command:
    ./bin/sqoop import \
    --connect jdbc:mysql://<database-address>?tinyInt1isBit=false \
    --username <username> \
    --password '<password>' \
    --query 'select id,card_number,legal_person from tax_info where insert_time >= "2020-03-13 16:00:00" and insert_time <= "2020-03-13 18:00:00" and $CONDITIONS' \
    --hbase-create-table \
    --columns 'id','card_number','legal_person' \
    --column-family info \
    --incremental append \
    --check-column 'id' \
    --hbase-row-key "id" \
    --hbase-table tax_info_tmp01 \
    -m 1
  • 2. When sqoop imports several column families in separate batches, with one column family used as the query condition and synced to ES while the other column families are not synced, data that had already been written to ES was deleted again: importing the remaining column families triggered postDelete (a possible defensive check is sketched below).
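One possible mitigation for the second issue, not part of the original post: only forward a delete to ES when it actually touches the indexed column family, on the assumption that the deletes fired while importing the other column families carry cells only for those families. A sketch that would replace the postDelete shown earlier in HbaseDataSyncEsObserver, reusing the family field read from the coprocessor configuration:

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
                           WALEdit edit, Durability durability) throws IOException {
        // Skip deletes that do not involve the indexed column family, so documents that
        // were already synced are not removed while other column families are imported.
        boolean touchesIndexedFamily = delete.getFamilyCellMap().isEmpty()            // whole-row delete
                || delete.getFamilyCellMap().containsKey(Bytes.toBytes(family));      // delete on the indexed family
        if (!touchesIndexedFamily) {
            return;
        }
        String indexId = Bytes.toString(delete.getRow());
        ElasticSearchUtil.deleteEsDataWithBulk(Arrays.asList(indexId), index);
    }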