Elasticsearch 进阶
数据聚合
聚合: 对文档数据的统计、分析、运算。
- bucket 桶聚合: 用来对文档分组
- termAggregation: 按文档字段值分组
- Date Histogram: 按日期阶梯分组,例如一周一组,一月一组
- metric 度量聚合: 用来计算一些值
- avg
- max
- min
- stats: 同时求max, min, avg, sum…
- pipeline 管道聚合: 其他聚合的结果为基础做聚合
聚合必须的三要素: 聚合名称、聚合类型、聚合字段
聚合可配置的属性: size, order. field
DSL 实现 bucket 聚合
比如统计所有数据中的酒店品牌有几种
GET /hotel/_search
{
"size": 0, // 设置size为0, 结果中不包含文档, 只包含聚合结果
"aggs": { // 聚合查询
"brandAgg": { //品牌聚合查询
"terms": { // 桶聚合类型, 按照品牌字段聚合, 所以选择term
"field": "brand", // 桶聚合的字段
"order": {
"_count": "asc" // 安装_count升序排列
},
"size": 20 // 桶聚合的最大桶数量
}
}
}
}
限定聚合范围
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200 // 只对200元以下的文档聚合
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
}
}
}
}
DSL 实现 Metrics 聚合
比如获取每个品牌的用户评分的min, max, avg
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"scoreAgg.avg": "desc"
}
},
"aggs": { // 在brands聚合的子聚合,也就是分组后的每个分组进行统计
"scoreAgg": { // 聚合名称
"stats": { // 聚合类型,这里stats可以计算min, max, avg等
"field": "score" // 聚合字段, 这里是score
}
}
}
}
}
}
RestAPI实现聚合
品牌聚合(参照bucket)
SearchRequest request = new SearchRequest("hotel");
request.source().size(0);
request.source().aggregation(
AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(20)
);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
获取结果
// 解析聚合结果
Aggregations aggregations = response.getAggregations();
// 根据名称获取桶聚合结果
Terms brandTerms = aggregations.get("brand_agg");
// 获取桶
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 遍历
for (Terms.Bucket bucket : buckets) {
// 获取key, 也就是品牌名称
String brandName = bucket.getKeyAsString();
System.out.println(brandName);
}
RestAPI 多条件聚合
SearchRequest request = new SearchRequest("hotel");
request.source().size(0);
request.source().aggregation(
AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(20)
);
request.source().aggregation(
AggregationBuilders
.terms("cityAgg")
.field("city")
.size(20)
);
request.source().aggregation(
AggregationBuilders
.terms("starAgg")
.field("starName")
.size(20)
);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
获取结果
// 解析聚合结果
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();
// 根据名称获取桶聚合结果
Terms brandTerms = aggregations.get("brandAgg");
// 获取桶
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 遍历
// aggregation里分别获取brandAgg, cityAgg, starAgg得到三个结果集,再从三个结果集里取结果到brandList里。这里只展示获取一个。
List<String> brandList = new ArrayList<>();
for (Terms.Bucket bucket : buckets) {
// 获取key
String key = bucket.getKeyAsString();
brandList.add(key);
}
带过滤条件的聚合
对接前端接口
前端页面会向服务端发发起请求,查询品牌、城市、星级等字段的聚合结果:
聚合基于用户选择的条件,必须在这个搜索条件基础上完成。
为了实现动态过滤项 (用户点击前端自动刷新结果),我们需要比照上文写一个新的方法。区别在于给buildBasicQuery加入一个RequestParam类型的参数。实际上用户点击过滤条件后就自动完成了一次新的查询聚合。
SearchRequest request = new SearchRequest("hotel");
buildBasicQuery(params, request);
自动补全
拼音分词器
elasticsearch-analysis-pinyin 插件
使用这个插件会默认将一句话分成以下的token:
- 每个单子的全拼
- 这句话每个拼音首字母组成的一串文字
- 更多参数参见官方文档
自定义分词器
分词器 analyzer 的组成部分分成三部分:
- character filters: 在tokenizer之前对文本进行处理。比如删除字符、替换字符
- tokenizer: 将文本按照一定的规则切割成词条 (term)。例如keyword, 就是不分词;还有ik_smart
- tokenizer filter: 将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等等
可以在创建索引库时,通过settings来配置自定义的analyzer
自动补全查询
PUT /test
{
"settings": {
"analysis": {
"analyzer": { // 自定义分词器
"my_analyzer": { // 分词器名称
// 可以不定义也可以定义character filter
"tokenizer": "ik_max_word",
"filter": "pinyin"
}
}
}
},
// 使用
mappings: {
properties: {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
由于多音字的存在,自定义分词器适合在创建倒排索引时使用,但不推荐在搜索时使用。
DSL实现自动补全查询
elasticsearch提供了Completion suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:
- 参与补全查询的字段必须是completion类型
- 字段的内容一般是用来补全的多个词条形成的数组
- 比如 “title”: [“Sony”, “WH-1000XM5”]
// 创建索引
PUT test
{
"mappings": {
"properties": {
"title": {
"type": "completion"
}
}
}
}
查询语法
// 自动补全查询
GET /test/_search
{
"suggest": {
"title_suggest": { // 自己起名
"text": "s", // 关键字
"completion": {
"field": "title", // 补全查询的字段
"skip_duplicates": true, // 跳过重复的
"size": 10 // 获取前10条结果
}
}
}
}
推荐的做法:
在创建索引时多加一个suggestion字段
PUT /hotel
{
"settings": {
"analysis": {
"text_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword", // 这里是拆分后的数组,不需要更细分了
"filter": "py"
}
}
},
"mappings": {
...
"all": {
"type": "text",
"analyzer": "text_analyzer",
"search_analyzer": "ik_smart"
},
"suggestion": {
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
在实体类里将一些信息加入到这个suggestion中
public HotelDoc(Hotel hotel){
...
this.suggestion = Arrays.asList(this.brand, this.business);
// 有必要时对成员变量的值进行切割,比如按'/' 或者 ' '
}
RestAPI实现自动补全
// 1.创建请求
SearchRequest request = new SearchRequest("hotel");
// 2.准备建议
request.source()
.suggest(new SuggestBuilder().addSuggestion(
"mySuggestion",
SuggestBuilders
.completionSuggestion("title")
.prefix("H")
.skipDuplicates(true)
.size(10)
));
// 3.发送请求
client.search(request, RequestOptions.DEFAULT);
处理结果
// 4.处理结果
Suggest suggest = response.getSuggest();
// 4.1. 根据名称获取补全建议
CompletionSuggestion suggestion = suggest.getSuggestion("mySuggestion");
// 4.2. 获取options并遍历
for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
// 4.3. 获取一个option的text,也就是补全的词
String text = option.getText().string();
System.out.println(text);
}
数据同步
数据同步问题分析
MySQL数据发生改变时,elasticsearch也必须跟着变。
微服务中,操作mysql的业务和操作elasticsearch的业务可能在两个不同的微服务上。
同步方案:
-
同步调用
-
异步通知
-
监听 binlog
使用MQ完成数据同步
例子: 当酒店数据发生增删改时、要求对es中的数据也要完成相同的操作
声明exchange, queue, RoutingKey
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.HOTEL_EXCHANGE, durable: true, autoDelete: false);
}
@Bean
public Queue insertQueue() {
return new Queue(MqConstants.HOTEL_INSERT_QUEUE, durable: true);
}
@Bean
public Queue deleteQueue() {
return new Queue(MqConstants.HOTEL_DELETE_QUEUE, durable: true);
}
@Bean
public Binding insertQueueBinding() {
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding() {
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
这里的常量比如 HOTEL_EXCHANGE 是 hotel.topic 注意看上图
消息发送 (监听到MySQL发生变动时)
更改业务层
public class HotelController {
...
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping
pubilc void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId); // 因为mq占内存,发的消息尽量小一些
}
// 更新也发到insert queue里,删除发到delete queue里
}
消息接收 (ES)
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
@Override
public void deleteById(Long id){
// 1.准备Request
DeleteRequest request = new DeleteRequest("hotel",id);
// 2.准备发送请求
client.delete(request,RequestOptions.DEFAULT);
}
@Override
public void insertById(Long id){
// 0.根据id查询酒店数据
Hotel hotel = getById(id);
// 转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1.准备Request
IndexRequest request = new IndexRequest("Hotel").id(hotel.getId()).toString());
// 2.准备Json文档
request.source(JSON.toJSONString(hotelDoc),XContentType.JSON);
// 3.准备发送请求
client.index(request, RequestOptions.DEFAULT);
}
集群
单机的elasticsearch做数据存储会面临两个问题:
- 海量数据存储问题
- 单点故障问题
解决:
- 海量数据存储问题: 将索引库从逻辑上划分为n个分片(shard),存储到多个节点
- 单点故障问题: 将分片数据在不同节点备份(replica)
集群创建索引库
kibana的DevTools中创建索引库
PUT /[name]
{
"settings": {
"number_of_shards": 3, // 分片数量
"number_of_replicas": 1 // 副本数量
},
"mappings": {
"properties": {
// mapping属性定义 ...
}
}
}
ES集群的节点角色
节点类型 | 配置参数 | 配置值 | 功能描述 |
---|---|---|---|
master eligible | node.master | true | 负责集群管理:主节点可以管理和决策集群状态、决定分片在哪个节点、处理集群范围内索引分配的请求 |
data | node.data | true | 数据节点:存储数据、搜索、聚合、CRUD |
ingest | node.ingest | true | 数据预处理节点之前的预处理 |
coordinating | 上面3个参数都为false则为coordinating节点 | 无 | 协调节点专门处理查询,返回结果并发送给客户端,不做具体的索引和数据处理 |
默认每个节点都能同时负责4个职责
部署时可通过环境变量改变自己的角色
ES集群的脑裂
默认情况下,每个节点都是 master eligible 节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节 点与其他节点网络故障时,可能发生脑裂问题。
问题: 主节点的网络阻塞了,集群中选举出了一个新的主节点。但是之前的网络恢复了,此时选举集群中同时有两个主节点。
解决: 要求选票超过(eligible节点数量+1)/2才能当选为master,因此elligible节点数量最好是奇数。
es7.0后一般不会发生脑裂问题
分布式存储
当新增文档时,应该保存到不同分片,保证数据均衡,那么 coordinating node如何确定数据该存储到哪个分片昵?
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
shard = hash(_routing) % number_of_shards
- _routing默认是文档的id
- 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改
新增文档流程
分布式查询
两个阶段
- scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
- gather phase:收集阶段,coordinating node汇总data node的处理结果,然后把汇总结果返回给用户
故障转移
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。