大数据计算

伴鱼:借助 Flink 完成机器学习特征系统的升级

2021-09-22  本文已影响0人  Flink中文社区

本文作者陈易生,介绍了伴鱼平台机器学习特征系统的升级,在架构上,从 Spark 转为 Flink,解决了特征上线难的问题,以及 SQL + Python UDF 如何用于生产实践。 主要内容为:

  1. 前言
  2. 老版特征系统 V1
  3. 新版特征系统 V2
  4. 总结
img

一、前言

在伴鱼,我们在多个在线场景使用机器学习提高用户的使用体验,例如:在伴鱼绘本中,我们根据用户的帖子浏览记录,为用户推荐他们感兴趣的帖子;在转化后台里,我们根据用户的绘本购买记录,为用户推荐他们可能感兴趣的课程等。

特征是机器学习模型的输入。如何高效地将特征从数据源加工出来,让它能够被在线服务高效地访问,决定了我们能否在生产环境可靠地使用机器学习。为此,我们搭建了特征系统,系统性地解决这一问题。目前,伴鱼的机器学习特征系统运行了接近 100 个特征,支持了多个业务线的模型对在线获取特征的需求。

下面,我们将介绍特征系统在伴鱼的演进过程,以及其中的权衡考量。

二、旧版特征系统 V1

特征系统 V1 由三个核心组件构成:特征管道,特征仓库,和特征服务。整体架构如下图所示:

v1 architecture

特征管道包括流特征管道批特征管道,它们分别消费流数据源和批数据源,对数据经过预处理加工成特征 (这一步称为特征工程),并将特征写入特征仓库。

特征仓库选用合适的存储组件 (Redis) 和数据结构 (Hashes),为模型服务提供低延迟的特征访问能力。之所以选用 Redis 作为存储,是因为:

特征服务屏蔽特征仓库的存储和数据结构,对外暴露 RPC 接口 GetFeatures(EntityName, FeatureNames),提供对特征的低延迟点查询。在实现上,这一接口基本对应于 Redis 的 HMGET EntityName FeatureName_1 ... FeatureName_N 操作。

这一版本的特征系统存在几个问题:

为了解决这几个问题,特征系统 V2 提出几个设计目的:

三、新版特征系统 V2

特征系统 V2 相比特征系统 V1 在架构上的唯一不同点在于,它将特征管道切分为三部分:特征生成管道,特征源,和特征注入管道。值得一提的是,管道在实现上均从 Spark 转为 Flink,和公司数据基础架构的发展保持一致。特征系统 V2 的整体架构如下图所示:

v2 architecture

1. 特征生成管道

特征生成管道读取原始数据源,加工为特征,并将特征写入指定特征源 (而非特征仓库)。

特征生成管道的逻辑由算法工程师全权负责编写。其中,批特征生成管道使用 HiveQL 编写,由 DolphinScheduler 调度。流特征生成管道使用 PyFlink 实现,详情见下图:

v2 codegen

算法工程师需要遵守下面步骤:

  1. 用 Flink SQL 声明 Flink 任务源 (source.sql) 和定义特征工程逻辑 (transform.sql);
  2. (可选) 用 Python 实现特征工程逻辑中可能包含的 UDF 实现 (udf_def.py);
  3. 使用自研的代码生成工具,生成可执行的 PyFlink 任务脚本 (run.py);
  4. 本地使用由平台准备好的 Docker 环境调试 PyFlink 脚本,确保能在本地正常运行;
  5. 把代码提交到一个统一管理特征管道的代码仓库,由 AI 平台团队进行代码审核。审核通过的脚本会被部署到伴鱼实时计算平台,完成特征生成管道的上线。

这一套流程确保了:

2. 特征源

特征源存储从原始数据源加工形成的特征。值得强调的是,它同时还是连接算法工程师和 AI 平台工程师的桥梁。算法工程师只负责实现特征工程的逻辑,将原始数据加工为特征,写入特征源,剩下的事情就交给 AI 平台。平台工程师实现特征注入管道,将特征写入特征仓库,以特征服务的形式对外提供数据访问服务。

3. 特征注入管道

特征注入管道将特征从特征源读出,写入特征仓库。由于 Flink 社区缺少对 Redis sink 的原生支持,我们通过拓展 RichSinkFunction [3] 简单地实现了 StreamRedisSinkBatchRedisSink,很好地满足我们的需求。

其中,BatchRedisSink 通过 Flink Operator State [4] 和 Redis Pipelining [5] 的简单结合,大量参考 Flink 文档中的 BufferingSink,实现了批量写入,大幅减少对 Redis Server 的请求量,增大吞吐,写入效率相比逐条插入提升了 7 倍 [6]。BatchRedisSink 的简要实现如下。其中,flush 实现了批量写入 Redis 的核心逻辑,checkpointedState / bufferedElements / snapshotState / initializeState 实现了使用 Flink 有状态算子管理元素缓存的逻辑。

class BatchRedisSink(
    pipelineBatchSize: Int
) extends RichSinkFunction[(String, Timestamp, Map[String, String])]
    with CheckpointedFunction {

  @transient
  private var checkpointedState
      : ListState[(String, java.util.Map[String, String])] = _

  private val bufferedElements
      : ListBuffer[(String, java.util.Map[String, String])] =
    ListBuffer.empty[(String, java.util.Map[String, String])]

  private var jedisPool: JedisPool = _

  override def invoke(
      value: (String, Timestamp, Map[String, String]),
      context: SinkFunction.Context
  ): Unit = {
    import scala.collection.JavaConverters._

    val (key, _, featureKVs) = value
    bufferedElements += (key -> featureKVs.asJava)

    if (bufferedElements.size == pipelineBatchSize) {
      flush()
    }
  }

  private def flush(): Unit = {
    var jedis: Jedis = null
    try {
      jedis = jedisPool.getResource
      val pipeline = jedis.pipelined()
      for ((key, hash) <- bufferedElements) {
        pipeline.hmset(key, hash)
      }
      pipeline.sync()
    } catch { ... } finally { ... }
    bufferedElements.clear()
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor =
      new ListStateDescriptor[(String, java.util.Map[String, String])](
        "buffered-elements",
        TypeInformation.of(
          new TypeHint[(String, java.util.Map[String, String])]() {}
        )
      )

    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    import scala.collection.JavaConverters._

    if (context.isRestored) {
      for (element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }

  override def open(parameters: Configuration): Unit = {
    try {
      jedisPool = new JedisPool(...)
    } catch { ... }
  }

  override def close(): Unit = {
    flush()
    if (jedisPool != null) {
      jedisPool.close()
    }
  }
}

特征系统 V2 很好地满足了我们提出的设计目的。

四、总结

特征系统 V1 解决了特征上线的问题,而特征系统 V2 在此基础上,解决了特征上线难的问题。在特征系统的演进过程中,我们总结出作为平台研发的几点经验:

Reference

[1] https://doordash.engineering/2020/11/19/building-a-gigascale-ml-feature-store-with-redis/

[2] https://docs.feast.dev/feast-on-kubernetes/concepts/stores#online-store

[3] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java

[4] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#using-operator-state

[5] https://redis.io/topics/pipelining

[6] https://site-git-update-feature-system-yishengdd.vercel.app/posts/flink-bulk-insert-redis

[7] https://eng.uber.com/scaling-michelangelo/

上一篇下一篇

猜你喜欢

热点阅读