Python3 操作 elasticsearch
elasticsearch 模块
Elasticsearch低级客户端。提供从Python到ES REST端点的直接映射。
连接集群节点
- 指定连接
es = Elasticsearch(
['172.16.153.129:9200'],
# 认证信息
# http_auth=('elastic', 'changeme')
)
- 动态连接
es = Elasticsearch(
['esnode1:port', 'esnode2:port'],
# 在做任何操作之前,先进行嗅探
sniff_on_start=True,
# 节点没有响应时,进行刷新,重新连接
sniff_on_connection_fail=True,
# 每 60 秒刷新一次
sniffer_timeout=60
)
- 对不同的节点,赋予不同的参数
es = Elasticsearch([
{'host': 'localhost'},
{'host': 'othernode', 'port': 443, 'url_prefix': 'es', 'use_ssl': True},
])
- 假如使用了 ssl
es = Elasticsearch(
['localhost:443', 'other_host:443'],
#打开SSL
use_ssl=True,
#确保我们验证了SSL证书(默认关闭)
verify_certs=True,
#提供CA证书的路径
ca_certs='/path/to/CA_certs',
#PEM格式的SSL客户端证书
client_cert='/path/to/clientcert.pem',
#PEM格式的SSL客户端密钥
client_key='/path/to/clientkey.pem'
)
获取相关信息
- 测试集群是否启动
In [40]: es.ping()
Out[40]: True
- 获取集群基本信息
In [39]: es.info()
Out[39]:
{'cluster_name': 'sharkyun',
'cluster_uuid': 'rIt2U-unRuG0hJBt6BXxqw',
'name': 'master',
'tagline': 'You Know, for Search',
'version': {'build_date': '2017-10-06T20:33:39.012Z',
'build_hash': '1a2f265',
'build_snapshot': False,
'lucene_version': '6.6.1',
'number': '5.6.3'}}
- 获取集群的健康状态信息
In [41]: es.cluster.health()
Out[41]:
{'active_primary_shards': 6,
'active_shards': 6,
'active_shards_percent_as_number': 50.0,
'cluster_name': 'sharkyun',
'delayed_unassigned_shards': 0,
'initializing_shards': 0,
'number_of_data_nodes': 1,
'number_of_in_flight_fetch': 0,
'number_of_nodes': 1,
'number_of_pending_tasks': 0,
'relocating_shards': 0,
'status': 'yellow',
'task_max_waiting_in_queue_millis': 0,
'timed_out': False,
'unassigned_shards': 6}
- 获取当前连接的集群节点信息
In [43]: es.cluster.client.info()
- 获取集群目前所有的索引
In [55]: print(es.cat.indices())
yellow open logstash-2017.11.04 Zt2K7k0yRZaIwmEsZ9H3DA 5 1 301000 0 162.3mb 162.3mb
yellow open .kibana 1Epb3nPFRimFJoRwKHtXIg 1 1 2 0 13.4kb 13.4kb
- 获取集群的更多信息
es.cluster.stats()
利用实例的 cat 属性得到更简单易读的信息
In [85]: es.cat.health()
Out[85]: '1510431262 04:14:22 sharkyun yellow 1 1 6 6 0 0 6 0 - 50.0%\n'
In [86]: es.cat.master()
Out[86]: 'VXgFbKAaTtGO5a1QAfdcLw 172.16.153.129 172.16.153.129 master\n'
In [87]: es.cat.nodes()
Out[87]: '172.16.153.129 27 49 0 0.02 0.01 0.00 mdi * master\n'
In [88]: es.cat.indices()
Out[88]: 'yellow open logstash-2017.11.04 Zt2K7k0yRZaIwmEsZ9H3DA 5 1 301000 0 162.3mb 162.3mb\nyellow open .kibana 1Epb3nPFRimFJoRwKHtXIg 1 1 2 0 13.4kb 13.4kb\n'
In [89]: es.cat.count()
Out[89]: '1510431323 04:15:23 301002\n'
In [90]: es.cat.plugins()
Out[90]: ''
In [91]: es.cat.templates()
Out[91]: 'logstash logstash-* 0 50001\nfilebeat filebeat-* 0 \n'
- 任务
es.tasks.get()
es.tasks.list()
查询
- 发送查询请求
es = Elasticsearch(
['172.16.153.129:9200']
)
response = es.search(
index="logstash-2017.11.14", # 索引名
body={ # 请求体
"query": { # 关键字,把查询语句给 query
"bool": { # 关键字,表示使用 filter 查询,没有匹配度
"must": [ # 表示里面的条件必须匹配,多个匹配元素可以放在列表里
{
"match": { # 关键字,表示需要匹配的元素
"TransId": '06100021650016153' # TransId 是字段名, 06100021650016153 是此字段需要匹配到的值
}
},
{
"match": {
"Ds": '2017-05-06'
}
},
{
"match": {
"Gy": '2012020235'
}
}, ],
"must_not": { # 关键字,表示查询的结果里必须不匹配里面的元素
"match": { # 关键字
"message": "M(" # message 字段名,这个字段的值一般是查询到的结果内容体。这里的意思是,返回的结果里不能包含特殊字符 'M('
}
}
}
},
# 下面是对返回的结果继续排序
"sort": [{"@timestamp": {"order": "desc"}}],
"from": start, # 从匹配到的结果中的第几条数据开始返回,值是匹配到的数据的下标,从 0 开始
"size": size # 返回多少条数据
}
)
- 得到返回结果的总条数
total = res['hits']['total']
- 循环返回的结果,得到想要的内容
res_dict={}
for hit in res['hits']['hits']:
log_time = "%s|%s" % (hit['_source']['Ds'], hit['_source']['Us'])
res_dict[log_time] = "%s|%s|%s|%s" % (hit['_source']['beat']['hostname'],hit['_source']['FileName'], hit['_source']['FileNum'],hit['_source']['Messager'])
- 实例查询7天之内的流水号为:06100021650016153 的日志信息
query_body={
'bool': {
'must_not': {'match': {'message': 'M('}},
'must': [
{'match': {'TransId': '06100021650016153'}},
{'range': {'@timestamp': {'gte': u'now-7d', 'lte': 'now'}}}
]
}
}
res = es.search(
index='logstash-2017.11.14',
body={
"query": query_body,
"sort":[{"@timestamp": {"order": "desc"}}]})
}
)
更高级的 elasticsearch_dsl 模块
小试牛刀
- 单一字段查询
es = Elasticsearch(
['172.16.153.129:9200']
)
s = Search(using=es,
index="logstash-2017.11.14").filter("match",Gy='20160521491').query("match", TransId='06100021650016153').exclude("match", message="M(")
response = s.execute()
using
指明用那个已经连接的对象
query
接收的是查询体语句
exclude
接收的是不匹配的字段 就像 must_not
filter
接收的是过滤语句 ,过滤的条件意思是在返回结果中有这些条件的信息
- 统计结果总数
s.count()
response = sexecute()
response.hits.total
- 获取结果
res_dict={}
for hit in s:
log_time = "%s|%s" % (hit.Ds, hit.Us')
res_dict[log_time] = "%s|%s|%s|%s" % (hit.beat['hostname'],hit.FileName, hit.FileNum,hit.Messager)
设置连接
有几种方法来配置库的连接。
最简单的选择,也是最有用的,就是定义一个默认连接,每次调用API时都会使用这个连接,而不需要显式传递其他连接。
除非要从应用程序访问多个群集,否则强烈建议您使用该 create_connection 方法创建一个默认连接,所有操作都将自动使用该连接。
显式传递一个连接
如果你不想提供全局配置(也就是默认连接),你可以传入你自己的连接(实例elasticsearch.Elasticsearch)作为参数, 使用 using 接受它:
s = Search(using=Elasticsearch('localhost'))
甚至你可以下面的方式来覆盖一个对象已经关联的任何连接
s = s.using(Elasticsearch('otherhost:9200'))
默认链接
默认连接
要定义全局使用的默认连接,请使用 connections模块和create_connection方法:
from elasticsearch_dsl.connections import connections
client = connections.create_connection(hosts=['172.16.153.129:9200'],
http_auth=('elastic', 'changeme'), timeout=20)
- 执行搜索就不必再传连接对象了
s = Search(index="logstash-2017.11.14").filter("match",Gy='20160521491').query("match", TransId='06100021650016153').exclude("match", message="M(")
多集群环境的连接
多个集群
您可以使用以下配置方法同时定义到多个群集的多个连接:
from elasticsearch_dsl.connections import connections
clients = connections.configure(
default={'hosts': 'localhost'},
dev={
'hosts': ['esdev1.example.com:9200'],
'sniff_on_start': True
}
)
上面的情况是适用于第一次连接时的情况
- 运行中设置连接
# if you have configuration to be passed to Elasticsearch.__init__
# 直接传递一个配置信息给 Elasticsearch
connections.create_connection('qa', hosts=['esqa1.example.com'], sniff_on_start=True)
# if you already have an Elasticsearch instance ready
# 追加一个已经准备好的连接对象
connections.add_connection('qa', my_client)
使用连接对象的别名
当使用多个连接时,您可以使用您在下面注册的字符串别名来引用它们:
s = Search(using='qa')
- 如果在该别名下没有注册的连接,KeyError 异常将会被抛出。
KeyError: "There is no connection with alias 'qa'."
Search DSL
该Search对象
该Search对象代表整个搜索请求:
- 查询(queries)
- 过滤器(filters)
- 聚合(aggregations)
- 排序(sort)
- 分页(pagination)
- 附加的参数(additional parameters)
- 关联客户端(associated client)
API 被设计为可链接的。除了聚合功能以外,这意味着Search对象是不可变的(对对象的所有更改都将导致创建(拷贝)一个包含更改的副本)。这意味着您可以安全地将Search对象传递给外部代码,而不必担心这个对象会被修改。
实例化对象时,您可以传递低级别的elasticsearch客户端实例Search:
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
client = Elasticsearch()
s = Search(using=client)
==所有的方法都会返回一个对象的副本,从而安全地传递给外部代码。==
- API 是可连接的,允许你在一个语句中调用多个方法:
s = Search().using(client).query("match", title="python")
- 要将请求发送到Elasticsearch:
s.execute()
- 如果您只是想遍历搜索返回的匹配,则可以遍历该Search对象:
for hit in s:
print(log_time = "%s|%s" % (hit.Ds, hit.Us'))
print(hit.beat['hostname'],hit.FileName, hit.FileNum,hit.Messager)
DS、US、beat、FileName等都是映射好的字段名,就是用 elasticsearch 模块的 search 方法得到的结果里的 hit['_source'] 里面的内容。
搜索结果将被缓存。随后调用execute或试图遍历已经执行的Search对象将不会触发额外的请求发送到Elasticsearch。强制请求时指定 ignore_cache=True调用execute。
- 这个 Search 的对象也可以转换为之前的 Query DSL 格式
s.to_dict()
查询
Elasticsearch_dsl 的 query 类为所有Elasticsearch查询类型提供类。 传递所有参数作为关键字参数。 这些类接受任何关键字参数,然后dsl将传递给构造函数的所有参数作为结果字典中的顶级关键字序列化(因此生成的json被发送到elasticsearch)。 这意味着在DSL中原始查询和其等价物之间存在明确的一对一映射:
from elasticsearch_dsl.query import MultiMatch, Match
# {"multi_match": {"query": "python django", "fields": ["title", "body"]}}
MultiMatch(query='python django', fields=['title', 'body'])
# {"match": {"title": {"query": "web framework", "type": "phrase"}}}
Match(title={"query": "web framework", "type": "phrase"})
==在某些情况下,由于python对标识符的限制,这种方法不支持字段中含有特殊字符的情况,比如:@timestamp。==
在这种情况下,你必须史使用原来的字典形式:Range(** {'@timestamp': {'lt': 'now'}})
强大的 Q
- 您可以使用Q快捷方式可以把带参数的名称或原始数据的dict构建成 Search 对应类的实例:
from elasticsearch_dsl import Q
Q("multi_match", query='python django', fields=['title', 'body'])
Q({"multi_match": {"query": "python django", "fields": ["title", "body"]}})
# 这两种方式最后转换的结果是一致的
MultiMatch(fields=['title', 'body'], query='python django')
- 要将查询添加到Search对象,请使用以下.query()方法:
q = Q("multi_match", query='python django', fields=['title', 'body'])
s = s.query(q)
- 当然,Q 接收的参数,,query 方法都支持
s = s.query("multi_match", query='python django', fields=['title', 'body'])
-
用 Q 实现组合查询
Q 对象可以使用逻辑运算符进行组合:
Q("match", title='python') | Q("match", title='django')
# {"bool": {"should": [...]}}
# 匹配到任意条件即可
Q("match", title='python') & Q("match", title='django')
# {"bool": {"must": [...]}}
# 列表里的条件必须同时匹配
~Q("match", title="python")
# {"bool": {"must_not": [...]}}
# 非
- 实现组合查询的另一种方法
query 方法可以被连续调用
In [193]: sa = Search().query().query('match',title='python').query('match',body='django')
In [194]: sa.to_dict()
Out[194]:
{
"query": {
"bool": {
"must": [
{"match": {
"title": "python"}
},
{"match": {
"body": "django"}
}
]
}
}
}
- 假如你希望对查询的条件进行精确的控制,请使用 Q 构造组合查询:
q = Q('bool',
must=[Q('match', title='python')],
should=[Q(...), Q(...)],
minimum_should_match=1
)
s = Search().query(q)
过滤器
过滤请使用 filter 方法
s = Search()
s = s.filter('terms', tags=['search', 'python'])
# {'query': {'bool': {'filter': [{'terms': {'tags': ['search', 'python']}}]}}}
在幕后,这将产生一个Bool查询并将指定的 terms查询放入其filter分支,使其等价于:
s = Search()
s = s.query('bool', filter=[Q('terms', tags=['search', 'python'])])
# {'query': {'bool': {'filter': [{'terms': {'tags': ['search', 'python']}}]}}}
==下面没搞懂==
如果您想使用post_filter元素进行分面导航,请使用该 .post_filter()方法。
你也可以用 exclude() 排除查询项目:
s = Search()
s = s.exclude('terms', tags=['search', 'python'])
官网上说下面是简写,简直不敢相信
s = s.query('bool', filter=[~Q('terms', tags=['search', 'python'])])
聚合
定义一个聚合,请使用 A
A('terms', field='tags')
# {"terms": {"field": "tags"}}
- 嵌套聚合,可以使用.bucket(),.metric()和 .pipeline()方法:
a = A('terms', field='category')
# {'terms': {'field': 'category'}}
a.metric('clicks_per_category', 'sum', field='clicks')\
.bucket('tags_per_category', 'terms', field='tags')
# {
# 'terms': {'field': 'category'},
# 'aggs': {
# 'clicks_per_category': {'sum': {'field': 'clicks'}},
# 'tags_per_category': {'terms': {'field': 'tags'}}
# }
# }
- 要将聚合添加到Search对象,请使用.aggs充当顶级聚合的属性
s = Search()
a = A('terms', field='category')
s.aggs.bucket('category_terms', a)
# {
# 'aggs': {
# 'category_terms': {
# 'terms': {
# 'field': 'category'
# }
# }
# }
# }
或者下面这样的,有点儿变态
s = Search()
s.aggs.bucket('articles_per_day', 'date_histogram', field='publish_date', interval='day')\
.metric('clicks_per_day', 'sum', field='clicks')\
.pipeline('moving_click_average', 'moving_avg', buckets_path='clicks_per_day')\
.bucket('tags_per_day', 'terms', field='tags')
s.to_dict()
# {
# "aggs": {
# "articles_per_day": {
# "date_histogram": { "interval": "day", "field": "publish_date" },
# "aggs": {
# "clicks_per_day": { "sum": { "field": "clicks" } },
# "moving_click_average": { "moving_avg": { "buckets_path": "clicks_per_day" } },
# "tags_per_day": { "terms": { "field": "tags" } }
# }
# }
# }
# }
- 您可以通过名称访问现有存储桶
s = Search()
s.aggs.bucket('per_category', 'terms', field='category')
s.aggs['per_category'].metric('clicks_per_category', 'sum', field='clicks')
s.aggs['per_category'].bucket('tags_per_category', 'terms', field='tags')
==当链接多个聚合时,什么.bucket()和.metric()方法返回之间是有区别的==
==.bucket()返回新定义的存储区,同时.metric()返回其父容器以允许进一步链接。==
==与Search对象上的其他方法相反,定义聚合是在原地完成的(不返回副本)。==
排序
要指定排序顺序,请使用 .sort() 方法:
s = Search().sort(
'category',
'-title',
{"lines" : {"order" : "asc", "mode" : "avg"}}
)
==它接受可以是字符串或字典的位置参数。字符串值是一个字段名称,可以用-符号前缀来指定降序。==
- 恢复排序使用无参的 sort() 方法
s = s.sort()
分页
要指定from / size参数,请使用Python切片AP
s = s[10:20]
# {"from": 10, "size": 10}
- 如果要访问与查询匹配的所有文档,可以使用 scan使用扫描/滚动弹性搜索API的方法
for hit in s.scan():
print(hit.title)
==请注意,在这种情况下,结果将不会被排序。==
突出高亮
- 要设置突出显示的常用属性,请使用以下highlight_options方法:
s = s.highlight_options(order='score')
- 为各个字段启用突出显示使用以下highlight方法完成:
s = s.highlight('title')
# or, including parameters:
s = s.highlight('title', fragment_size=50)
- 返回的结果将在被赋值到一个对象(变量)上可用,.meta.highlight.FIELD 将包含结果的列表:
response = s.execute()
for hit in response:
for fragment in hit.meta.highlight.title:
print(fragment)
Suggestions 建议
==此部分不懂,有待研究==
要在Search对象上指定建议请求,请使用以下suggest方法:
s = s.suggest('my_suggestion', 'pyhton', term={'field': 'title'})
第一个参数是建议名称(它将返回的名称),第二个是你希望建议者处理的实际文本,关键字参数将被添加到建议的json中,这意味着它应该成为其中一个term,phrase或者completion指出应该使用哪种类型的建议者。
如果您只希望运行搜索的建议部分(通过_suggest 端点),您可以通过execute_suggest以下方式进行:
s = s.suggest('my_suggestion', 'pyhton', term={'field': 'title'})
suggestions = s.execute_suggest()
print(suggestions.my_suggestion)
额外的属性和参数
- 要设置搜索请求的额外属性,请使用该.extra()方法。这可以被用于定义在不能经由像特定的API方法来限定所述主体的键explain或search_after:
s = s.extra(explain=True)
- 要设置查询参数,请使用以下.params()方法:
s = s.params(search_type="count")
- 如果您需要限制elasticsearch返回的字段,请使用以下 source()方法:
#只返回选定的字段
s = s.source(['title', 'body'])
#不返回任何字段,只是元数据
s = s.source(False)
#明确包含/排除字段
s = s.source(include=["title"], exclude=["user.*"])
#重置字段选择
s = s.source(None)
序列化和反序列化
Search 的对象可以使用 .to_dict() 方法序列化为一个字典
您也可以使用 Search 类的 .from_dict() 方法创建一个Search对象。
这将创建一个新的对象,并使用字典中的数据填充这个新的对象
s = Search.from_dict({"query": {"match": {"title": "python"}}})
如果要修改现有的Search对象,并重写它的属性,可以使用这个实例的 update_from_dict() 方法,改变是实时生效的:
In [2]: s = Search()
In [4]: s.to_dict()
Out[4]: {'query': {'match_all': {}}}
In [5]: s.update_from_dict({"query": {"match": {"title": "python"}}, "size": 42}
...: )
Out[5]: <elasticsearch_dsl.search.Search at 0x10a7c8550>
In [6]: s.to_dict()
Out[6]: {'query': {'match': {'title': 'python'}}, 'size': 42}
返回结果
您可以通过调用 Search 对象的 .execute() 方法来执行搜索,之后将返回一个 Response 对象。
你可以将这个返回的对象(结果)赋值给一个对象。
该 Response 对象允许您通过属性访问的方式(也就是 . )来访问 Response 对象字典中的任何键。
它还提供了一些方便的帮手:
response = s.execute()
print(response.success())
# 是否成功
# True
print(response.took)
# 命中数
# 12
print(response.hits.total)
print(response.suggest.my_suggestions)
如果要检查response对象的内容,只需使用其 to_dict方法访问原始数据即可打印。
Hits
要访问搜索返回的匹配对象,请访问该hits属性或只是遍历该Response对象:
response = s.execute()
print('Total %d hits found.' % response.hits.total)
for h in response:
print(h.title, h.body)
结果
单独的命中包装在一个便利的类,允许属性访问返回的字典中的键。结果的所有元数据都可以通过meta(不带下划线开头 _ )访问:
response = s.execute()
h = response.hits[0]
print('/%s/%s/%s returned with score %f' % (
h.meta.index, h.meta.doc_type, h.meta.id, h.meta.score))
响应 = s 。执行()
h = 响应。命中[ 0 ]
打印('/ %S / %S / %S 与得分返回%F ' % (
ħ 。元。指数, ħ 。元。DOC_TYPE , ħ 。元。ID , ħ 。元。得分))
注意
==如果刚好文档中有一个字段叫 meta ,可以使用字典的键方法来访问它:hit['meta']。==
聚合
聚合可通过aggregations属性获得:
for tag in response.aggregations.per_tag.buckets:
print(tag.key, tag.max_lines.value)
MultiSearch
如果您需要同时执行多个搜索,则可以使用 MultiSearch 类,这将使用该类的 _msearch API :
from elasticsearch_dsl import MultiSearch, Search
ms = MultiSearch(index='blogs')
ms = ms.add(Search().filter('term', tags='python'))
ms = ms.add(Search().filter('term', tags='elasticsearch'))
responses = ms.execute()
for response in responses:
print("Results for query %r." % response.search.query)
for hit in response:
print(hit.title)
持久化
映射
- 您可以使用dsl库为应用程序定义映射和基本的持久层。
映射定义遵循与查询dsl类似的模式:
from elasticsearch_dsl import Keyword, Mapping, Nested, Text
# name your type
m = Mapping('my-type')
# add fields
m.field('title', 'text')
# you can use multi-fields easily
m.field('category', 'text', fields={'raw': Keyword()})
# you can also create a field manually
comment = Nested()
comment.field('author', Text())
comment.field('created_at', Date())
# and attach it to the mapping
m.field('comments', comment)
# you can also define mappings for the meta fields
m.meta('_all', enabled=False)
# save the mapping into index 'my-index'
m.save('my-index')
==注意==
默认情况下,所有的字段(除了Nested)都会有单个值。
您可以在创建/定义字段期间,通过向构造函数传入 multi=True(m.field('tags', Keyword(multi=True)))来始终覆盖此期望值。
那么,即使字段没有被设置,字段的值也将是一个空的列表,使您能够写入。
doc.tags.append('search')
- 特别是如果您使用动态映射,则可以根据Elasticsearch中的现有类型更新映射,或直接从现有类型创建映射:
# get the mapping from our production cluster
m = Mapping.from_es('my-index', 'my-type', using='prod')
# update based on data in QA cluster
m.update_from_es('my-index', using='qa')
# update the mapping on production
# 在生产上更新映射
m.save('my-index', using='prod')
- 常用字段选项:
multi
如果设置True为该字段的值将被设置为[]第一次访问。
required
指示字段是否需要文档的有效值。
Analysis
要指定字段的analyzer值,Text您可以使用分析仪的名称(作为字符串),并依靠定义的分析仪(如内置分析仪)或手动定义分析仪。
或者,您可以创建自己的分析器并让持久层处理其创建:
from elasticsearch_dsl import analyzer, tokenizer
my_analyzer = analyzer('my_analyzer',
tokenizer=tokenizer('trigram', 'nGram', min_gram=3, max_gram=3),
filter=['lowercase']
)
每个分析对象需要有一个名字(my_analyzer和trigram在我们的例子)和断词,令牌过滤器和过滤器炭还需要指定类型(nGram在我们的例子)。
==在创建依赖于自定义分析器的映射时,索引必须不存在或被关闭。要创建多个DocType定义的映射,您可以使用Index对象==
DocType
如果你想在你的文档中创建一个类似于模型的包装,请使用 DocType类:
from datetime import datetime
from elasticsearch_dsl import DocType, Date, Nested, Boolean, \
analyzer, InnerObjectWrapper, Completion, Keyword, Text
html_strip = analyzer('html_strip',
tokenizer="standard",
filter=["standard", "lowercase", "stop", "snowball"],
char_filter=["html_strip"]
)
class Comment(InnerObjectWrapper):
def age(self):
return datetime.now() - self.created_at
class Post(DocType):
title = Text()
title_suggest = Completion()
created_at = Date()
published = Boolean()
category = Text(
analyzer=html_strip,
fields={'raw': Keyword()}
)
comments = Nested(
doc_class=Comment,
properties={
'author': Text(fields={'raw': Keyword()}),
'content': Text(analyzer='snowball'),
'created_at': Date()
}
)
class Meta:
index = 'blog'
def add_comment(self, author, content):
self.comments.append(
{'author': author, 'content': content})
def save(self, ** kwargs):
self.created_at = datetime.now()
return super().save(** kwargs)
文档生命周期 (Document life cycle)
在首次使用Post文档类型之前,您需要在Elasticsearch中创建映射。为此,您可以使用Index对象或通过调用init类方法直接创建映射:
# create the mappings in Elasticsearch
Post.init()
要创建一个新Post文档只需实例化这个类并传入你想要设置的任何字段,就可以使用标准属性设置来改变/添加更多的字段。请注意,您不限于显式定义的字段:
# instantiate the document
first = Post(title='My First Blog Post, yay!', published=True)
# assign some field values, can be values or lists of values
first.category = ['everything', 'nothing']
# every document has an id in meta
first.meta.id = 47
# save the document into the cluster
first.save()
所有的元数据字段(id,parent,routing,index等等)可以被访问(和设置)。
通过meta属性或直接用下划线变体访问它们:
post = Post(meta={'id': 42})
# prints 42, same as post._id
print(post.meta.id)
# override default index, same as post._index
post.meta.index = 'my-blog'
- To retrieve(检索,得到) an existing document use the get class method:
# retrieve the document
first = Post.get(id=42)
# now we can call methods, change fields, ...
first.add_comment('me', 'This is nice!')
# and save the changes into the cluster again
first.save()
# you can also(也) update just individual fields which will call the update API
# and also(并且) update the document in place(首先)
first.update(published=True, published_by='me')
- If the document is not found in elasticsearch an exception (elasticsearch.NotFoundError) will be raised. If you wish to return None instead just pass in ignore=404 to suppress the exception:
p = Post.get(id='not-in-es', ignore=404)
p is None
- 当您想要同时检索多个文档时,id 您可以使用以下mget方法:
posts = Post.mget([42, 47, 256])
mgetNotFoundError如果有任何文件没有找到,并且RequestError文件中有任何内容导致错误,将会默认提出。您可以通过设置参数来控制此行为:
raise_on_error
如果True(默认),那么任何错误都会引发异常。否则,包含错误的所有文档将被视为丢失。
missing
可以有三个可能的值:('none'默认)'raise'和 'skip'。如果文档丢失或出错,将被替换为None,将引发异常或文档将完全跳过。
所有有关的信息DocType,包括它的信息Mapping都可以通过_doc_type类的属性来访问:
# name of the type and index in elasticsearch
Post._doc_type.name
Post._doc_type.index
# the raw Mapping object
# 原始映射对象
Post._doc_type.mapping
# the optional name of the parent type (if defined)
# 父类型的可选名称(如果已定义)
Post._doc_type.parent
- The _doc_type attribute is also home to the refresh method which will update the mapping on the DocType from elasticsearch.
_doc_type 属性也是可以通过 refresh 方法来更新elasticsearch的DocType上的映射。
假如你使用的是动态映射,并希望类知道这些字段,(比如,你希望日期字段能被正确的序列化)你这样将会是很有用的:
Post._doc_type.refresh()
- To delete a document just call its delete method:
first = Post.get(id=42)
first.delete()
Search
To search for this document type, use the search class method:
# by calling .search we get back a standard Search object
# 通过调用 .search(), 我们得到一个标准的搜索对象
s = Post.search()
# the search is already limited to the index and doc_type of our document
s = s.filter('term', published=True).query('match', title='first')
results = s.execute()
# when you execute the search the results are wrapped in your document class (Post)
for post in results:
print(post.meta.score, post.title)
- 或者,您可以只取一个Search对象,并限制它返回我们的文档类型,用正确的类包装:
s = Search()
s = s.doc_type(Post)
- 您也可以将文档类与标准文档类型(只是字符串)结合起来,这将像以前一样处理。您也可以传入多个DocType 子类,响应中的每个文档将被包装在它的类中。
If you want to run suggestions, just use the suggest method on the Search object:
s = Post.search()
s = s.suggest('title_suggestions', 'pyth', completion={'field': 'title_suggest'})
# you can even execute just the suggestions via the _suggest API
suggestions = s.execute_suggest()
for result in suggestions.title_suggestions:
print('Suggestions for %s:' % result.text)
for option in result.options:
print(' %s (%r)' % (option.text, option.payload))
class Meta 选项
在Meta文档定义的类中,您可以为文档定义各种元数据:
doc_type
elasticsearch中的doc_type的名称。默认情况下,它将从类名(MyDocument - > my_document)
index
文档的默认索引,默认情况下它是空的,并且每个操作(比如get或save需要一个明确的index参数)
using
默认使用的连接别名,默认为 'default'
mapping
Mapping类的可选实例,用作从文档类本身上的字段创建的映射的基础。
在任何属性Meta是的实例类MetaField将被用于控制元字段(的映射_all,_parent等等)。只需将参数(不带前导下划线)命名为要映射的字段并将任何参数传递给MetaField类:
class Post(DocType):
title = Text()
class Meta:
all = MetaField(enabled=False)
parent = MetaField(type='blog')
dynamic = MetaField('strict')
索引(Index)
Index是一个类,负责在elasticsearch映射和设置中保存与索引有关的所有元数据。
在定义 index 时最为有用,因为它允许同时轻松创建多个 index。在迁移中设置弹性搜索对象时,这非常有用:
from elasticsearch_dsl import Index, DocType, Text, analyzer
blogs = Index('blogs')
# define custom settings
blogs.settings(
number_of_shards=1,
number_of_replicas=0
)
# define aliases
blogs.aliases(
old_blogs={}
)
# register a doc_type with the index
blogs.doc_type(Post)
# can also be used as class decorator when defining the DocType
@blogs.doc_type
class Post(DocType):
title = Text()
# You can attach custom analyzers to the index
html_strip = analyzer('html_strip',
tokenizer="standard",
filter=["standard", "lowercase", "stop", "snowball"],
char_filter=["html_strip"]
)
blogs.analyzer(html_strip)
# delete the index, ignore if it doesn't exist
blogs.delete(ignore=404)
# create the index in elasticsearch
blogs.create()
- 您还可以为您的索引设置模板,并使用该clone方法创建特定的副本:
blogs = Index('blogs', using='production')
blogs.settings(number_of_shards=2)
blogs.doc_type(Post)
# create a copy of the index with different name
company_blogs = blogs.clone('company-blogs')
# create a different copy on different cluster
dev_blogs = blogs.clone('blogs', using='dev')
# and change its settings
dev_blogs.setting(number_of_shards=1)