Featured image of post Elasticsearch 进阶

Elasticsearch 进阶

Elasticsearch 进阶

数据聚合

聚合: 对文档数据的统计、分析、运算。

  • bucket 桶聚合: 用来对文档分组
    • termAggregation: 按文档字段值分组
    • Date Histogram: 按日期阶梯分组,例如一周一组,一月一组
  • metric 度量聚合: 用来计算一些值
    • avg
    • max
    • min
    • stats: 同时求max, min, avg, sum…
  • pipeline 管道聚合: 其他聚合的结果为基础做聚合

聚合必须的三要素: 聚合名称、聚合类型、聚合字段

聚合可配置的属性: size, order. field

DSL 实现 bucket 聚合

比如统计所有数据中的酒店品牌有几种

GET /hotel/_search
{
  "size": 0, // 设置size为0, 结果中不包含文档, 只包含聚合结果
  "aggs": { // 聚合查询
    "brandAgg": { //品牌聚合查询
      "terms": { // 桶聚合类型, 按照品牌字段聚合, 所以选择term
        "field": "brand", // 桶聚合的字段
          
        "order": {
            "_count": "asc" // 安装_count升序排列
        },
          
        "size": 20 // 桶聚合的最大桶数量
      }
    }
  }
}

限定聚合范围

GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 // 只对200元以下的文档聚合
      }
    }
  },
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}

DSL 实现 Metrics 聚合

比如获取每个品牌的用户评分的min, max, avg

GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        "order": {
         	"scoreAgg.avg": "desc"
        }
      },
      "aggs": { // 在brands聚合的子聚合,也就是分组后的每个分组进行统计
        "scoreAgg": { // 聚合名称
          "stats": { // 聚合类型,这里stats可以计算min, max, avg等
            "field": "score" // 聚合字段, 这里是score
          }
        }
      }
    }
  }
}

RestAPI实现聚合

品牌聚合(参照bucket)

SearchRequest request = new SearchRequest("hotel");

request.source().size(0);
request.source().aggregation(
    AggregationBuilders
        .terms("brandAgg")
        .field("brand")
        .size(20)
);

SearchResponse response = client.search(request, RequestOptions.DEFAULT);

获取结果

// 解析聚合结果
Aggregations aggregations = response.getAggregations();

// 根据名称获取桶聚合结果
Terms brandTerms = aggregations.get("brand_agg");

// 获取桶
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();

// 遍历
for (Terms.Bucket bucket : buckets) {
    // 获取key, 也就是品牌名称
    String brandName = bucket.getKeyAsString();
    System.out.println(brandName);
}

RestAPI 多条件聚合

SearchRequest request = new SearchRequest("hotel");

request.source().size(0);
request.source().aggregation(
    AggregationBuilders
        .terms("brandAgg")
        .field("brand")
        .size(20)
);
request.source().aggregation(
    AggregationBuilders
        .terms("cityAgg")
        .field("city")
        .size(20)
);
request.source().aggregation(
    AggregationBuilders
        .terms("starAgg")
        .field("starName")
        .size(20)
);

SearchResponse response = client.search(request, RequestOptions.DEFAULT);

获取结果

// 解析聚合结果
Map<String, List<String>> result = new HashMap<>();
Aggregations aggregations = response.getAggregations();

// 根据名称获取桶聚合结果
Terms brandTerms = aggregations.get("brandAgg");

// 获取桶
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();

// 遍历
// aggregation里分别获取brandAgg, cityAgg, starAgg得到三个结果集,再从三个结果集里取结果到brandList里。这里只展示获取一个。
List<String> brandList = new ArrayList<>();

for (Terms.Bucket bucket : buckets) {
    // 获取key
    String key = bucket.getKeyAsString();
    brandList.add(key);
}

带过滤条件的聚合

对接前端接口

前端页面会向服务端发发起请求,查询品牌、城市、星级等字段的聚合结果:

聚合基于用户选择的条件,必须在这个搜索条件基础上完成。

为了实现动态过滤项 (用户点击前端自动刷新结果),我们需要比照上文写一个新的方法。区别在于给buildBasicQuery加入一个RequestParam类型的参数。实际上用户点击过滤条件后就自动完成了一次新的查询聚合。

SearchRequest request = new SearchRequest("hotel");
buildBasicQuery(params, request);

自动补全

拼音分词器

elasticsearch-analysis-pinyin 插件

使用这个插件会默认将一句话分成以下的token:

  1. 每个单子的全拼
  2. 这句话每个拼音首字母组成的一串文字
  3. 更多参数参见官方文档

自定义分词器

分词器 analyzer 的组成部分分成三部分:

  • character filters: 在tokenizer之前对文本进行处理。比如删除字符、替换字符
  • tokenizer: 将文本按照一定的规则切割成词条 (term)。例如keyword, 就是不分词;还有ik_smart
  • tokenizer filter: 将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等等

image-20240217182444243

可以在创建索引库时,通过settings来配置自定义的analyzer

自动补全查询

PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { // 自定义分词器
        "my_analyzer": { // 分词器名称
           // 可以不定义也可以定义character filter
          "tokenizer": "ik_max_word",
          "filter": "pinyin"
        }
      }
    }
  },
  // 使用
  mappings: {
      properties: {
          "name": {
              "type": "text",
              "analyzer": "my_analyzer",
              "search_analyzer": "ik_smart"
          }
      }
  }
}

由于多音字的存在,自定义分词器适合在创建倒排索引时使用,但不推荐在搜索时使用。

DSL实现自动补全查询

elasticsearch提供了Completion suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:

  • 参与补全查询的字段必须是completion类型
  • 字段的内容一般是用来补全的多个词条形成的数组
    • 比如 “title”: [“Sony”, “WH-1000XM5”]
// 创建索引
PUT test
{
  "mappings": {
    "properties": {
      "title": {
        "type": "completion"
      }
    }
  }
}

查询语法

// 自动补全查询
GET /test/_search
{
  "suggest": {
    "title_suggest": { // 自己起名
      "text": "s", // 关键字
      "completion": {
        "field": "title", // 补全查询的字段
        "skip_duplicates": true, // 跳过重复的
        "size": 10 // 获取前10条结果
      }
    }
  }
}

推荐的做法:

在创建索引时多加一个suggestion字段

PUT /hotel
{
  "settings": {
    "analysis": {
      "text_analyzer": {
        "tokenizer": "ik_max_word",
        "filter": "py"
      },
      "completion_analyzer": {
        "tokenizer": "keyword", // 这里是拆分后的数组,不需要更细分了
        "filter": "py"
      }
    }
  },
  "mappings": {
    ...
    "all": {
      "type": "text",
      "analyzer": "text_analyzer",
      "search_analyzer": "ik_smart"
    },
    "suggestion": {
      "type": "completion",
      "analyzer": "completion_analyzer"
    }
  }
}

在实体类里将一些信息加入到这个suggestion中

public HotelDoc(Hotel hotel){
    ...
    this.suggestion = Arrays.asList(this.brand, this.business);
    // 有必要时对成员变量的值进行切割,比如按'/' 或者 ' '
}

RestAPI实现自动补全

// 1.创建请求
SearchRequest request = new SearchRequest("hotel");

// 2.准备建议
request.source()
    .suggest(new SuggestBuilder().addSuggestion(
        "mySuggestion",
        SuggestBuilders
            .completionSuggestion("title")
            .prefix("H")
            .skipDuplicates(true)
            .size(10)
    ));

// 3.发送请求
client.search(request, RequestOptions.DEFAULT);

处理结果

// 4.处理结果
Suggest suggest = response.getSuggest();

// 4.1. 根据名称获取补全建议
CompletionSuggestion suggestion = suggest.getSuggestion("mySuggestion");

// 4.2. 获取options并遍历
for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
    // 4.3. 获取一个option的text,也就是补全的词
    String text = option.getText().string();
    System.out.println(text);
}

数据同步

数据同步问题分析

MySQL数据发生改变时,elasticsearch也必须跟着变。

微服务中,操作mysql的业务和操作elasticsearch的业务可能在两个不同的微服务上。

同步方案:

  • 同步调用

  • image-20240217192505458

  • 异步通知

  • image-20240217192735522

  • 监听 binlog

  • image-20240217192829410

使用MQ完成数据同步

例子: 当酒店数据发生增删改时、要求对es中的数据也要完成相同的操作

声明exchange, queue, RoutingKey

image-20240217193427524

@Configuration
public class MqConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, durable: true, autoDelete: false);
    }

    @Bean
    public Queue insertQueue() {
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, durable: true);
    }

    @Bean
    public Queue deleteQueue() {
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, durable: true);
    }

    @Bean
    public Binding insertQueueBinding() {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }
    
    @Bean
    public Binding deleteQueueBinding() {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

这里的常量比如 HOTEL_EXCHANGE 是 hotel.topic 注意看上图

消息发送 (监听到MySQL发生变动时)

更改业务层

public class HotelController {
    ...
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostMapping
    pubilc void saveHotel(@RequestBody Hotel hotel){
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId); // 因为mq占内存,发的消息尽量小一些
    }
    
   	// 更新也发到insert queue里,删除发到delete queue里
    
}

消息接收 (ES)

@Component
public class HotelListener {
    
    @Autowired
    private IHotelService hotelService;
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id){
		hotelService.insertById(id);
    }

    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id){
        hotelService.deleteById(id);
    }
    
}
@Override
public void deleteById(Long id){
    // 1.准备Request
    DeleteRequest request = new DeleteRequest("hotel",id);
    // 2.准备发送请求
    client.delete(request,RequestOptions.DEFAULT);
}
@Override
public void insertById(Long id){
    // 0.根据id查询酒店数据
    Hotel hotel = getById(id);
    // 转换为文档类型
    HotelDoc hotelDoc = new HotelDoc(hotel);
    // 1.准备Request
    IndexRequest request = new IndexRequest("Hotel").id(hotel.getId()).toString());
    // 2.准备Json文档
    request.source(JSON.toJSONString(hotelDoc),XContentType.JSON);
    // 3.准备发送请求
    client.index(request, RequestOptions.DEFAULT);
}

集群

单机的elasticsearch做数据存储会面临两个问题:

  • 海量数据存储问题
  • 单点故障问题

解决:

  • 海量数据存储问题: 将索引库从逻辑上划分为n个分片(shard),存储到多个节点
  • 单点故障问题: 将分片数据在不同节点备份(replica)
  • image-20240217200439522

集群创建索引库

kibana的DevTools中创建索引库

PUT /[name]

{
  "settings": {
    "number_of_shards": 3, // 分片数量
    "number_of_replicas": 1 // 副本数量
  },
  "mappings": {
    "properties": {
      // mapping属性定义 ...
    }
  }
}

ES集群的节点角色

节点类型 配置参数 配置值 功能描述
master eligible node.master true 负责集群管理:主节点可以管理和决策集群状态、决定分片在哪个节点、处理集群范围内索引分配的请求
data node.data true 数据节点:存储数据、搜索、聚合、CRUD
ingest node.ingest true 数据预处理节点之前的预处理
coordinating 上面3个参数都为false则为coordinating节点 协调节点专门处理查询,返回结果并发送给客户端,不做具体的索引和数据处理

默认每个节点都能同时负责4个职责

部署时可通过环境变量改变自己的角色

image-20240217201619698

ES集群的脑裂

默认情况下,每个节点都是 master eligible 节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节 点与其他节点网络故障时,可能发生脑裂问题。

问题: 主节点的网络阻塞了,集群中选举出了一个新的主节点。但是之前的网络恢复了,此时选举集群中同时有两个主节点。

解决: 要求选票超过(eligible节点数量+1)/2才能当选为master,因此elligible节点数量最好是奇数。

es7.0后一般不会发生脑裂问题

分布式存储

当新增文档时,应该保存到不同分片,保证数据均衡,那么 coordinating node如何确定数据该存储到哪个分片昵?

elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

shard = hash(_routing) % number_of_shards

  • _routing默认是文档的id
  • 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改
新增文档流程

image-20240217203308642

分布式查询

两个阶段

  • scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
  • gather phase:收集阶段,coordinating node汇总data node的处理结果,然后把汇总结果返回给用户

image-20240217203416384

故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。

image-20240217203921048