一、ElasticSearch核心概念
1、Cluster
代表集群的主节点,是通过选举产生的。你与任何一个节点通信与整个ES集群通信是等价的。
职责:负责管理集群的状态,包括管理分片的状态和副本的状态,以及节点的发现和删除
只需要在同一个网段之内你启动多个ES节点,就可以组成一个集群,默认ES会自动发现同一网段内的节点,组成集群
集群的状态的查看:
http://qyl01:9200/_cluser/health?pretty
集群的状态:
YELLOW:主分片可用,但不是所有副本分片都可用
RED:不是所有的分片都可用
GREEN:所有的主分片和副分片都可用
2、Shards
代表索引分片,ES可以把一个完整的索引分成多个分片,这样的好处是可以把一个大的索引分成多个,分布在不同的节点上,构成分布式搜索。
可以在创建索引库的时候指定:
curl -XPUT 'localhost:9200/test1/' -d '{"settings":{"number_of_shards":3}}'
默认是一个索引库有5个分片 index.number_of_shards:5
3、replicas
代表索引副本,ES可以给索引设置副本,副本的作用:
1、是提高系统的容错性,当某个节点某个分片损坏或丢失是可以从副本中恢复
2、提供ES的查询效率,ES会自动对搜索请求进行负载均衡
可以在创建索引库的时候指定:
curl -XPUT 'http//qyl01:9200/test2/'
-d'{"settings":{"number_of_replicas":2}}'
默认是一个分片有1个副本 index.number_of_replicas:1
4、recovery
代表数据恢复或者叫数据重新分布,ES在有节点加入或者退出时会根据机器的负载对索引分片进行重新分配,挂掉的节点重新启动也会进行数据恢复。
5、gateway
代表ES索引的持久化存储方式,ES默认是先把索引放到内存中,当内存满了时再持久化到硬盘。当这个ES集群关闭再重新启动时就会从gateway中读取索引数据。ES支持多种类型的gateway,也有本地文件系统(默认),分布式文件系统,Hadoop的HDFS和amazon的s3云存储服务。
6、discovery.zen
代表ES的自动发现节点机制,ES是一个基于p2p的系统,它先通过寻找存在的节点,再通过多播协议来进行节点之间的通信,同时也支持点对点的交互。
**如果是不同网段的节点如果组成ES集群
禁用自动发现机制
discovery.zen.ping.multicast.enabled:false
设置新节点被启动时能够发现的注解列表
Discovery.zen.ping.unicast.hosts:["192.8.50.150", "192.8.53.124:9300"]
7、Transport
代表ES内部节点或集群与客户端的交互方式,默认内部使用tcp协议进行交互,同时它支持http协议(json格式)、thrift、servlet、memcached、zeroMQ等传输协议(通过插件方式集成)
二、ElasticSearch JavaAPI
1、准备maven环境(es版本6.2.0,可以根据自己的版本进行修改)
添加maven依赖
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.2.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.dom4j</groupId>
<artifactId>dom4j</artifactId>
<version>2.0.0</version>
</dependency>
2、连接ES
private var client:TransportClient=null
private val index="qyl" //指定索引
private val types="product" //指定类型
// 获取连接入口 client
@Before def setUp():Unit={
val settings=Settings.builder()
.put("cluster.name","qyl-1807") //指定集群名
.build()
client=new PreBuiltTransportClient(settings)
val ta=new TransportAddress(InetAddress.getByName("qyl01"),9300)
client.addTransportAddress(ta)
}
3、获取数据(get)
// 获取指定索引和 类型 id 的json数据
@Test def testget():Unit={
val response = client.prepareGet(index,types,"1").get()
println("version:"+response.getVersion) //获取版本
println("source:"+response.getSource) //获取内容
}
4、向ES中添加索引信息
1)以json格式:
// 以json格式向索引为Index,类型为types中添加id=50的数据
@Test def testIndexJSON(): Unit = {
val source = "{\"name\": \"sqoop\", \"version\": \"1.4.7\", \"author\": \"apache\"}"
val response = client.prepareIndex(index, types, "50").setSource(source, XContentType.JSON).get()
println("version: " + response.getVersion)
}
2)以map格式:
/ 以Map格式向索引为Index,类型为types中添加id=52的数据
@Test def testIndexMap(): Unit = {
var sourceMap = new util.HashMap[String,Object]()
sourceMap.put("name","qyl")
sourceMap.put("version","99.99.99")
val response = client.prepareIndex(index, types, "52").setSource(sourceMap).get()
println("version: " + response.getVersion)
}
3)以XContentBuilder格式:(最终)
// 以xcontentBuilder格式
@Test def testIndexXContentBuilder(): Unit = {
val xContentBuilder = JsonXContent.contentBuilder()
xContentBuilder.startObject()//{ 必需
.field("name", "azkaban")
.field("version", "1.0.0")
.field("author", "Apache")
.endObject()//} 必需
val response = client.prepareIndex(index, types, "53").setSource(xContentBuilder).get()
println("version: " + response.getVersion)
}
5、更新数据 (prepareIndex和prepareUpdate)
前面prepareIndex除了新增,还有如果该id存在,会覆盖原先的数据
@Test def testUpdate(): Unit = {
val doc = "{\"version\": \"3.1.0\"}"
val response = client.prepareUpdate(index, `type`, "1").setDoc(doc, XContentType.JSON).get()
println("version: " + response.getVersion)
}
6、查询
1)全文索引的查询,必须指定一个SearchType
query and fetch(速度最快)(返回N倍数据量)
比如返回M条记录,检索所有的(N个)分片,并从每一个分片中返回M条记录,N*M
query then fetch (默认的搜索方式)
默认的搜索方式,从所有的分片中进行检索并返回想要的内容,同时进行排序和排名,然后找到关联度最高的分片,从这些分片中返回想要的size条记录
DFS query then fetch (可以更精确控制索引打分和排名)
在query then fetch的基础之上,将索引关键字拆分成若干个小的关键字,从索引库中计算各个词频,同时触发的score计算,以便找到最贴合的数据信息。
@Test def testQuery(): Unit = {
val response = client.prepareSearch(indices: _*)
.setSearchType(SearchType.DEFAULT) //设置检索类型
//第一个参数:要在哪一个字段进行查询,第二个参数指定是要查询什么内容
/**
* select
* xxx
* from t
* where name like '%text%'
*/
// .setQuery(QueryBuilders.matchQuery("name", "kafka")) //设置检索方式 全文检索的过程中不区分大小写
.setQuery(QueryBuilders.prefixQuery("name", "a")) // name like 'a%'
.get()//执行,并获取检索内容
val hits = response.getHits //取出检索结果 {"hits": {"total": 3, "maxScore": 1.0, "hits": [{"_index":"product","_type":"bigdata","_id":"5","_score":1.0,"_source":{"author":"LinkedIn","name":"kafka","version":"0.10.0.1"}}]}
val total = hits.totalHits //总共检索到的数据条数(long)
val maxScore = hits.getMaxScore//最大得分
val searchHits = hits.getHits //拿到所有的检索结果
//{"_index":"product","_type":"bigdata","_id":"5","_score":1.0,"_source":{"author":"LinkedIn","name":"kafka","version":"0.10.0.1"}}
for(searchHit <- searchHits) {
val index = searchHit.getIndex
val `type` = searchHit.getType
val id = searchHit.getId
val score = searchHit.getScore
val source = searchHit.getSourceAsString
val jsonObj = new JSONObject();
jsonObj.put("_index", index)
jsonObj.put("_type", `type`)
jsonObj.put("_id", id)
jsonObj.put("_score", score)
jsonObj.put("_source", new JSONObject(source))
println(jsonObj.toString)
}
}
2)分页查询
分页算法,每页显示M条记录,用户想找寻第N页的记录:
* .setFrom((N-1)*M)
.setSize(M)//每页显示的内容
*/
@Test def testQueryPage(): Unit = {
val response = client.prepareSearch(indices: _*)
.setSearchType(SearchType.QUERY_THEN_FETCH)
/*
//当前document中是否包含该值,这个类似于multiMatchQuery
multiMatchQuery在特定的几个字段上进行匹配
而termQuery是在该document中进行匹配,做的精准匹配
*/
// .setQuery(QueryBuilders.matchQuery("gender", "F"))
.setQuery(QueryBuilders.termQuery("gender", "F"))
//分页:
.setFrom(0)
.setSize(5)
.get()
val hits = response.getHits
val totalHits = hits.getTotalHits
println("OLD Li为您找到相关结果约" + totalHits + "个")
val searchHits = hits.getHits
println("--------------开始输出结果:--------------------")
for(searchHit <- searchHits) {
val source = searchHit.getSourceAsMap
val age = source.get("age")
val balance = source.get("balance")
val gender = source.get("gender")
val firstname = source.get("firstname")
println(
s"""
|source: ${source}
|age : ${age}
|balance: ${balance}
|gender: ${gender}
|firstname:${firstname}
""".stripMargin)
println("----------------------------------------")
}
}
3)高亮显示
@Test def testQueryHighLight(): Unit = {
val response = client.prepareSearch(indices: _*)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(QueryBuilders.matchQuery("address", "Avenue"))
.highlighter(new HighlightBuilder()
.field("address")//给哪个字段设置高亮
.preTags("<font color='red' size='20px'>")//前置标签
.postTags("</font>") //后置标签
)
.setFrom(0)
.setSize(5)
.get()
val hits = response.getHits
val totalHits = hits.getTotalHits
println("OLD Li为您找到相关结果约" + totalHits + "个")
val searchHits = hits.getHits
println("--------------开始输出结果:--------------------")
for(searchHit <- searchHits) {
val source = searchHit.getSourceAsMap
val age = source.get("age")
val balance = source.get("balance")
val gender = source.get("gender")
val firstname = source.get("firstname")
val highlightFields = searchHit.getHighlightFields
var address = ""
for((field, highlight) <- JavaConversions.mapAsScalaMap(highlightFields)) {
val fragments = highlight.getFragments
for (fragment <- fragments) {
address += fragment.toString
}
}
println(
s"""firstname:${firstname}
|age : ${age}
|gender: ${gender}
|balance: ${balance}
|address: ${address}
""".stripMargin)
println("----------------------------------------")
}
}
4)ES的高级聚合操作:
* 比如查看收入在2000到3000之间的男性用户信息,
* 平均收入,最高收入,最低收入等待
*/
@Test def testQueryAggregation(): Unit = {
val response = client.prepareSearch(indices: _*)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(QueryBuilders.matchQuery("gender", "M"))
/**
*
* 第一个balance 返回值中获取的哪个值
* 第二个balance 是在哪个字段上进行检索
* select
* balance as balance
* from t
*/
.addAggregation(AggregationBuilders.avg("avg_income").field("balance"))
.get()
val aggr = response.getAggregations
for(aggrInfo <- JavaConversions.asScalaBuffer(aggr.asList())) {
val avg = aggrInfo.asInstanceOf[Avg]
println("平均值:" + avg.getValue)
}
}
@Test def testQueryAggregation1(): Unit = {
val response = client.prepareSearch(indices: _*)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(QueryBuilders.matchQuery("gender", "M"))
//添加多个聚合操作
.addAggregation(AggregationBuilders.range("balance1").addRange(20000, 30000).field("balance"))
.addAggregation(AggregationBuilders.avg("avg_income").field("balance"))
.get()
for(aggregation <- JavaConversions.asScalaBuffer(response.getAggregations.asList())) {
println(aggregation.getClass)
println(aggregation.getName)
println(aggregation.getType)
println(aggregation.getMetaData)
println(aggregation)
println("------------------------------------------")
}
}
@After def cleanUp(): Unit = {
ElasticSearchUtil.close(client);
}
}
5)中文分词
class ElasticSearchTest4 {
private var client:TransportClient = null
private[test] val indices = Array("chinese1")
@Before def setUp(): Unit = {
client = ElasticSearchUtil.getTransportClient;
}
/**
* 在索引库chinese中检索包含”中“的文档
*/
@Test def testQuery(): Unit = {
val response = client.prepareSearch(indices: _*)
.setSearchType(SearchType.DEFAULT) //设置检索类型
/*
matchQuery会将检索关键字进行分词,
将"中国"做了分词:
中
国
termQuery在检索过程中,没有对的关键字进行拆分,按照整体进行查询
英文的分词,天然就是按照空格分隔一个个的单词,这些软件默认对中文的分词就是按照一个个汉字来进行拆分
*/
// .setQuery(QueryBuilders.matchQuery("content", "中国")) // name like 'a%'
.setQuery(QueryBuilders.termQuery("content", "中国"))
.get()//执行,并获取检索内容
val hits = response.getHits //取出检索结果 {"hits": {"total": 3, "maxScore": 1.0, "hits": [{"_index":"product","_type":"bigdata","_id":"5","_score":1.0,"_source":{"author":"LinkedIn","name":"kafka","version":"0.10.0.1"}}]}
val total = hits.totalHits //总共检索到的数据条数(long)
println("OLD Li为您找到相关结果约" + total + "个")
val maxScore = hits.getMaxScore//最大得分
val searchHits = hits.getHits //拿到所有的检索结果
for(searchHit <- searchHits) {
val index = searchHit.getIndex
val `type` = searchHit.getType
val id = searchHit.getId
val score = searchHit.getScore
val source = searchHit.getSourceAsString
val jsonObj = new JSONObject();
jsonObj.put("_index", index)
jsonObj.put("_type", `type`)
jsonObj.put("_id", id)
jsonObj.put("_score", score)
jsonObj.put("_source", new JSONObject(source))
println(jsonObj.toString)
}
}
@After def cleanUp(): Unit = {
ElasticSearchUtil.close(client);
}
}
三、ElasticSearch的优化
1、对于创建过程
1)调大系统的“最大打开文件数”,建议32k甚至是64k
ulimit -a(查看)
ulimit -n 32000(设置)
2)修改配置文件调整ES的JVM内存大小
1:修改bin/elasticsearch.in.sh中ES_MIN_MEM和ES_MAX_MEM的大小,建议设置一样大,避免频繁的分配内存,根据服务器内存大小,一般分配60%左右(默认256M)
2:如果使用searchwrapper插件启动es的话则修改 bin/service/elasticsearch.conf(默认1024M,2.x以后不用考虑)
设置mlockall来锁定进程的物理内存地址
避免交换(swapped)来提高性能
修改文件conf/elasticsearch.yml
bootstrap.mlockall: true
3)分片多的话,可以提升建立索引的能力,5-20个比较合适。
4)要定时对索引进行优化,不然segment越多,查询性能越差
索引量不是很大的情况下可以将segment设置为1
命令:
curl -XPOST
'http://localhost:9200/crxy/_optimize?max_num_segments=1'
java代码:
client.admin().indices().prepareOptimize("bigdata").setMaxNumSegments(1).get();
2、对于删除过程
删除文档:在Lucene中删除文档,数据不会马上在硬盘上除去,而是在lucene索引中产生一个.del的文件,而在检索过程中这部分数据也会参与检索,lucene在检索过程会判断是否删除了,如果删除了再过滤掉。这样也会降低检索效率。所以可以执行清除删除文档。
命令:
curl -XPOST 'http://localhost:9200/bigdata/_optimize?only_expunge_deletes=true'
java代码:
client.admin().indices().prepareOptimize("bigdata").setOnlyExpungeDeletes(true).get();
如果在项目开始的时候需要批量入库大量数据的话,建议将副本数设置为0。因为es在索引数据的时候,如果有副本存在,数据也会马上同步到副本中,这样会对es增加压力。待索引完成后将副本按需要改回来。这样可以提高索引效率。
3、对于配置
去掉mapping中_all域,Index中默认会有_all的域,(相当于solr配置文件中的拷贝字段text),这个会给查询带来方便,但是会增加索引时间和索引尺寸"_all":{"enabled":"false"}
log输出的水平默认为trace,即查询超过500ms即为慢查询,就要打印日志,造成cpu和mem,io负载很高。把log输出水平改为info,可以减轻服务器的压力。
修改ES_HOME/conf/logging.yaml文件
或者修改ES_HOME/conf/elasticsearch.yaml
|
|