概述
在 Flink 应用程序中,无论你的应用程序时批处理,还是流处理,都是上图这种模型,有数据源,有数据落地到啥地方(Sink),类似于Apache Flume;我们写的应用程序多是对数据源过来的数据做一系列操作,总结如下。
- Source:数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source 、基于文件的 source、基于网络套接字的 source 、自定义的 source 。自定义的 source 常见的有 Apache Kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source。
- Transformation: 数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据。
- Sink: 接收器,Sink 是指 Flink 将转换计算后的数据发送的地点,你可能需要存储下来。Flink 常见的 Sink 大概有如下几类:写入文件,打印文件、写入 Socket、自定义的 Sink。自定义的 sink 常见的有 Apache Kafka、RabbitMQ、MySQL、Elasticsearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以自定义 Sink。
本文将介绍 Flink 中批处理、流处理常用的算子。
DataStream Operator
Map
- Map 算子的输入流是 DataStream,经过 Map 算子后返回的数据格式是 SingleOutputStreamOperator 类型,获取一个元素并生成一个元素,举个例子:
SingleOutputStreamOperator<Employee> map = employeeStream.map(new MapFunction<Employee, Employee>() { @Override public Employee map(Employee employee) throws Exception { employee.salary = employee.salary + 5000; return employee; } }); map.print();
###
FlatMap
- FlatMap 算子的输入流是 DataStream,经过 FlatMap 算子后返回的数据格式是 SingleOutputStreamOperator 类型,获取一个元素并生成零个、一个元素或多个元素,举个例子:
SingleOutputStreamOperator<Employee> flatMap = employeeStream.flatMap(new FlatMapFunction<Employee, Employee>() { @Override public void flatMap(Employee employee, Collector<Employee> out) throws Exception{ if(employee.salary>=2000) out.collect(employee); } } }); flatMap.print();
Filter
- 过滤操作
SingleOutputStreamOperator<Employee> flatMap = employeeStream.flatMap(new FlatMapFunction<Employee, Employee>() { @Override public void flatMap(Employee employee, Collector<Employee> out) throws Exception{ if(employee.salary >= 2000) return true; } return false; } }); filter.print();
KeyBy
- KeyBy 在逻辑上是基于 key 对流进行分区的,相同的 key 会被分到一个分区(这里分区指的就是下游算子多个并行节点的其中一个)。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。举例:
KeyedStream<ProductEvent,Integer>keyBy=productStream.keyBy(new KeySelector<ProductEvent,Integer>(){ @Override publicIntegergetKey(ProductEventproduct)throwsException{ returnproduct.shopId; } }); keyBy.print();
Reduce
- Reduce返回单个的结果值,并且reduce操作每处理一个元素总是创建一个新值。常用的方法有
average、sum、min、max、count,使用Reduce方法都可实现。
SingleOutputStreamOperator<Employee> reduce = employeeStream.keyBy(new KeySelector<Employee, Integer>() { @Override public Integer getKey(Employee employee) throws Exception { return employee.shopId; 5 } }).reduce(new ReduceFunction<Employee>() { @Override public Employee reduce(Employee employee1, Employee employee2) throws Exception { employee1.salary = (employee1.salary + employee2.salary) / 2; return employee1; 11 } }); reduce.print();
- `上面先将数据流进行 keyby 操作,因为执行 Reduce 操作只能是 KeyedStream,然后将员工的工资做
了一个求平均值的操作。
Aggregation
- DataStreamAPI 支持各种聚合,例如min、max、sum等。这些函数可以应用于 KeyedStream 以获
得 Aggregations 聚合。
KeyedStream.sum(0) KeyedStream.sum("key") KeyedStream.min(0) KeyedStream.min("key") KeyedStream.max(0) KeyedStream.max("key") KeyedStream.minBy(0) KeyedStream.minBy("key") KeyedStream.maxBy(0) KeyedStream.maxBy("key")
max和 maxBy 之间的区别在于 max 返回流中的最⼤值,但 maxBy 返回具有最大值的键, min 和 minBy 同理。
Window
Window 函数允许按时间或其他条件对现有 KeyedStream 进⾏行分组。 以下是以 10 秒的时间窗⼝聚合:
inputStream.keyBy(0).window(Time.seconds(10));
有时候因为业务需求场景要求:聚合⼀分钟、⼀小时的数据做统计报表使用。
WindowAll
- WindowAll 将元素按照某种特性聚集在⼀起,该函数不支持并行行操作,默认的并行度就是 1,所以如果使⽤用这个算子的话需要注意⼀下性能问题,以下是使用例子:
inputStream.keyBy(0).windowAll(Time.seconds(10));
Union
- Union 函数将两个或多个数据流结合在一起。 这样后面在使用的时候就只需使⽤用一个数据流就行了。如果我们将一个流与自身组合,那么组合后的数据流会有两份同样的数据。
inputStream.union(inputStream1, inputStream2, ...);
Window Join
- 我们可以通过一些 key 将同一个 window 的两个数据流 join 起来。
inputStream.join(inputStream1) .where(0).equalTo(1) .window(Time.seconds(5)) .apply (new JoinFunction () {...});
- 以上示例是在 5 秒的窗口中连接两个流,其中第一个流的第一个属性的连接条件等于另一个流的第二个属性。
Split
- 此功能根据条件将流拆分为两个或多个流。 当你获得混合流然后你可能希望单独处理每个数据流时, 可以使用此方法。
SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer> () { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); 9 } return output; } });
- 上面就是将偶数数据流放在 even 中,将数数据流放在 odd 中。
Select
- 上面用 Split 算子将数据流拆分为 3 个数据流,然后用 Select 算子选出特定的流。(一般这两者是搭配使用的)
其他算子详见:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/
DataSet Operator
上面介绍了 DataStream 的常用算子,其实上面也有一些算子也是同样适合于 DataSet 的,比如 Map、FlatMap、Filter 等(这个和 DataStream差不多,其实也和 Spark 中的算子类似);其中也有一些算子是 DataSet API 独有的,比如 DataStream
中分区使用的是 KeyBy
,但是 DataSet
使用的是 GroupBy
。
First-n
- 返回数据集的前n个(任意)元素。 First-n可以应用于常规数据集,分组数据集或分组排序数据集。 分组键可以指定为键选择器功能,元组位置或案例类字段。
val in: DataSet[(Int, String)] = // [...] // regular data set val result1 = in.first(3) // grouped data set val result2 = in.groupBy(0).first(3) // grouped-sorted data set val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
其他算子详见官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/batch/
总结
一般公司里的业务场景需求肯定不止是只有批计算,也不只是流计算的。一般这种需求是都存在的。比如每天凌晨 0 点去跑昨天一天商品的售卖情况,然后出报表给运营或者老板去分析;另外的就是流处理。为了考虑到开发的成本以及开发人员的学习难度,后来推出了批流统一的 Flink Table/SQL API,在写这篇文章的时候 Flink 1.9版本正好发布,Blink 的代码已经在 Flink 1.9 的时候整合进去了。
本文介绍了 Flink 中的 DataStream 和 DataSet API 中常用的算子(Operator),其实我们在 Flink 中计算都是依靠这些算子,现在有 批流统一的 Flink Table/SQL API 。