因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此:
- val env = ExecutionEnvironment.getExecutionEnvironment
- val textDataSet: DataSet[String] = env.fromCollection(
- List("张三,1", "李四,2", "王五,3", "张三,4")
- )
1. map
将DataSet中的每一个元素转换为另外一个元素:
- // 使用map将List转换为一个Scala的样例类
- case class User(name: String, id: String)
- val userDataSet: DataSet[User] = textDataSet.map {
- text =>
- val fieldArr = text.split(",")
- User(fieldArr(0), fieldArr(1))
- }
- userDataSet.print()
2. flatMap
将DataSet中的每一个元素转换为0…n个元素:
- // 使用flatMap操作,将集合中的数据:
- // 根据第一个元素,进行分组
- // 根据第二个元素,进行聚合求值
- val result = textDataSet.flatMap(line => line)
- .groupBy(0) // 根据第一个元素,进行分组
- .sum(1) // 根据第二个元素,进行聚合求值
- result.print()
3. mapPartition
将一个分区中的元素转换为另一个元素:
- // 使用mapPartition操作,将List转换为一个scala的样例类
- case class User(name: String, id: String)
- val result: DataSet[User] = textDataSet.mapPartition(line => {
- line.map(index => User(index._1, index._2))
- })
- result.print()
4. filter
过滤出来一些符合条件的元素,返回boolean值为true的元素:
- val source: DataSet[String] = env.fromElements("java", "scala", "java")
- val filter:DataSet[String] = source.filter(line => line.contains("java"))//过滤出带java的数据
- filter.print()
5. reduce
可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素:
- // 使用 fromElements 构建数据源
- val source = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
- // 使用map转换成DataSet元组
- val mapData: DataSet[(String, Int)] = source.map(line => line)
- // 根据首个元素分组
- val groupData = mapData.groupBy(_._1)
- // 使用reduce聚合
- val reduceData = groupData.reduce((x, y) => (x._1, x._2 + y._2))
- // 打印测试
- reduceData.print()
6. reduceGroup
将一个dataset或者一个group聚合成一个或多个元素。
- // 使用 fromElements 构建数据源
- val source: DataSet[(String, Int)] = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
- // 根据首个元素分组
- val groupData = source.groupBy(_._1)
- // 使用reduceGroup聚合
- val result: DataSet[(String, Int)] = groupData.reduceGroup {
- (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
- val tuple = in.reduce((x, y) => (x._1, x._2 + y._2))
- out.collect(tuple)
- }
- // 打印测试
- result.print()
7. minBy和maxBy
选择具有最小值或最大值的元素:
- // 使用minBy操作,求List中每个人的最小值
- // List("张三,1", "李四,2", "王五,3", "张三,4")
- case class User(name: String, id: String)
- // 将List转换为一个scala的样例类
- val text: DataSet[User] = textDataSet.mapPartition(line => {
- line.map(index => User(index._1, index._2))
- })
- val result = text
- .groupBy(0) // 按照姓名分组
- .minBy(1) // 每个人的最小值
8. Aggregate
在数据集上进行聚合求最值(最大值、最小值):
- val data = new mutable.MutableList[(Int, String, Double)]
- data.+=((1, "yuwen", 89.0))
- data.+=((2, "shuxue", 92.2))
- data.+=((3, "yuwen", 89.99))
- // 使用 fromElements 构建数据源
- val input: DataSet[(Int, String, Double)] = env.fromCollection(data)
- // 使用group执行分组操作
- val value = input.groupBy(1)
- // 使用aggregate求最大值元素
- .aggregate(Aggregations.MAX, 2)
- // 打印测试
- value.print()
Aggregate只能作用于元组上
注意:
要使用aggregate,只能使用字段索引名或索引名称来进行分组 groupBy(0) ,否则会报一下错误:
Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.