首先,我会介绍一下AI应用中数据集成的典型场景,ETL和ELT两种数据集成模式的异同点,以及为什么AI应用下更适合采用ELT模式。然后,我会花一些篇幅介绍数据集成中需要重点考虑的基本问题,以及我们所采用的底层平台——KafkaConnect在解决这些问题上的优势和局限。
接下来,我会介绍DataPipeline对于KafkaConnect一些优化。有的是从底层做的优化,例如线程池的优化。有的则是从产品特性上的优化,例如错误数据队列。
最后,我们谈一谈KafkaConnect和KafkaStream的结合,以及我们用KafkaStream做数据质量预警方面的一个应用Case。
一、AI应用场景下的数据集成
数据集成是把不同来源、格式、特点性质的数据在逻辑上或物理上有机地集中,为企业提供全面的数据共享。AI是典型的数据驱动应用,数据集成在其中起着关键的基础性作用。
以一个大家所熟悉的在线推荐服务为例,通常需要依赖三类数据:用户的属性(年龄、性别、地域、注册时间等)、商品的属性(分类、价格、描述等)、用户产生的各类行为(登录、点击、搜索、加购物车、购买、评论、点赞、收藏、加好友、发私信等)事件数据。
随着微服务框架的流行,这三类数据通常会存在于不同的微服务中:“用户管理服务”储存着用户的属性、好友关系、登录等数据;“商品管理服务”存储的商品信息;“订单服务”存储着用户的订单数据;“支付服务”存储用户的支付数据;“评论服务”记录着用户的评论和点赞数据。为了实现一个推荐服务,我们首先需要让服务能够访问到这些数据。这种数据访问应该是非侵入式的,也就是说不能对原有系统的性能、稳定性、安全性造成额外的负担。因此,推荐服务不应当直接访问这些分散的数据源,而是应该通过某种方式将这些数据从各个业务子系统中提取出来,汇集到一个逻辑上集中的数据库/仓库,然后才能方便地使用机器学习框架(例如SparkMLlib)来读取数据、训练和更新模型。
ETL和ELT的区别与联系
数据集成包含三个基本的环节:Extract(抽取)、Transform(转换)、Load(加载)。
抽取是将数据从已有的数据源中提取出来,例如通过JDBC/Binlog方式获取MySQL数据库的增量数据;转换是对原始数据进行处理,例如将用户属性中的手机号替换为匿名的唯一ID、计算每个用户对商品的平均打分、计算每个商品的购买数量、将B表的数据填充到A表中形成新的宽表等;加载是将数据写入目的地。
根据转换转换发生的顺序和位置,数据集成可以分为ETL和ELT两种模式。ETL在数据源抽取后首先进行转换,然后将转换的结果写入目的地。ELT则是在抽取后将结果先写入目的地,然后由下游应用利用数据库的聚合分析能力或者外部计算框架,例如Spark来完成转换的步骤。
为什么ELT更适合AI应用场景
为什么说ELT更适合AI的应用场景呢?
首先这是由AI应用对数据转换的高度灵活性需求决定的。绝大多数AI应用使用的算法模型都包括一个特征提取和变换的过程。根据算法的不同,这个特征提取可能是特征矩阵的简单的归一化或平滑处理,也可以是用Aggregation函数或One-Hot编码进行维度特征的扩充,甚至特征提取本身也需要用到其它模型的输出结果。这使得AI模型很难直接利用ETL工具内建的转换功能,来完成特征提取步骤。此外,企业现在很少会从零构建AI应用。当应用包括Spark/FlinkMLlib在内的机器学习框架时,内建的模型库本身往往包含了特征提取和变换的逻辑,这使得在数据提取阶段就做复杂变换的必要性进一步降低;
其次,企业经常会基于同样的数据构建不同应用。以我之前所在的一家在线教育公司为例,我们构建了两个AI的应用:其中一个是针对各类课程的推荐应用,主要用于增加用户的购买转化率。另外一个是自适应学习系统,用于评估用户的知识掌握程度和题目的难度和区分度,从而为用户动态地规划学习路径。两个应用都需要用户属性、做题记录、点击行为以及学习资料文本,但采用的具体模型的特征提取和处理方式完全不同。如果用ETL模式,我们需要从源端抽取两遍数据。而采用ELT模式,所有数据存储在HBase中,不同的应用根据模型需要过滤提取出所需的数据子集,在Spark集群完成相应的特征提取和模型计算,降低了对源端的依赖和访问频次;