Importing Business Data from MySQL into Elasticsearch

2020-06-11  乌鲁木齐001号程序员

Business Data Import | Overview

"Business data import" means taking the data produced by business operations and stored in MySQL, and loading it into Elasticsearch. The import process breaks down into roughly three steps: index construction, full data import, and incremental data import.

Index construction
Full data import
Incremental data import

Index Construction | Considerations

Which fields the index should contain
What type each field in the index should have

Index Construction | Shops

This involves three MySQL tables: shop, seller, and category.

PUT /shop
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id":{"type": "integer"},
      "name":{"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart"},
      "tags":{"type": "text", "analyzer": "whitespace", "fielddata": true},
      "location":{"type": "geo_point"},
      "remark_score":{"type": "double"},
      "price_per_man":{"type": "integer"},
      "category_id":{"type": "integer"},
      "category_name":{"type":"keyword"},
      "seller_id":{"type": "integer"},
      "seller_remark_score":{"type":"double"},
      "seller_disabled_flag":{"type":"integer"}
    }
  }
}
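To make the mapping concrete, here is a sketch (in Python, with made-up sample values) of what one flattened shop document looks like under this mapping. Note that a geo_point field accepts a "lat,lon" string, which is why latitude and longitude are concatenated during export:

```python
def shop_row_to_doc(row: dict) -> dict:
    """Flatten a joined shop/category/seller row into an ES document
    whose fields match the shop index mapping above."""
    return {
        "id": row["id"],
        "name": row["name"],
        "tags": row["tags"],
        # geo_point accepts a "lat,lon" string
        "location": f'{row["latitude"]},{row["longitude"]}',
        "remark_score": row["remark_score"],
        "price_per_man": row["price_per_man"],
        "category_id": row["category_id"],
        "category_name": row["category_name"],
        "seller_id": row["seller_id"],
        "seller_remark_score": row["seller_remark_score"],
        "seller_disabled_flag": row["seller_disabled_flag"],
    }

# Hypothetical row, as it would come out of the MySQL join
row = {
    "id": 1, "name": "某某小吃店", "tags": "新开业 人气高",
    "latitude": 31.23, "longitude": 121.47,
    "remark_score": 4.9, "price_per_man": 25,
    "category_id": 2, "category_name": "小吃快餐",
    "seller_id": 10, "seller_remark_score": 4.8, "seller_disabled_flag": 0,
}
doc = shop_row_to_doc(row)
print(doc["location"])  # 31.23,121.47
```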

Full Data Import

Install the logstash-input-jdbc plugin
Supply the MySQL JDBC driver yourself
Write a configuration file | jdbc.conf
input {
    jdbc {
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dianping?useSSL=false"
      jdbc_user => "root"
      jdbc_password => "Jiangdi_2018"
      jdbc_driver_class => "com.mysql.jdbc.Driver"

      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement_filepath => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/jdbc.sql"
      # Polling schedule. Fields, left to right: second, minute, hour, day,
      # month, day of week; six *'s means the query runs every second
      schedule => "* * * * * *"
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "shop"
        document_type => "_doc"
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}
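With jdbc_paging_enabled, Logstash does not pull the whole result set at once: it wraps the statement in LIMIT/OFFSET pages of jdbc_page_size rows. A minimal Python sketch of that paging loop, using an in-memory SQLite table (with made-up rows) as a stand-in for MySQL:

```python
import sqlite3

# Stand-in for MySQL: an in-memory SQLite table with a few shop rows
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE shop (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO shop VALUES (?, ?)",
                 [(i, f"shop-{i}") for i in range(1, 8)])

PAGE_SIZE = 3  # analogous to jdbc_page_size (50000 in the config above)

def fetch_all_pages(conn, page_size):
    """Page through the result set with LIMIT/OFFSET,
    the way jdbc_paging_enabled does."""
    offset, rows = 0, []
    while True:
        page = conn.execute(
            "SELECT id, name FROM shop LIMIT ? OFFSET ?",
            (page_size, offset)).fetchall()
        if not page:
            break
        rows.extend(page)
        offset += page_size
    return rows

rows = fetch_all_pages(conn, PAGE_SIZE)
print(len(rows))  # 7
```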
Put the SQL the scheduled job will run in its own file | jdbc.sql
SELECT 
    a.id,
    a.name,
    a.tags,
    CONCAT(a.latitude, ',', a.longitude) AS location,
    a.remark_score,
    a.price_per_man,
    a.category_id,
    b.name AS category_name,
    a.seller_id,
    c.remark_score AS seller_remark_score,
    c.disabled_flag AS seller_disabled_flag
FROM
    shop a
        INNER JOIN
    category b ON a.category_id = b.id
        INNER JOIN
    seller c ON c.id = a.seller_id
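The join above flattens the three tables into one row per shop, which Logstash then ships as one document. The same shape can be sketched against an in-memory SQLite database (made-up rows; SQLite's || operator stands in for MySQL's CONCAT):

```python
import sqlite3

# Minimal stand-in schema with one made-up row per table
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE shop (id INTEGER, name TEXT, latitude REAL, longitude REAL,
                   category_id INTEGER, seller_id INTEGER);
CREATE TABLE category (id INTEGER, name TEXT);
CREATE TABLE seller (id INTEGER, remark_score REAL, disabled_flag INTEGER);
INSERT INTO shop VALUES (1, 'shop-1', 31.23, 121.47, 2, 10);
INSERT INTO category VALUES (2, 'snacks');
INSERT INTO seller VALUES (10, 4.8, 0);
""")

# Same join shape as jdbc.sql; || replaces MySQL's CONCAT
row = conn.execute("""
SELECT a.id, a.name,
       a.latitude || ',' || a.longitude AS location,
       b.name AS category_name,
       c.remark_score AS seller_remark_score,
       c.disabled_flag AS seller_disabled_flag
FROM shop a
INNER JOIN category b ON a.category_id = b.id
INNER JOIN seller c ON c.id = a.seller_id
""").fetchone()

print(row)  # (1, 'shop-1', '31.23,121.47', 'snacks', 4.8, 0)
```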
Start Logstash

./logstash -f mysql/jdbc.conf

Once the import succeeds, you can inspect the data through the Elasticsearch API: POST /shop/_search.
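A minimal sketch of that verification call using Python's urllib; the actual request is left commented out here since it needs a live cluster on localhost:9200:

```python
import json
import urllib.request

# Build a match_all search against the shop index
url = "http://localhost:9200/shop/_search"
body = json.dumps({"query": {"match_all": {}}}).encode()
req = urllib.request.Request(
    url, data=body, headers={"Content-Type": "application/json"})

# Requires a running cluster, so left commented out:
# with urllib.request.urlopen(req) as resp:
#     hits = json.load(resp)["hits"]["hits"]
#     print(len(hits))

print(req.get_method())  # POST
```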


Incremental Data Import | Based on updated_at

Modify jdbc.conf
input {
    jdbc {
      jdbc_default_timezone => "Asia/Shanghai"
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/dianping?useSSL=false"
      jdbc_user => "root"
      jdbc_password => "Jiangdi_2018"
      jdbc_driver_class => "com.mysql.jdbc.Driver"

      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      last_run_metadata_path => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/last_value_meta"
      statement_filepath => "/home/lixinlei/application/logstash/7.3.0/logstash-7.3.0/bin/mysql/jdbc.sql"
      # Polling schedule. Fields, left to right: minute, hour, day,
      # month, day of week; five *'s means the query runs every minute
      schedule => "* * * * *"
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "shop"
        document_type => "_doc"
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}
Set the initial value in the last_value_meta file
2010-11-11 11:11:11
Modify jdbc.sql
SELECT 
    a.id,
    a.name,
    a.tags,
    CONCAT(a.latitude, ',', a.longitude) AS location,
    a.remark_score,
    a.price_per_man,
    a.category_id,
    b.name AS category_name,
    a.seller_id,
    c.remark_score AS seller_remark_score,
    c.disabled_flag AS seller_disabled_flag
FROM
    shop a
        INNER JOIN
    category b ON a.category_id = b.id
        INNER JOIN
    seller c ON c.id = a.seller_id
WHERE a.updated_at > :sql_last_value or b.updated_at > :sql_last_value or c.updated_at > :sql_last_value
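The :sql_last_value mechanism can be sketched in Python with an in-memory SQLite table (made-up rows). One simplification to note: by default logstash-input-jdbc records the time of the last run in last_run_metadata_path, whereas this sketch advances the marker to the maximum updated_at seen:

```python
import sqlite3

# Stand-in shop table with string timestamps
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE shop (id INTEGER, name TEXT, updated_at TEXT)")
conn.executemany("INSERT INTO shop VALUES (?, ?, ?)", [
    (1, "shop-1", "2020-06-01 10:00:00"),
    (2, "shop-2", "2020-06-10 09:30:00"),
])

sql_last_value = "2010-11-11 11:11:11"  # initial value, as in last_value_meta

def poll(conn, last_value):
    """One scheduled run: fetch rows changed since last_value,
    then advance the marker."""
    rows = conn.execute(
        "SELECT id, name, updated_at FROM shop WHERE updated_at > ?",
        (last_value,)).fetchall()
    if rows:
        last_value = max(r[2] for r in rows)
    return rows, last_value

# First run picks up everything newer than the initial marker
rows, sql_last_value = poll(conn, sql_last_value)
print(len(rows), sql_last_value)  # 2 2020-06-10 09:30:00

# A later edit bumps updated_at, so the next run picks up only that row
conn.execute("UPDATE shop SET name = 'shop-1b', "
             "updated_at = '2020-06-11 08:00:00' WHERE id = 1")
rows, sql_last_value = poll(conn, sql_last_value)
print(len(rows))  # 1
```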
Run Logstash

./logstash -f mysql/jdbc.conf

After data is changed in MySQL, the change shows up in Elasticsearch within a minute.

Incremental Data Import | Based on updated_at | Drawbacks

Every table involved must reliably maintain an updated_at column, and rows physically deleted from MySQL leave nothing for the query to match, so deletions never reach Elasticsearch; polling once a minute also means up to a minute of lag.