canal同步数据索引商品数据到ES流程

2020-07-23  本文已影响0人  影子猪_
需求分析

商品上架后,将sku列表导入索引库

实现思路

1.tb_spu列表发生改变,上架标记位由0 → 1 ,拿到该条数据的spuId并将该消息发送到rabbitmq。
2.由于商品上架后会有多个操作逻辑比如商品信息页面静态化等操作,所以商品上架交换机使用广播模式fanout,建立导入索引库的队列,并建立交换机与队列之间的绑定关系。
3.搜索微服务作为rabbitmq的消费者端,通过feign远程调用商品微服务,根据spuId拿到sku列表数据后,通过ElasticSearch的API导入索引库。

具体实现过程

1.在canal监听微服务中:配置一个tb_spu表的监听类


@CanalEventListener//声明一个监听类
public class SpuListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @ListenPoint(schema = "changgou_business",table = "tb_spu",eventType = CanalEntry.EventType.UPDATE)
    public void goods_up(CanalEntry.EventType eventType,CanalEntry.RowData rowData){

    //上架标记位由0 → 1 ,拿到该条数据的spuId并将该消息发送到rabbitmq
    此处代码略......
  }

使用rabbitmq发送消息,最好新建一个rabbitmqConfig,用于配置交换机,队列以及绑定关系。

@Configuration
public  class RabbitmqConfig {

    //定义交换机名
    public static final String GOODS_UP_EXCHANGE = "goods_up_exchange";

    //定义队列名
    public static final String AD_UPDATE_QUEUE = "ad_update_queue";
    public static final String SEARCH_ADD_QUEUE = "search_add_queue";

    @Bean
    public Queue Queue(){
        return new Queue(AD_UPDATE_QUEUE);
    }


    //声明队列
    @Bean(SEARCH_ADD_QUEUE)
    public Queue SEARCH_ADD_QUEUE(){
        return new Queue(SEARCH_ADD_QUEUE);
    }


    //声明交换机
    @Bean(GOODS_UP_EXCHANGE)
    public Exchange GOODS_UP_EXCHANGE(){
        return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
    }


    //队列和交换机绑定
    @Bean
    public Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE)Queue queue,@Qualifier(GOODS_UP_EXCHANGE)Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("").noargs();
    }



}

2.新建一个搜索微服务,作为消息队列的消费者端。通过接受到的消息数据spuId,用Feign调用goods微服务中的根据spuId查询sku列表方法,拿到数据。

feign声明式调用:在goods_api微服务中声明一个skuFeign接口,并添加通过spuId查询sku列表的抽象方法。该接口供搜索服务中使用。


@FeignClient(name = "goods")
@RequestMapping("/sku")
public interface SkuFeign {


    @GetMapping("/spu/{spuId}")
    List<Sku> findListBySpuId(@PathVariable("spuId") String spuId);


}

3.通过feign远程调用商品微服务,根据spuId拿到sku列表数据后,通过ElasticSearch的API导入索引库。
当搜索服务中,用到skuFeign.findListBySpuId()方法时,会调用goods服务里的方法( " /goods/sku/spu/{spuId} " )

业务流程图
image.png
上一篇下一篇

猜你喜欢

热点阅读