Installing and Using Elasticsearch

2018-02-22  M问道

Installing an Elasticsearch cluster

1. Prepare a JDK 1.8 environment and the Elasticsearch 5.0 installation package.
2. Configure the Linux environment:

vim /etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536

vim /etc/sysctl.conf
vm.max_map_count=262144
sysctl -p
 
vi /etc/security/limits.d/90-nproc.conf
# change the nproc limit from 1024 to 2048

3. Edit the Elasticsearch configuration file elasticsearch.yml:

cluster.name: my-application
node.name: node-1
path.data: /var/elasticsearch/data
path.logs: /var/elasticsearch/logs
network.host: 192.168.2.200
http.port: 9200
discovery.zen.ping.multicast.enabled: false
discovery.zen.fd.ping_timeout: 100s
discovery.zen.ping.timeout: 100s
discovery.zen.minimum_master_nodes: 2
discovery.zen.ping.unicast.hosts: ["192.168.2.200"]
http.host: 192.168.2.200
http.cors.enabled: true
http.cors.allow-origin: "*"

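Note: the two cluster-related settings are network.host and discovery.zen.ping.unicast.hosts. Elasticsearch supports clustering out of the box; once these two settings are in place on each node, the nodes form a cluster.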
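
The value of discovery.zen.minimum_master_nodes in the configuration above is usually chosen as a majority of the master-eligible nodes, (N / 2) + 1, so that two halves of a partitioned cluster cannot both elect a master. A minimal Java sketch of that arithmetic follows; the class and method names are made up for illustration and are not part of Elasticsearch.

```java
// Illustrative helper, not an Elasticsearch API: computes the majority
// quorum commonly used for discovery.zen.minimum_master_nodes.
final class MasterQuorum {

    // N master-eligible nodes -> (N / 2) + 1, using integer division.
    static int minimumMasterNodes(int masterEligibleNodes) {
        if (masterEligibleNodes < 1) {
            throw new IllegalArgumentException("need at least one master-eligible node");
        }
        return masterEligibleNodes / 2 + 1;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " master-eligible node(s) -> minimum_master_nodes = "
                    + minimumMasterNodes(n));
        }
    }
}
```

A value of 2, as used above, is the majority for a cluster of two or three master-eligible nodes. A single node started alone with that value will wait for a second node before a master can be elected.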

4. Set ownership and permissions on the sshd directory:

chown -R root:root /var/empty/sshd
chmod 744 /var/empty/sshd
service sshd restart

Installing the elasticsearch-head plugin

1. Prepare a Node.js 4.6.1 environment and the elasticsearch-head 5.0 plugin package.
2. Run the install commands:

npm install
npm install -g cnpm --registry=https://registry.npm.taobao.org
npm install -g grunt-cli
grunt server

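*Note: if the head plugin address cannot be reached from outside the machine, the firewall may be the cause. Only the CentOS 7 commands for turning it off are given here: sudo systemctl stop firewalld.service && sudo systemctl disable firewalld.service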

Installing Kibana

1. Prepare the Kibana 5.0 installation package.
2. Unpack it and edit the configuration:

vi config/kibana.yml
elasticsearch.url: "http://192.168.2.200:9200"

3. Start it in the background:

nohup bin/kibana & 

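Installing the IK analyzer

1. Download the IK analyzer source from GitHub and build it with Maven (mvn clean package).
2. The build produces a zip file under target/releases. Copy it into the plugins directory of the Elasticsearch installation and unzip it there.

Elasticsearch is extended through plugins, and its configuration is simpler than Solr's. It exposes a REST API and uses Netty as its high-performance networking framework. The commands below cover the common operations.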

1. Create an index with settings and a type mapping

curl -XPUT '192.168.2.200:9200/twitter?pretty' -H 'Content-Type: application/json' -d'
{
    "settings" : {
        "index" : {
            "number_of_shards" : 3, 
            "number_of_replicas" : 2 
        }
    },
    "mappings" : {
        "user" : {
            "properties" : {
                "name" : { "type" : "text" , "analyzer": "ik_max_word"}
            }
        }
    }
}
'

2. Add a field mapping to a type

curl -XPUT '192.168.2.200:9200/twitter/_mapping/user?pretty' -H 'Content-Type: application/json' -d'
{
  "properties": {
    "address": {
      "type": "text",
      "analyzer": "ik_max_word",
      "search_analyzer": "ik_max_word"
    }
  }
}
'

3. Index a document

curl -XPUT '192.168.2.200:9200/twitter/user/1?pretty' -H 'Content-Type: application/json' -d'
{
    "name" : "haley",
    "address" : "dongtai",
    "user_id" : "1",
    "content" : "美国留给伊拉克的是个烂摊子吗"
}
'

4. Analyze text

curl -XPOST '192.168.2.200:9200/_analyze?pretty' -H 'Content-Type: application/json' -d'
{
  "analyzer": "ik_max_word",
  "text": "人生在于奋斗,生活在于折腾"
}
'

5. Create a mapping with a nested field (the product index must already exist)

curl -XPUT '192.168.2.200:9200/product/_mapping/productPlan?pretty' -H 'Content-Type: application/json' -d'
{
  "properties": {
    "name": {
      "type": "text",
      "analyzer": "ik_max_word",
      "search_analyzer": "ik_max_word"
    },
    "id": {
      "type": "keyword"
    },
    "productId": {
      "type": "keyword"
    },
    "categories": {
      "type": "nested",
      "properties": {
        "id": {
            "type": "keyword"
        },
        "productPlanId": {
            "type": "keyword"
        },
        "categoryName": {
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word"
        },
        "parentId": {
            "type": "keyword"
        }
      }
    }
  }
}
'

6. Bulk index, update, and delete documents

curl -XPOST '192.168.2.200:9200/product/productPlan/_bulk?pretty' -H 'Content-Type: application/json' -d'
{ "index" : { "_index" : "product", "_type" : "productPlan", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "product", "_type" : "productPlan", "_id" : "2" } }
{ "create" : { "_index" : "product", "_type" : "productPlan", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : { "_index" : "product", "_type" : "productPlan", "_id" : "1" } }
{ "doc" : {"field2" : "value2"} }
'
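
The _bulk body is newline-delimited JSON: index, create and update actions are each followed by a source line, delete has none, and the body must end with a newline. As a rough sketch of how a client might assemble such a body in Java, the helper below builds the lines by hand. BulkBody and its methods are hypothetical names written for this example, not part of any Elasticsearch client library, and the sketch assumes that ids need no JSON escaping.

```java
// Hypothetical helper for this example; not part of any Elasticsearch client.
final class BulkBody {
    private final StringBuilder body = new StringBuilder();

    // Appends an action line followed by its source line.
    // action is "index", "create" or "update"; id is assumed to need no escaping.
    BulkBody add(String action, String id, String sourceJson) {
        body.append(actionLine(action, id)).append('\n');
        body.append(sourceJson).append('\n');
        return this;
    }

    // A delete action has no source line.
    BulkBody delete(String id) {
        body.append(actionLine("delete", id)).append('\n');
        return this;
    }

    private static String actionLine(String action, String id) {
        return "{ \"" + action + "\" : { \"_id\" : \"" + id + "\" } }";
    }

    // Every line, including the last one, ends with '\n'.
    String build() {
        return body.toString();
    }

    public static void main(String[] args) {
        String ndjson = new BulkBody()
                .add("index", "1", "{ \"field1\" : \"value1\" }")
                .delete("2")
                .add("update", "1", "{ \"doc\" : { \"field2\" : \"value2\" } }")
                .build();
        System.out.print(ndjson);
    }
}
```

The action lines here omit _index and _type, so a request sent to /product/productPlan/_bulk applies the index and type from the URL to every action.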

7. Simple search

curl -XGET '192.168.2.200:9200/product/productPlan/_search?pretty' -H 'Content-Type: application/json' -d'
{
    "from" : 0, "size" : 10,
    "query" : {
        "term" : { "name" : "保险" }
    }
}
'

8. Multi-field search

curl -XGET '192.168.2.200:9200/product/productPlan/_search?pretty' -H 'Content-Type: application/json' -d'
{
  "query": {
    "multi_match" : {
      "query":    "保险", 
      "fields": [ "name", "product_name" ] 
    }
  }
}
'

9. Source filtering (choosing which fields are returned)

curl -XGET '192.168.2.200:9200/product/productPlan/_search?pretty' -H 'Content-Type: application/json' -d'
{
    "_source": {
        "includes": [ "name", "product_name" ],
        "excludes": [ "*desc" ]
    },
    "query" : {
        "term" : { "name" : "保险" } 
    }
}
'

10. Compound (bool) search

curl -XGET '192.168.2.200:9200/product/productPlan/_search?pretty' -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "filter": [
        { "term" :{ "name": "儿童保险计划"   }},
        { "match" :{ "name": "保险"   }}
      ]
    }
  }
}
'

11. Compound search with a nested query

curl -XGET '192.168.2.200:9200/product/productPlan/_search?pretty' -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "filter": [
        { "term" :{ "id": "2"   }}
      ],
      "should": [
        {
          "match": {
            "name": "积累" 
          }
        },
        {
          "nested": {
            "path": "categories",
            "query": {
              "bool": {
                "must": [
                  {
                    "term": {
                      "categories.id": "11"
                    }
                  }
                ]
              }
            }
          }
        }
      ],
      "minimum_should_match": 1
    }
  }
}
'

12. Aggregations (grouping)

curl -XGET '192.168.2.200:9200/product/productPlan/_search?pretty' -H 'Content-Type: application/json' -d'
{
  "size": 0,
  "aggs": {
    "categoryId": {
      "nested": {
        "path": "categories"
      },
      "aggs": {
        "group_by_categoryId": {
          "terms": {
            "field": "categories.id"
          }
        }
      }
    },
    "productId": {
      "terms": {
        "field": "productId"
      }
    }
  }
}
'

13. Nested grouping (group by group1, then by group2)

curl -XGET '192.168.2.200:9200/product/productPlan/_search?pretty' -H 'Content-Type: application/json' -d'
{
  "size": 0,
  "aggs": {
    "productId": {
      "terms": {
        "field": "productId"
      },
      "aggs": {
        "id": {
          "terms": {
            "field": "id"
          }
        }
      }
    }
  }
}
'

14. Pattern (regular expression) tokenizer

curl -XPUT '192.168.2.200:9200/my_index?pretty' -H 'Content-Type: application/json' -d'
{
 "settings": {
   "analysis": {
     "analyzer": {
       "my_analyzer": {
         "tokenizer": "my_tokenizer"
       }
     },
     "tokenizer": {
       "my_tokenizer": {
         "type": "pattern",
         "pattern": ","
       }
     }
   }
 }
}
'

15. Combined analyzer (IK + pinyin)

curl -XPUT '192.168.2.200:9200/my_index?pretty' -H 'Content-Type: application/json' -d'
{
    "settings": {
        "index": {
            "number_of_shards": 3,
            "number_of_replicas": 2
        },
        "analysis": {
            "analyzer": {
                "my_analyzer": {
                    "tokenizer": "ik_max_word",
                    "filter": [
                        "pinyinFilter"
                    ]
                }
            },
            "filter": {
                "pinyinFilter": {
                    "type": "pinyin",
                    "keep_first_letter": false,
                    "keep_full_pinyin": true,
                    "keep_joined_full_pinyin":true,
                    "keep_original": true,
                    "limit_first_letter_length": 16,
                    "lowercase": true
                }
            }
        }
    },
    "mappings": {
        "user": {
            "properties": {
                "name": {
                    "type": "text",
                    "analyzer": "my_analyzer",
                    "search_analyzer": "my_analyzer"
                }
            }
        }
    }
}
'

16. Search highlighting

curl -XGET '192.168.2.200:9200/my_index/user/_search?pretty' -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "filter": [
        {
          "match": {
            "name": "得hua"
          }
        }
      ]
    }
  },
  "highlight": {
    "fields": {
      "name": {}
    }
  }
}
'

17. Synonym analyzer

curl -XPUT '192.168.2.200:9200/test_index?pretty' -H 'Content-Type: application/json' -d'
{
    "settings": {
        "index" : {
            "analysis" : {
                "analyzer" : {
                    "synonym" : {
                        "tokenizer" : "whitespace",
                        "filter" : ["synonym"]
                    }
                },
                "filter" : {
                    "synonym" : {
                        "type" : "synonym",
                        "synonyms_path" : "analysis/synonym.txt"
                    }
                }
            }
        }
    }
}
'

18. Suggestions (completion suggester)

curl -XPUT '192.168.2.200:9200/my_index?pretty' -H 'Content-Type: application/json' -d'
{
    "settings": {
        "index": {
            "number_of_shards": 3,
            "number_of_replicas": 2
        }
    },
    "mappings": {
        "user": {
            "properties": {
                "name": {
                    "type": "completion",
                    "analyzer": "ik_max_word",
                    "search_analyzer": "ik_max_word"
                },
                "tag": {
                    "type": "text",
                    "analyzer": "ik_max_word",
                    "search_analyzer": "ik_max_word"
                }
            }
        }
    }
}
'
curl -XPOST '192.168.2.200:9200/my_index/_search?pretty' -H 'Content-Type: application/json' -d'
{
    "suggest": {
        "user" : {
            "prefix" : "爱情", 
            "completion" : { 
                "field" : "name",
                "fuzzy" : {
                    "fuzziness" : 2
                }
            }
        }
    }
}
'

Extending the IK analyzer

UpAnalysisIkPlugin.java

public class UpAnalysisIkPlugin extends Plugin implements AnalysisPlugin {

    public static String PLUGIN_NAME = "analysis-ik";
    @Override
    public Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>> getTokenizers() {
        Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>> extra = new HashMap<>();

//        extra.put("ik_smart", UpIkTokenizerFactory::getIkSmartTokenizerFactory);
        extra.put("ik_max_word", UpIkTokenizerFactory::getIkTokenizerFactory);
        return extra;
    }

    @Override
    public Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>> getAnalyzers() {
        Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>> extra = new HashMap<>();

//        extra.put("ik_smart", IkAnalyzerProvider::getIkSmartAnalyzerProvider);
        extra.put("ik_max_word", IkAnalyzerProvider::getIkAnalyzerProvider);

        return extra;
    }

    @Override
    public Map<String, AnalysisProvider<TokenFilterFactory>> getTokenFilters() {
        Map<String, AnalysisProvider<TokenFilterFactory>> extra = new HashMap<>();
        extra.put("ikSysn", DynamicSynonymTokenFilter::getIkFilterFactory);
        return extra;
    }

    @Override
    public List<Setting<?>> getSettings() {
        Setting<String> dbUrl = new Setting<>("dbUrl", "", Function.identity(),Property.NodeScope);
        Setting<String> dbUser = new Setting<>("dbUser", "", Function.identity(),Property.NodeScope);
        Setting<String> dbPwd = new Setting<>("dbPwd", "", Function.identity(),Property.NodeScope);
        Setting<String> dbTable = new Setting<>("dbTable", "", Function.identity(),Property.NodeScope);
        Setting<String> extField = new Setting<>("extField", "", Function.identity(),Property.NodeScope);
        Setting<String> stopField = new Setting<>("stopField", "", Function.identity(),Property.NodeScope);
        Setting<Integer> flushTime =Setting.intSetting("flushTime", 5, Property.NodeScope);
        Setting<Boolean> autoReloadDic = Setting.boolSetting("autoLoadIkDic", false, Property.NodeScope);
        
        Setting<String> synonymField = new Setting<>("synonymField", "synonym", Function.identity(),Property.NodeScope);
        
        return Arrays.asList(dbUrl,dbUser,dbPwd,dbTable,extField,stopField,flushTime,autoReloadDic,synonymField);
    }
}

UpIkTokenizerFactory.java

public class UpIkTokenizerFactory extends AbstractTokenizerFactory {

    public static final Logger logger = Loggers.getLogger(UpIkTokenizerFactory.class);
    
  private Configuration configuration;

  public UpIkTokenizerFactory(IndexSettings indexSettings, Environment env, String name, Settings settings) {
      super(indexSettings, name, settings);
      configuration = new Configuration(env, settings);

      // Read the MySQL connection info from the Elasticsearch settings
      Settings s = indexSettings.getSettings();
      String dbUrl = s.get("dbUrl");

      boolean autoReloadDic = s.getAsBoolean("autoLoadIkDic", false);
      if (autoReloadDic && StringUtils.isBlank(DBHelper.url) && StringUtils.isNotBlank(dbUrl)) {
          String dbUser = s.get("dbUser");
          String dbPwd = s.get("dbPwd");
          Integer flushTime = s.getAsInt("flushTime", 60);
          String dbTable = s.get("dbTable");
          DBHelper.dbTable = dbTable;
          DBHelper.dbName = dbUser;
          DBHelper.dbPwd = dbPwd;
          DBHelper.url = dbUrl;
          logger.warn("dbUrl=========={}", dbUrl);
          String extField = s.get("extField");
          String stopField = s.get("stopField");
          logger.warn("extField=========={}", extField);
          logger.warn("stopField=========={}", stopField);
          // Reload the extension words from MySQL every flushTime seconds
          ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
          scheduledExecutorService.scheduleAtFixedRate(new DBRunnable(extField, stopField), 0, flushTime, TimeUnit.SECONDS);
      }
  }

  public static UpIkTokenizerFactory getIkTokenizerFactory(IndexSettings indexSettings, Environment env, String name, Settings settings) {
      return new UpIkTokenizerFactory(indexSettings,env, name, settings).setSmart(false);
  }

  public static UpIkTokenizerFactory getIkSmartTokenizerFactory(IndexSettings indexSettings, Environment env, String name, Settings settings) {
      return new UpIkTokenizerFactory(indexSettings,env, name, settings).setSmart(true);
  }

  public UpIkTokenizerFactory setSmart(boolean smart){
        this.configuration.setUseSmart(smart);
        return this;
  }

  @Override
  public Tokenizer create() {
      return new IKTokenizer(configuration);
  }
}

DBRunnable.java

public class DBRunnable implements Runnable {
    Logger logger = Loggers.getLogger(DBRunnable.class);
    private String extField;
    private String stopField;

    public DBRunnable(String extField, String stopField) {
        super();
        this.extField = extField;
        this.stopField = stopField;
    }

    @Override
    public void run() {
        logger.warn("Loading dictionary========");
        // Get the dictionary singleton
        Dictionary dic = Dictionary.getSingleton();
        DBHelper dbHelper = new DBHelper();
        try {
            String extWords = dbHelper.getKey(extField, true);
//          String stopWords = dbHelper.getKey(stopField, true);
            List<String> extList = Arrays.asList(extWords.split(","));
//          List<String>stopList = Arrays.asList(stopWords.split(","));
            logger.warn("extWords==={}", extWords);

            // Add the extension words to the main dictionary
            dic.addWords(extList);
            logger.warn("Extension words loaded========");
//          dic.addStopWords(stopList);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged
            logger.warn("Failed to load extension words========", e);
        }
    }
}

DynamicSynonymTokenFilter.java

public class DynamicSynonymTokenFilter extends AbstractTokenFilterFactory implements Runnable{

    // Shared by all instances and replaced by the scheduled reload thread, hence volatile
    public static volatile SynonymMap synonymMap;
    private final boolean ignoreCase;
    private Analyzer analyzer;
    private String synonymField;

    public static boolean isStarted = false;
    public DynamicSynonymTokenFilter(IndexSettings indexSettings, String name, Settings settings) {
        super(indexSettings, name, settings);
        ignoreCase = true;

//        boolean expand = settings.getAsBoolean("expand", true);
//        String tokenizerName = settings.get("tokenizer", "whitespace");
        this.analyzer = new Analyzer() {
            @Override
            protected TokenStreamComponents createComponents(String fieldName) {
                Tokenizer tokenizer = new WhitespaceTokenizer() ;
                TokenStream stream = ignoreCase ? new LowerCaseFilter(tokenizer) : tokenizer;
                return new TokenStreamComponents(tokenizer, stream);
            }
        };

        try {
            Settings s = indexSettings.getSettings();
            String dbUrl = s.get("dbUrl");
            String dbUser = s.get("dbUser");
            String dbPwd = s.get("dbPwd");
            Integer flushTime = s.getAsInt("flushTime", 60);
            String dbTable = s.get("dbTable");
            DBHelper.dbTable = dbTable;
            DBHelper.dbName = dbUser;
            DBHelper.dbPwd = dbPwd;
            DBHelper.url = dbUrl;

            this.synonymField = s.get("synonymField", "synonym");
            if (synonymField == null) {
                synonymField = "synonym";
            }
            logger.warn("synonymField=========={}", synonymField);

            // Load the synonyms once up front, then refresh them on a schedule
            String synoyms = getSynonym(synonymField);
            loadSyn(synoyms);
            if (!isStarted) {
                ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
                scheduledExecutorService.scheduleAtFixedRate(this, 0, flushTime, TimeUnit.SECONDS);
                isStarted = true;
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("failed to build synonyms", e);
        }
    }

    // Fetch the synonyms from MySQL
    private String getSynonym(String synonymField2) {
        DBHelper dbHelper = new DBHelper();
        try {
            return dbHelper.getKey(synonymField2, false, true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }

    public static DynamicSynonymTokenFilter getIkFilterFactory(IndexSettings indexSettings, Environment env, String name, Settings settings) {
        return new DynamicSynonymTokenFilter(indexSettings, name, settings);
    }

    @Override
    public TokenStream create(TokenStream tokenStream) {
        // A null map (nothing loaded yet) or a null fst means there are no synonyms
        SynonymMap current = synonymMap;
        return current == null || current.fst == null ? tokenStream : new SynonymFilter(tokenStream, current, ignoreCase);
    }

    @Override
    public void run() {
        // Fetch the synonyms from MySQL
        String synoyms = getSynonym(synonymField);
        // Rebuild synonymMap from them
        logger.warn("Loading synonyms=========={}", synoyms);
        loadSyn(synoyms);
    }

    private void loadSyn(String synoyms) {
        SolrSynonymParser parser = new SolrSynonymParser(true, true, this.analyzer);
        try {
            parser.parse(new StringReader(synoyms));
            synonymMap = parser.build();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

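Note: this is an extension of the IK analyzer with two main additions: (1) extension words loaded dynamically from MySQL, and (2) synonym support. The dynamic dictionary works by periodically loading the latest words from MySQL incrementally and then calling Dictionary.addWords to update the dictionary. Synonym support builds on the synonym token filter that Elasticsearch already provides: override the plugin's getTokenFilters method and return a token filter factory that extends AbstractTokenFilterFactory.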
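
The incremental reload idea can be sketched in plain Java without Elasticsearch or MySQL. In the sketch below, the Supplier stands in for the database query and the dictionary list stands in for the IK dictionary. IncrementalWordLoader and its members are hypothetical names introduced for illustration only; the plugin above calls Dictionary.addWords instead of appending to a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for the reload task; not the plugin's actual code.
final class IncrementalWordLoader implements Runnable {
    private final Supplier<List<String>> source;               // stands in for the MySQL query
    private final Set<String> seen = new HashSet<>();          // words already loaded
    private final List<String> dictionary = new ArrayList<>(); // stands in for the IK dictionary

    IncrementalWordLoader(Supplier<List<String>> source) {
        this.source = source;
    }

    // One reload pass: only words not seen before are added.
    @Override
    public synchronized void run() {
        List<String> fresh = new ArrayList<>();
        for (String raw : source.get()) {
            String word = raw.trim();
            if (!word.isEmpty() && seen.add(word)) {
                fresh.add(word);
            }
        }
        dictionary.addAll(fresh); // the plugin would call dic.addWords(fresh) here
    }

    // Returns a copy so callers cannot modify the internal list.
    synchronized List<String> dictionary() {
        return new ArrayList<>(dictionary);
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>(Arrays.asList("foo", "bar"));
        IncrementalWordLoader loader = new IncrementalWordLoader(() -> table);
        loader.run();            // first pass loads both words
        table.add("baz");
        loader.run();            // second pass adds only the new word
        System.out.println(loader.dictionary());
    }
}
```

In the plugin, a task like this is handed to ScheduledExecutorService.scheduleAtFixedRate, as in the constructor code shown earlier, so that a pass runs every flushTime seconds.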

Key Elasticsearch cluster concepts and terminology

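1. Master and slave: an Elasticsearch cluster elects one of its nodes as master; the other nodes are slaves. Both master and slave nodes serve reads and writes. If the master node goes down, the cluster elects a new master from the remaining nodes. If a slave node goes down and replicas are enabled, replica shards on other machines are promoted to primary for the primaries that were lost, so no data is lost. Note that the cluster will be in yellow status for a short while.
2. Primary shard: an index is split into several primary shards, distributed as evenly as possible across the nodes. Each document is assigned to one primary shard (routing: hash(doc._id) % number of primary shards). Shards are independent of each other, and for a search request every shard executes the request.
3. Replica shard: a primary shard can have several replica shards. A primary and its replicas are never placed on the same machine, which guards against a single point of failure. With only one node, the replicas cannot be allocated (unassigned) and the cluster status becomes yellow. Replicas serve two purposes: (1) Failover: if a primary shard is lost, a replica is promoted to primary and a new replica is then created for that new primary, so the cluster's data stays intact. (2) Query performance: a replica holds the same data as its primary, so a query can be served by either one.
Note: an index request can only be handled by a primary shard; a replica cannot execute an index request on its own. Elasticsearch uses push replication: when you index a document into a primary shard, the primary copies the document (the original document is transmitted) to all of its replicas, and those shards then index it as well.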
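
The routing rule hash(doc._id) % number of primary shards can be illustrated with a few lines of Java. This is a sketch only: Elasticsearch hashes the routing value (the _id by default) with Murmur3, whereas the example substitutes String.hashCode to stay dependency-free, so the shard numbers it produces will not match a real cluster. The class and method names are made up for the example.

```java
// Illustrative only: String.hashCode replaces the Murmur3 hash that
// Elasticsearch actually uses, so results differ from a real cluster.
final class ShardRouting {

    // Maps a document id to a primary shard number in [0, numberOfPrimaryShards).
    static int shardFor(String id, int numberOfPrimaryShards) {
        if (numberOfPrimaryShards < 1) {
            throw new IllegalArgumentException("need at least one primary shard");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(id.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        int shards = 3; // matches number_of_shards in the index settings used earlier
        for (String id : new String[] {"1", "2", "3", "4", "5"}) {
            System.out.println("doc " + id + " -> shard " + shardFor(id, shards));
        }
    }
}
```

Because the result depends on the number of primary shards, an existing index cannot simply be given a different number_of_shards, whereas number_of_replicas can be changed at any time.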
