Java from Zero to Architect

85_Mastering the ES Java API_Using bulk to batch-upload sales data from multiple 4S dealerships

2020-03-04  小山居

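Business scenario: a car sales company owns many 4S dealerships, and each dealership sends in its car sales data gradually over a period of time. The goal is to buffer a batch of records in memory (say, 1000 sales records) and then upload them to ES in a single bulk request.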
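The buffering step described above can be sketched without any Elasticsearch dependency. In the minimal sketch below, `SalesBuffer`, `SalesBufferDemo`, and the `sink` callback are hypothetical names introduced for illustration; in a real application the sink would presumably build and send one bulk request per batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of the Elasticsearch API): collects records in
 * memory and hands them to a sink in batches of a fixed size.
 */
final class SalesBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink; // e.g. code that sends one bulk request
    private final List<T> pending = new ArrayList<>();

    SalesBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one record and flushes as soon as the batch is full. */
    void add(T record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered; call once more at shutdown for the last partial batch. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending)); // pass a copy so the sink owns its batch
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}

final class SalesBufferDemo {
    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        SalesBuffer<String> buffer =
                new SalesBuffer<>(1000, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 2500; i++) {
            buffer.add("sale-" + i);
        }
        buffer.flush(); // push the remaining 500 records
        System.out.println(batchSizes); // prints [1000, 1000, 500]
    }
}
```

This sketch is not thread-safe and keeps the records buffered if the sink throws. The Elasticsearch Java client also ships a `BulkProcessor` class that batches requests automatically, which may be a better fit than a hand-written buffer.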


Add data: two documents with identical content (only the IDs differ)



PUT /car_shop/sales/1
{
    "brand": "宝马",
    "name": "宝马320",
    "price": 320000,
    "produce_date": "2017-01-01",
    "sale_price": 300000,
    "sale_date": "2017-01-21"
}

PUT /car_shop/sales/2
{
    "brand": "宝马",
    "name": "宝马320",
    "price": 320000,
    "produce_date": "2017-01-01",
    "sale_price": 300000,
    "sale_date": "2017-01-21"
}

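Bulk upload through the Java API (one bulk request that indexes document 3, updates document 1, and deletes document 2):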

BulkRequestBuilder bulkRequest = client.prepareBulk();

bulkRequest.add(client.prepareIndex("car_shop", "sales", "3")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("brand", "奔驰")
                        .field("name", "奔驰C200")
                        .field("price", 350000)
                        .field("produce_date", "2017-01-05")
                        .field("sale_price", 340000)
                        .field("sale_date", "2017-02-03")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareUpdate("car_shop", "sales", "1")
        .setDoc(jsonBuilder()
                    .startObject()
                        // numeric value, matching how sale_price was indexed
                        .field("sale_price", 290000)
                    .endObject()
                )
        );

bulkRequest.add(client.prepareDelete("car_shop", "sales", "2"));

BulkResponse bulkResponse = bulkRequest.get();

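if (bulkResponse.hasFailures()) {
    // a bulk request is not atomic: some items can fail while others succeed
    System.err.println(bulkResponse.buildFailureMessage());
}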
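For comparison with the PUT requests shown earlier, the three operations in the snippet above roughly correspond to one REST `_bulk` request body: one action line per operation, followed by a source line for index and update. The sketch below assembles that body by hand. `BulkBodySketch` and its methods are hypothetical names for illustration only, the indexed document is abbreviated (brand and name omitted), and the transport client itself does not send this text over HTTP.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that assembles a REST _bulk body by hand, for illustration only. */
final class BulkBodySketch {

    /** Action line naming the operation and its target document. */
    static String action(String op, String index, String type, String id) {
        return String.format(
                "{\"%s\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\"}}",
                op, index, type, id);
    }

    static String buildBody() {
        List<String> lines = new ArrayList<>();
        // index: action line followed by the document source (abbreviated here)
        lines.add(action("index", "car_shop", "sales", "3"));
        lines.add("{\"price\":350000,\"produce_date\":\"2017-01-05\","
                + "\"sale_price\":340000,\"sale_date\":\"2017-02-03\"}");
        // update: action line followed by a partial document wrapped in "doc"
        lines.add(action("update", "car_shop", "sales", "1"));
        lines.add("{\"doc\":{\"sale_price\":290000}}");
        // delete: action line only, no source line
        lines.add(action("delete", "car_shop", "sales", "2"));
        // every line, including the last one, must end with a newline
        return String.join("\n", lines) + "\n";
    }

    public static void main(String[] args) {
        System.out.print(buildBody());
    }
}
```

The printed body is what one would send to the `_bulk` endpoint from a REST console, which makes it easier to see why the Java builder accepts index, update, and delete requests in the same batch.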


The full source is as follows:

BulkUploadSalesDataApp


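import java.net.InetAddress;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class BulkUploadSalesDataApp {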
    
    @SuppressWarnings({ "resource", "unchecked" })
    public static void main(String[] args) throws Exception {
        Settings settings = Settings.builder()
                .put("cluster.name", "elasticsearch")
                .build();
        
        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300)); 
    
        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
        
        IndexRequestBuilder indexRequestBuilder = client.prepareIndex("car_shop", "sales", "3") 
                .setSource(XContentFactory.jsonBuilder()
                            .startObject()
                                .field("brand", "奔驰")
                                .field("name", "奔驰C200")
                                .field("price", 350000)
                                .field("produce_date", "2017-01-20")
                                .field("sale_price", 320000)
                                .field("sale_date", "2017-01-25")
                            .endObject());
        bulkRequestBuilder.add(indexRequestBuilder);
        
        UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate("car_shop", "sales", "1")
                .setDoc(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("sale_price", 290000)
                        .endObject());
        bulkRequestBuilder.add(updateRequestBuilder);
        
        DeleteRequestBuilder deleteRequestBuilder = client.prepareDelete("car_shop", "sales", "2");
        bulkRequestBuilder.add(deleteRequestBuilder);
        
        BulkResponse bulkResponse = bulkRequestBuilder.get();
        
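        // each item reports its own outcome; one failed item does not fail the whole request
        for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
            if (bulkItemResponse.isFailed()) {
                System.out.println("failed: " + bulkItemResponse.getFailureMessage());
            } else {
                System.out.println("version: " + bulkItemResponse.getVersion());
            }
        }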
        
        client.close();
    }
    
}

