硬核!聊一聊Flink流计算常用算子

因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此:


  1. val env = ExecutionEnvironment.getExecutionEnvironment 
  2. val textDataSet: DataSet[String] = env.fromCollection( 
  3.   List("张三,1""李四,2""王五,3""张三,4"

1. map

将DataSet中的每一个元素转换为另外一个元素:


  1. // 使用map将List转换为一个Scala的样例类 
  2.  
  3. case class User(name: String, id: String) 
  4.  
  5. val userDataSet: DataSet[User] = textDataSet.map { 
  6.   text => 
  7.     val fieldArr = text.split(","
  8.     User(fieldArr(0), fieldArr(1)) 
  9. userDataSet.print() 

2. flatMap

将DataSet中的每一个元素转换为0…n个元素:


  1. // 使用flatMap操作,将集合中的数据: 
  2. // 根据第一个元素,进行分组 
  3. // 根据第二个元素,进行聚合求值  
  4.  
  5. val result = textDataSet.flatMap(line => line) 
  6.       .groupBy(0) // 根据第一个元素,进行分组 
  7.       .sum(1) // 根据第二个元素,进行聚合求值 
  8.        
  9. result.print() 

3. mapPartition

将一个分区中的元素转换为另一个元素:


  1. // 使用mapPartition操作,将List转换为一个scala的样例类 
  2.  
  3. case class User(name: String, id: String) 
  4.  
  5. val result: DataSet[User] = textDataSet.mapPartition(line => { 
  6.       line.map(index => User(index._1, index._2)) 
  7.     }) 
  8.      
  9. result.print() 

4. filter

过滤出来一些符合条件的元素,返回boolean值为true的元素:


  1. val source: DataSet[String] = env.fromElements("java""scala""java"
  2. val filter:DataSet[String] = source.filter(line => line.contains("java"))//过滤出带java的数据 
  3. filter.print() 

5. reduce

可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素:


  1. // 使用 fromElements 构建数据源 
  2. val source = env.fromElements(("java", 1), ("scala", 1), ("java", 1)) 
  3. // 使用map转换成DataSet元组 
  4. val mapData: DataSet[(String, Int)] = source.map(line => line) 
  5. // 根据首个元素分组 
  6. val groupData = mapData.groupBy(_._1) 
  7. // 使用reduce聚合 
  8. val reduceData = groupData.reduce((x, y) => (x._1, x._2 + y._2)) 
  9. // 打印测试 
  10. reduceData.print() 

6. reduceGroup

将一个dataset或者一个group聚合成一个或多个元素。


  1. // 使用 fromElements 构建数据源 
  2. val source: DataSet[(String, Int)] = env.fromElements(("java", 1), ("scala", 1), ("java", 1)) 
  3. // 根据首个元素分组 
  4. val groupData = source.groupBy(_._1) 
  5. // 使用reduceGroup聚合 
  6. val result: DataSet[(String, Int)] = groupData.reduceGroup { 
  7.       (in: Iterator[(String, Int)], out: Collector[(String, Int)]) => 
  8.         val tuple = in.reduce((x, y) => (x._1, x._2 + y._2)) 
  9.         out.collect(tuple) 
  10.     } 
  11. // 打印测试 
  12. result.print() 

7. minBy和maxBy

选择具有最小值或最大值的元素:


  1. // 使用minBy操作,求List中每个人的最小值 
  2. // List("张三,1""李四,2""王五,3""张三,4"
  3.  
  4. case class User(name: String, id: String) 
  5. // 将List转换为一个scala的样例类 
  6. val text: DataSet[User] = textDataSet.mapPartition(line => { 
  7.       line.map(index => User(index._1, index._2)) 
  8.     }) 
  9.      
  10. val result = text 
  11.           .groupBy(0) // 按照姓名分组 
  12.           .minBy(1)   // 每个人的最小值 

8. Aggregate

在数据集上进行聚合求最值(最大值、最小值):


  1. val data = new mutable.MutableList[(Int, String, Double)] 
  2.     data.+=((1, "yuwen", 89.0)) 
  3.     data.+=((2, "shuxue", 92.2)) 
  4.     data.+=((3, "yuwen", 89.99)) 
  5. // 使用 fromElements 构建数据源 
  6. val input: DataSet[(Int, String, Double)] = env.fromCollection(data) 
  7. // 使用group执行分组操作 
  8. val value = input.groupBy(1) 
  9.             // 使用aggregate求最大值元素 
  10.             .aggregate(Aggregations.MAX, 2)  
  11. // 打印测试 
  12. value.print()       

Aggregate只能作用于元组上

注意:

要使用aggregate,只能使用字段索引名或索引名称来进行分组 groupBy(0) ,否则会报一下错误:

Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.

【声明】:芜湖站长网内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。

相关文章