「一招制敌」老板再也不用为“搜索不到数据”而操心了

发布时间:2025-05-17 04:41:57 作者:益华网络 来源:undefined 浏览量(1) 点赞(1)
摘要:1. 概览 相信负责过“搜索服务”的伙伴,最害怕的一句话就是:“数据怎么又搜索不出来了!!!”。每当收到这句话,都会心中一颤,因为面对几千万甚至几亿的索引数据,我真的无从下手,不知道业务要搜索什么,也不知道是哪些数据出了问题….1.1. 背景目前,“搜索”已经成为后端管理平台的必备功能,

1. 概览

相信负责过“搜索服务”的伙伴,最害怕的一句话就是:“数据怎么又搜索不出来了!!!”。每当收到这句话,都会心中一颤,因为面对几千万甚至几亿的索引数据,我真的无从下手,不知道业务要搜索什么,也不知道是哪些数据出了问题….

1.1. 背景

目前,“搜索”已经成为后端管理平台的必备功能,在这个业务场景中,很多人都会基于 elasticsearch 强大的检索能力构建自己的搜索服务。但实际开发中,elasticsearch 的引入是非常小的一部分,往往大头是索引模型的数据管理,在整个过程中,我们

需要根据业务需求构建检索模型和ES存储模型;需要从多个数据源中获取数据,并填充到检索模型;需要关注所有数据源的数据变化,并对变更数据进行索引重建;需要对不一致的数据进行识别和处理…

如此繁琐的事情,哪一环出现问题都会收到业务的投诉。

1.2. 目标

对搜索场景中的最佳实践进行封装,从而:

降低开发成本,开发人员将精力放在模型构建上,抛开繁琐的技术细节;对数据索引、关联数据更新有很好的支持;引入数据实时巡检能力,对于数据不一致的情况进行自动修复;引入天级对账机制,保障数据的一致性;

2. 快速入门

2.1. 准备环境

首先,增加对 spring data elasticsearch 的支持,具体 maven 坐标如下:

org.springframework.boot

spring-boot-starter-data-elasticsearch

</dependency>

在 application.yml 中添加 es 的配置信息,具体如下:

spring:

elasticsearch:

uris: http://localhost:9200

connection-timeout: 10s

socket-timeout: 30s

新建 SpringESConfiguration 配置信息,指定 ES Repository 的包信息,居然如下:

@Configuration

@EnableElasticsearchRepositories(basePackages = "com.geekhalo.lego.wide.es")

public class SpringESConfiguration {

}

最后,引入 lego-starter,具体如下:

com.geekhalo.lego

lego-starter

<version>0.1.14-wide-SNAPSHOT</version>

至此,就完成了项目的准备工具,可以着手构建索引模型。

2.2. 构建模型

构造模型之前,需要构建一个 Enum 用以管理模型中所有关联数据,具体如下:

public enum WideOrderType implements WideItemType {

ORDER, // 订单主数据

USER, // 用户数据

ADDRESS, // 用户地址数据

PRODUCT // 购买商品数据

}

WideOrderType 枚举实现 WideItemType 接口,用于与框架进行集成。

接下来,构建待索引的宽表模型,具体如下:

@Data

@NoArgsConstructor

@AllArgsConstructor

@Document(indexName = "wide_order")

public class WideOrder extends BindFromBasedWide {

@org.springframework.data.annotation.Id

private Long id;

@BindFrom(sourceClass = Order.class, field = "userId")

private Long userId;

@BindFrom(sourceClass = Order.class, field = "addressId")

private Long addressId;

@BindFrom(sourceClass = Order.class, field = "productId")

private Long productId;

@BindFrom(sourceClass = Order.class, field = "descr")

private String orderDescr;

@BindFrom(sourceClass = User.class, field = "name")

private String userName;

@BindFrom(sourceClass = Address.class, field = "detail")

private String addressDetail;

@BindFrom(sourceClass = Product.class, field = "name")

private String productName;

@BindFrom(sourceClass = Product.class, field = "price")

private Integer productPrice;

public WideOrder(Long orderId){

setId(orderId);

}

@Override

public Long getId(){

return id;

}

@Override

public boolean isValidate(){

return userId != null && addressId != null && productId != null;

}

@Override

public List getItemsKeyByType(WideOrderType wideOrderType){

switch (wideOrderType){

case ORDER:

return Collections.singletonList(new WideItemKey(wideOrderType, getId()));

case USER:

return Collections.singletonList(new WideItemKey(wideOrderType, getUserId()));

case ADDRESS:

return Collections.singletonList(new WideItemKey(wideOrderType, getAddressId()));

case PRODUCT:

return Collections.singletonList(new WideItemKey(wideOrderType, getProductId()));

}

return Collections.emptyList();

}

}

该模型有如下几个特点:

存在很多属性,是由多个表数据共同组成的“宽表”;除 id 属性外,其他属性上都有 @BindFrom 注解,用于标明该字段的数据是来自于哪个实体的那个字段;继承自 BindFromBasedWide<Long, WideOrderType>,其中 Long 为模型主键,WideOrderType 为刚建的枚举,BindFromBasedWide 将根据字段上的 @BindFrom 注解自动完成 数据更新 和 数据比对;Long getId() 方法返回模型的主键信息;boolean isValidate() 用于对数据的有效性进行验证,无效数据将不会进行持久化处理List<WideItemKey> getItemsKeyByType(WideOrderType wideOrderType) 根据关联数据类型(WideOrderType)返回不同键信息,以进行数据组装;

至此,模型就建立完毕。

2.3. 数据提供器

有了模型后,我们需要构建一些组件用于为“宽表”提供数据,这就是 WideItemDataProvider 体系。

我们以 OrderProvider 为例,具体如下:

@Component

@org.springframework.core.annotation.Order(value = Ordered.HIGHEST_PRECEDENCE)

public class OrderProvider implements WideItemDataProvider {

@Autowired

private OrderDao orderDao;

@Override

public List apply(List key){

return orderDao.findAllById(key);

}

@Override

public WideOrderType getSupportType(){

return WideOrderType.ORDER;

}

}

该类有如下特点:

实现 WideItemDataProvider 接口,其中 WideOrderType 为刚刚定义的枚举,Long 为 Order 模型的关联键类型,Order 为要提供的数据;List<Order> apply(List<Long> key),根据 key 获得对应的数据;WideOrderType getSupportType(),获取该组件所支持的 关联类型;@Component 标记该类为 Spring 的托管 Bean;@Order(value = Ordered.HIGHEST_PRECEDENCE) 指定组件的顺序,由于为 WideOrder 提供主数据,优先级调到最高;

每一类关联数据都会提供自己的数据提供器,简单看下 UserProvider 实现,具体如下:

@Component

public class UserProvider implements WideItemDataProvider {

@Autowired

private UserDao userDao;

@Override

public List apply(List key){

return userDao.findAllById(key);

}

@Override

public WideOrderType getSupportType(){

return WideOrderType.USER;

}

}

和 OrderProvider 没有本质区别,当然,demo 中还提供了多种实现,如:

OrderProvider,提供订单主数据;UserProvider,提供用户信息;AddressProvider,提供用户地址信息;ProductProvider,提供商品信息;2.4. 构建宽表仓库

数据都准备好了,需要将 “宽表” 进行持久化,将其放入最合适的存储引擎,以便更好的处理查询请求。

基于 ElasticsearchRepository 的 WideOrderRepository 具体如下:

@Repository

public class WideOrderRepository implements WideCommandRepository {

@Autowired

private WideOrderESDao wideOrderDao;

@Override

public void save(List wides){

wideOrderDao.saveAll(wides);

}

@Override

public List findByIds(List masterIds){

return Lists.newArrayList(wideOrderDao.findAllById(masterIds));

}

@Override

public void consumeByItem(WideOrderType wideOrderType, KEY key, Consumer wideConsumer){

switch (wideOrderType){

case PRODUCT:

this.wideOrderDao.findByProductId((Long) key).forEach(wideConsumer);

case ADDRESS:

this.wideOrderDao.findByAddressId((Long) key).forEach(wideConsumer);

case ORDER:

this.wideOrderDao.findById((Long) key).ifPresent(wideConsumer);

case USER:

this.wideOrderDao.findByUserId((Long) key).forEach(wideConsumer);

}

}

@Override

public boolean supportUpdateFor(WideOrderType wideOrderType){

return false;

}

@Override

public void updateByItem(WideOrderType wideOrderType, KEY key, Consumer wideConsumer){

ConsumerupdateAndSave = wideConsumer.andThen(wideOrder -> wideOrderDao.save(wideOrder));

switch (wideOrderType){

case PRODUCT:

this.wideOrderDao.findByProductId((Long) key).forEach(updateAndSave);

case ADDRESS:

this.wideOrderDao.findByAddressId((Long) key).forEach(updateAndSave);

case ORDER:

this.wideOrderDao.findById((Long) key).ifPresent(updateAndSave);

case USER:

this.wideOrderDao.findByUserId((Long) key).forEach(updateAndSave);

}

}

@Override

public void updateByItem(WideOrderType wideOrderType, KEY key, WideItemData item){

}

}

仓库具有如下特征:

实现 WideCommandRepository<Long, WideOrderType, WideOrder> 接口,其中 Long 是模型主键(也是宽表主键),WideOrderType 是之前定义的枚举,WideOrder 是宽表;void save(List<WideOrder> wides) 提供批量保存方法;List<WideOrder> findByIds(List<Long> masterIds) 提供根据主键批量查询方法;void consumeByItem(WideOrderType wideOrderType, KEY key, Consumer<WideOrder> wideConsumer),该方法主要用于数据巡检,根据类型 和 键信息 从底层引擎中获取数据,并进行部分比对,用于发现数据不一致情况;boolean supportUpdateFor(WideOrderType wideOrderType),该实现用于判断是否支持特定类型的批量更新,及依赖引擎能力批量对数据进行更新操作;void updateByItem(WideOrderType wideOrderType, KEY key, WideItemData<WideOrderType, ?> item),supportUpdateFor 返回为 true 时,调用该方法,使用引擎的更新能力批量对数据进行更新;void updateByItem(WideOrderType wideOrderType, KEY key, Consumer<WideOrder> wideConsumer),supportUpdateFor 返回为 false 时,调用该方法,根据 类型 和 键信息 依次查询所有数据,在内存中完成更新,并写回存储引擎;

所依赖的 WideOrderESDao 基于 ElasticsearchRepository 实现,具体如下:

public interface WideOrderESDao extends ElasticsearchRepository {

List findByProductId(Long productId);

List findByAddressId(Long addressId);

List findByUserId(Long userId);

}2.5. 配置&整合

所有组件都已准备好,现在需要将他们整合在一起。

@Configuration

public class WideOrderConfiguration extends WideConfigurationSupport {

@Autowired

private WideOrderRepository wideOrderRepository;

@Autowired

private List

@Bean

public WideIndexService createWideIndexService(){

return super.createWideIndexService();

}

@Bean

public WideOrderPatrolService wideOrderPatrolService(){

return new WideOrderPatrolService(createWidePatrolService());

}

@Bean

protected WideService createWideService(

WideIndexService wideIndexService,

WideOrderPatrolService wideOrderPatrolService){

return super.createWideService(wideIndexService, wideOrderPatrolService);

}

@Override

protected WideFactory getWideFactory() {

return WideOrder::new;

}

@Override

protected WideCommandRepository getWideCommandRepository() {

return this.wideOrderRepository;

}

@Override

protected List

return this.wideItemDataProviders;

}

}

WideOrderConfiguration 具有如下特点:

继承自 WideConfigurationSupport<Long, WideOrderType, WideOrder>,父类中存在大量的 createXXX 方法,可以大幅简单代码量;使用 WideOrderRepository 作为宽表的仓库;直接使用 Spring 容器中的所有 WideItemDataProvider 实现;使用定制的 WideOrderPatrolService,为巡检增加延时支持;

其中自定义巡检 WideOrderPatrolService 代码如下:

public class WideOrderPatrolService implements WidePatrolService{

private final WidePatrolService widePatrolService;

public WideOrderPatrolService(WidePatrolService widePatrolService){

this.widePatrolService = widePatrolService;

}

@Override

@DelayBasedRocketMQ(topic = "wide_order_patrol", tag = "SingleIndex", consumerGroup = "order_patrol_group", delayLevel = 2)

public void index(Long aLong){

this.widePatrolService.index(aLong);

}

@Override

public void index(List longs){

WideOrderPatrolService wideOrderPatrolService = ((WideOrderPatrolService)AopContext.currentProxy());

longs.forEach(id -> wideOrderPatrolService.index(id));

}

@Override

public void updateItem(WideOrderType wideOrderType, KEY key){

((WideOrderPatrolService)AopContext.currentProxy()).updateItem(wideOrderType, (Long) key);

}

@DelayBasedRocketMQ(topic = "wide_order_patrol", tag = "UpdateByItem", consumerGroup = "order_patrol_group", delayLevel = 2)

public void updateItem(WideOrderType wideOrderType, Long id){

this.widePatrolService.updateItem(wideOrderType, id);

}

@Override

public void setReindexConsumer(Consumer

this.widePatrolService.setReindexConsumer(consumer);

}

}

WideOrderPatrolService 具体实现如下:

将大部分请求直接转发给内部的 widePatrolService 实例;在索引和更新方法上增加了 @DelayBasedRocketMQ 注解,该注解使的方法拥有延时执行的能力,如果对该注解感兴趣可以翻找下之前的文章;使用 AopContext 在类内获取 Proxy 对象并调用其方法,由于 AOP 实现的限制,在类中直接调用本类中的其他方法,不会触发拦截器;2.6. 实现效果

万事具备只欠东风,写个测试用例测试下功能。

2.6.1. 数据索引

首先,对数据进行索引,示例如下:

// 保存 User

this.user = new User();

this.user.setName("测试");

this.userDao.save(this.user);

// 保存 Address

this.address = new Address();

this.address.setDetail("详细地址");

this.address.setUserId(this.user.getId());

this.addressDao.save(this.address);

// 保存 Product

this.product = new Product();

this.product.setName("商品名称");

this.product.setPrice(100);

this.productDao.save(this.product);

// 保存 Order

this.order = new Order();

this.order.setUserId(this.user.getId());

this.order.setAddressId(this.address.getId());

this.order.setProductId(this.product.getId());

this.order.setDescr("我的订单");

this.orderDao.save(this.order);

// 进行索引

this.wideOrderService.index(this.order.getId());

// 比对数据

Optional optional = wideOrderDao.findById(this.order.getId());

Assertions.assertTrue(optional.isPresent());

WideOrder wideOrder = optional.get();

Assertions.assertEquals(order.getId(), wideOrder.getId());

Assertions.assertEquals(order.getAddressId(), wideOrder.getAddressId());

Assertions.assertEquals(order.getProductId(), wideOrder.getProductId());

Assertions.assertEquals(order.getUserId(), wideOrder.getUserId());

Assertions.assertEquals(order.getDescr(), wideOrder.getOrderDescr());

Assertions.assertEquals(user.getName(), wideOrder.getUserName());

Assertions.assertEquals(address.getDetail(), wideOrder.getAddressDetail());

Assertions.assertEquals(product.getName(), wideOrder.getProductName());

Assertions.assertEquals(product.getPrice(), wideOrder.getProductPrice());

单测成功运行后,数据已经成功写入到 ES,具体如下:

2.6.2. 数据更新

更新操作,具体单测如下:

// 更新订单描述

this.order.setDescr("订单详情");

this.orderDao.save(this.order);

// 触发索引更新

this.wideOrderService.updateOrder(this.order.getId());

// 验证更新结果

Optionaloptional = wideOrderDao.findById(this.order.getId());

Assertions.assertTrue(optional.isPresent());

WideOrder wideOrder = optional.get();

Assertions.assertEquals(order.getId(), wideOrder.getId());

Assertions.assertEquals(order.getDescr(), wideOrder.getOrderDescr());

单测成功运行后,数据已经完成更新,ES 数据具体如下:

2.6.3. 数据巡检

仔细观察日志,会发现存在一组 Delay Task 日志,具体如下:

[ main] c.g.l.core.delay.DelayMethodInterceptor : success to sent Delay Task to RocketMQ for [126]

[MessageThread_2] c.g.l.c.w.s.SimpleWidePatrolService : id 126 is same

第一条日志是在提交索引时由主线程打印,向 RocketMQ 提交一个延时任务,用于对 id 为 126 的数据进行校验;

第二条是时间达到后由 Message Consumer 线程打印,表明 DB 与 ES 中的数据是相同的;

如果巡检时发现数据不同,将会自动对 126 进行索引,从而保障两者的一致性;

3. 设计&扩展

3.1. 核心设计

整体架构设计如下:

从功能角度,整体可分为如下几部分:

Index 索引部分。内部可以看成是一个基于 检索模型 的 Pipeline,从众多数据提供器中获取数据,并写入 检索模型,最终将填充完数据的检索模型写入的 ES 进行持久化存储;Query 查询部分。直接使用 ES 的 api 对成功索引的数据进行查询。巡检部分。在数据变更时,会自动增加一个延时任务用于数据比较,巡检任务获取变更数据后与ES存储记录进行比较,如果数据不一致则向 Index 模块重新提交索引任务,对问题数据进行再次索引,从而对数据进行恢复;

3.2. 功能扩展

wide 为宽表提供了索引和巡检能力支持,但在实际业务中需要处理多种情况,常见如下:

自动触发,这是系统核心流程之一,数据发生变化后,向 Index 提交新的索引任务。常见的实现策略有:

基于领域事件的索引。监听应用程序发出的领域事件,从而触发新数据的索引;

基于 binlog 的索引。MySQL 的变化全部记录在 binlog 中,可以通过 canal 等框架将 binlog 进行导出,用于触发数据索引;

手工回溯,手工触发索引流程,常见的场景有:

由于业务需要 ES 检索模型发生变更,需要重跑历史数据;

系统故障导致数据不一致,通过手工触发的方式对问题数据进行修复;

天级数据重建。每天凌晨对前一天的数据进行索引重建,主要目的为:

避免错误在 ES 进行累计,也就是在索引和巡检两个机制都不生效的情况下,对问题数据进行修复;

索引优化,在数据完成重建后,可以调用 ES 索引优化接口,对索引进行合并,从而提升系统查询性能;

4. 项目信息

项目仓库地址:https://gitee.com/litao851025/lego

项目文档地址:https://gitee.com/litao851025/lego/wikis/support/Wide%20%E5%AE%BD%E8%A1%A8

二维码

扫一扫,关注我们

声明:本文由【益华网络】编辑上传发布,转载此文章须经作者同意,并请附上出处【益华网络】及本页链接。如内容、图片有任何版权问题,请联系我们进行处理。

感兴趣吗?

欢迎联系我们,我们愿意为您解答任何有关网站疑难问题!

您身边的【网站建设专家】

搜索千万次不如咨询1次

主营项目:网站建设,手机网站,响应式网站,SEO优化,小程序开发,公众号系统,软件开发等

立即咨询 15368564009
在线客服
嘿,我来帮您!