Mongo-Connector的使用
2019-04-27 本文已影响0人
谁在烽烟彼岸
工具介绍
mongo-connector工具创建一个从MongoDB簇到一个或多个目标系统的管道,目标系统包括:Solr,Elasticsearch,或MongoDB集群。 该工具在MongoDB与目标系统间同步数据,并跟踪MongoDB的oplog,保持操作与MongoDB的实时同步。
环境
mongo-connector支持Python 3.4+和MongoDB版本3.4和3.6。
原理
oplog日志
1.简介
oplog是local库下的一个固定集合,Secondary就是通过查看Primary 的oplog这个集合来进行复制的。每个节点都有oplog,记录这从主节点复制过来的信息,这样每个成员都可以作为同步源给其他节点。
2.oplog日志结构
- ts: 8字节的时间戳,由4字节unix timestamp + 4字节自增计数表示。这个值很重要,在选举(如master宕机时)新primary时,会选择ts最大的那个secondary作为新primary
- op:1字节的操作类型
"i": insert
"u": update
"d": delete
"c": db cmd
"db":声明当前数据库 (其中ns 被设置成为=>数据库名称+ '.')
"n": no op,即空操作,其会定期执行以确保时效性- ns:操作所在的namespace
- o:操作所对应的document,即当前操作的内容(比如更新操作时要更新的的字段和值)
- o2: 在执行更新操作时的where条件,仅限于update时才有该属性
mongo-connector的核心目录结构是:
├── mongo_connector
│ ├── __init__.py
│ ├── compat.py
│ ├── config.txt
│ ├── connector.py
│ ├── constants.py
│ ├── doc_managers
│ │ ├── __init__.py
│ │ ├── config.txt
│ │ ├── doc_manager_simulator.py
│ │ ├── elastic_doc_manager.py
│ │ ├── formatters.py
│ │ ├── mongo_doc_manager.py
│ │ ├── schema.xml
│ │ ├── solr_doc_manager.py
│ │ └── solr_doc_manager.pyc
│ ├── errors.py
│ ├── locking_dict.py
│ ├── oplog_manager.py
│ └── util.py
程序的入口就是connector.py,main方法通过从命令行中接收参数信息,参数信息可以用mongo-connector --help查看,根据参数信息构建connector对象,connector继承Thread,具体的执行流程如下:
mongoConnetor协同es
image.png1.mongodb开启复制集
./mongod --replSet "rs0"
2.mongo-connector
#安装mongo-connector
pip install mongo-connector
#安装 elastic2-doc-manager
pip install elastic2-doc-manager
#启动
#1.命令行参数
mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager
#2.配置文件(推荐)
mongo-connector -c config/mongo_es_config.json
配置文件如下
{
#mongodb地址--数据来源
"mainAddress": "localhost:27117",
"authentication": {
"adminUsername": "******",
"password": "**********"
},
#oplog日志文件
"oplogFile" : "/usr/local/mongo-connector/oplog/oplog.timestamp",
"logging":{"type":"file",
"filename":"/usr/local/mong o-connector/log/mongo-connector.log",
"format":"%(asctime)s [%(levelname)s] %(name)s:%(lineno)d - %(message)s",
"rotationWhen": "midnight",
"rotationInterval": 1,
"rotationBackups": 10
},
#命名空间,对数据可做相应修改
"namespaces": {
"away.area":{"rename": "area.awayArea"},
"away.poi": {"rename": "poi.awayPoi","excludeFields":["dimension_score_v2","dimension_rate","source_id","alias"]},
},
#elastic2_doc_manager
"docManagers": [
{
"docManager": "elastic2_doc_manager",
"targetURL": "localhost:9222",
"bulkSize": 1000,
"autoCommitInterval": 5
}
]
}