Flink数据转换的基本算子

Posted by JustDoDT on August 27, 2019

概述

flink

在 Flink 应用程序中,无论你的应用程序时批处理,还是流处理,都是上图这种模型,有数据源,有数据落地到啥地方(Sink),类似于Apache Flume;我们写的应用程序多是对数据源过来的数据做一系列操作,总结如下。

  • Source:数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source 、基于文件的 source、基于网络套接字的 source 、自定义的 source 。自定义的 source 常见的有 Apache Kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。
  • Transformation: 数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。
  • Sink: 接收器,Sink 是指 Flink 将转换计算后的数据发送的地点,你可能需要存储下来。Flink 常见的 Sink 大概有如下几类:写入文件,打印文件、写入 Socket、自定义的 Sink。自定义的 sink 常见的有 Apache Kafka、RabbitMQ、MySQL、Elasticsearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以自定义 Sink。

本文将介绍 Flink 中批处理、流处理常用的算子。

DataStream Operator

Map

  • Map 算子的输入流是 DataStream,经过 Map 算子后返回的数据格式是 SingleOutputStreamOperator 类型,获取一个元素并生成一个元素,举个例子:
 SingleOutputStreamOperator<Employee> map = employeeStream.map(new
 MapFunction<Employee, Employee>() {
 @Override
 public Employee map(Employee employee) throws Exception {
 employee.salary = employee.salary + 5000;
 return employee;
  }
 });
 map.print();
 

###

FlatMap

  • FlatMap 算子的输入流是 DataStream,经过 FlatMap 算子后返回的数据格式是 SingleOutputStreamOperator 类型,获取一个元素并生成零个、一个元素或多个元素,举个例子:
 SingleOutputStreamOperator<Employee> flatMap = employeeStream.flatMap(new
 FlatMapFunction<Employee, Employee>() {
 @Override
 public void flatMap(Employee employee, Collector<Employee> out) throws
 Exception{
     if(employee.salary>=2000)
         out.collect(employee);
 }
 	}
   });                                                                    
 flatMap.print();
                                                                      

Filter

  • 过滤操作
SingleOutputStreamOperator<Employee> flatMap = employeeStream.flatMap(new
FlatMapFunction<Employee, Employee>() {
@Override
public void flatMap(Employee employee, Collector<Employee> out) throws
Exception{
   if(employee.salary >= 2000)
      return true;
}
return false;
	}
 });                                                                    
filter.print();

KeyBy

flink

  • KeyBy 在逻辑上是基于 key 对流进行分区的,相同的 key 会被分到一个分区(这里分区指的就是下游算子多个并行节点的其中一个)。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。举例:
 KeyedStream<ProductEvent,Integer>keyBy=productStream.keyBy(new KeySelector<ProductEvent,Integer>(){
     @Override
     publicIntegergetKey(ProductEventproduct)throwsException{
         returnproduct.shopId;
     }
    });                                                                    
 keyBy.print();   

Reduce

  • Reduce返回单个的结果值,并且reduce操作每处理一个元素总是创建一个新值。常用的方法有

average、sum、min、max、count,使用Reduce方法都可实现。

 SingleOutputStreamOperator<Employee> reduce = employeeStream.keyBy(new KeySelector<Employee, Integer>() {
 	@Override
 	public Integer getKey(Employee employee) throws Exception {
 return employee.shopId; 5	}
 }).reduce(new ReduceFunction<Employee>() {
 	@Override
 	public Employee reduce(Employee employee1, Employee employee2) throws Exception {
 	employee1.salary = (employee1.salary + employee2.salary) / 2;
 	return employee1; 11	}
 	});
 	reduce.print();
  • `上面先将数据流进行 keyby 操作,因为执行 Reduce 操作只能是 KeyedStream,然后将员工的工资做

了一个求平均值的操作。

Aggregation

  • DataStreamAPI 支持各种聚合,例如min、max、sum等。这些函数可以应用于 KeyedStream 以获

得 Aggregations 聚合。

KeyedStream.sum(0)
KeyedStream.sum("key")
   
KeyedStream.min(0)
KeyedStream.min("key")
   
KeyedStream.max(0)
KeyedStream.max("key")
   
KeyedStream.minBy(0)    
   
KeyedStream.minBy("key")
KeyedStream.maxBy(0)

KeyedStream.maxBy("key")
    

max和 maxBy 之间的区别在于 max 返回流中的最⼤值,但 maxBy 返回具有最大值的键, min 和 minBy 同理。

Window

  • Window 函数允许按时间或其他条件对现有 KeyedStream 进⾏行分组。 以下是以 10 秒的时间窗⼝聚合:

     inputStream.keyBy(0).window(Time.seconds(10));
    
  • 有时候因为业务需求场景要求:聚合⼀分钟、⼀小时的数据做统计报表使用。

WindowAll

  • WindowAll 将元素按照某种特性聚集在⼀起,该函数不支持并行行操作,默认的并行度就是 1,所以如果使⽤用这个算子的话需要注意⼀下性能问题,以下是使用例子:
inputStream.keyBy(0).windowAll(Time.seconds(10));

Union

flink

  • Union 函数将两个或多个数据流结合在一起。 这样后面在使用的时候就只需使⽤用一个数据流就行了。如果我们将一个流与自身组合,那么组合后的数据流会有两份同样的数据。
inputStream.union(inputStream1, inputStream2, ...);

Window Join

  • 我们可以通过一些 key 将同一个 window 的两个数据流 join 起来。
inputStream.join(inputStream1)
	.where(0).equalTo(1)
	.window(Time.seconds(5))
	.apply (new JoinFunction () {...});
  • 以上示例是在 5 秒的窗口中连接两个流,其中第一个流的第一个属性的连接条件等于另一个流的第二个属性。

Split

flink

  • 此功能根据条件将流拆分为两个或多个流。 当你获得混合流然后你可能希望单独处理每个数据流时, 可以使用此方法。
SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer> () {
	@Override
	public Iterable<String> select(Integer value) {
		List<String> output = new ArrayList<String>(); 
       if (value % 2 == 0) {
			output.add("even");
		} else {
		output.add("odd"); 9	
		}
		return output; 	
		}
	});
  • 上面就是将偶数数据流放在 even 中,将数数据流放在 odd 中。

Select

flink

  • 上面用 Split 算子将数据流拆分为 3 个数据流,然后用 Select 算子选出特定的流。(一般这两者是搭配使用的)

其他算子详见:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/

DataSet Operator

上面介绍了 DataStream 的常用算子,其实上面也有一些算子也是同样适合于 DataSet 的,比如 Map、FlatMap、Filter 等(这个和 DataStream差不多,其实也和 Spark 中的算子类似);其中也有一些算子是 DataSet API 独有的,比如 DataStream 中分区使用的是 KeyBy ,但是 DataSet 使用的是 GroupBy

First-n

  • 返回数据集的前n个(任意)元素。 First-n可以应用于常规数据集,分组数据集或分组排序数据集。 分组键可以指定为键选择器功能,元组位置或案例类字段。
val in: DataSet[(Int, String)] = // [...]
// regular data set
val result1 = in.first(3)
// grouped data set
val result2 = in.groupBy(0).first(3)
// grouped-sorted data set
val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)

其他算子详见官方文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/batch/

总结

一般公司里的业务场景需求肯定不止是只有批计算,也不只是流计算的。一般这种需求是都存在的。比如每天凌晨 0 点去跑昨天一天商品的售卖情况,然后出报表给运营或者老板去分析;另外的就是流处理。为了考虑到开发的成本以及开发人员的学习难度,后来推出了批流统一的 Flink Table/SQL API,在写这篇文章的时候 Flink 1.9版本正好发布,Blink 的代码已经在 Flink 1.9 的时候整合进去了。

本文介绍了 Flink 中的 DataStream 和 DataSet API 中常用的算子(Operator),其实我们在 Flink 中计算都是依靠这些算子,现在有 批流统一的 Flink Table/SQL API 。

参考文档