推荐答案
在 Apache Flink 中,DataStream 的 Transformation 操作主要包括以下几类:
- Map:对 DataStream 中的每个元素应用一个函数,生成一个新的 DataStream。
- FlatMap:类似于 Map,但每个输入元素可以生成零个、一个或多个输出元素。
- Filter:对 DataStream 中的每个元素应用一个布尔函数,只保留返回 true 的元素。
- KeyBy:根据指定的键将 DataStream 分区为 KeyedStream,以便进行后续的聚合操作。
- Reduce:对 KeyedStream 中的元素进行聚合操作,生成一个新的 DataStream。
- Fold:对 KeyedStream 中的元素进行累积操作,生成一个新的 DataStream。
- Aggregations:包括 sum、min、max 等聚合操作,对 KeyedStream 中的元素进行聚合。
- Window:对 KeyedStream 中的元素进行窗口操作,以便进行窗口内的聚合操作。
- Union:将两个或多个 DataStream 合并为一个 DataStream。
- Connect:将两个 DataStream 连接起来,生成一个 ConnectedStream,以便进行联合处理。
- Split:将一个 DataStream 拆分为多个 DataStream。
- Select:从 Split 操作生成的多个 DataStream 中选择一个或多个 DataStream。
- Project:从 DataStream 中选择特定的字段,生成一个新的 DataStream。
- Iterate:对 DataStream 进行迭代操作,生成一个新的 DataStream。
- CoMap, CoFlatMap:对 ConnectedStream 中的两个流进行联合处理。
本题详细解读
Map
Map
操作是 Flink 中最基本的 Transformation 操作之一。它接受一个函数作为参数,并将该函数应用到 DataStream 中的每个元素上,生成一个新的 DataStream。例如:
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4); DataStream<Integer> doubled = numbers.map(x -> x * 2);
FlatMap
FlatMap
操作与 Map
类似,但它允许每个输入元素生成零个、一个或多个输出元素。例如:
DataStream<String> words = env.fromElements("hello world", "flink"); DataStream<String> letters = words.flatMap((String value, Collector<String> out) -> { for (String letter : value.split("")) { out.collect(letter); } });
Filter
Filter
操作接受一个布尔函数作为参数,并只保留返回 true
的元素。例如:
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4); DataStream<Integer> evenNumbers = numbers.filter(x -> x % 2 == 0);
KeyBy
KeyBy
操作根据指定的键将 DataStream 分区为 KeyedStream。这个操作是后续聚合操作的基础。例如:
DataStream<Tuple2<String, Integer>> words = env.fromElements( new Tuple2<>("hello", 1), new Tuple2<>("world", 2) ); KeyedStream<Tuple2<String, Integer>, String> keyedWords = words.keyBy(0);
Reduce
Reduce
操作对 KeyedStream 中的元素进行聚合操作。例如:
DataStream<Tuple2<String, Integer>> words = env.fromElements( new Tuple2<>("hello", 1), new Tuple2<>("hello", 2) ); DataStream<Tuple2<String, Integer>> wordCounts = words .keyBy(0) .reduce((x, y) -> new Tuple2<>(x.f0, x.f1 + y.f1));
Fold
Fold
操作对 KeyedStream 中的元素进行累积操作。例如:
DataStream<Tuple2<String, Integer>> words = env.fromElements( new Tuple2<>("hello", 1), new Tuple2<>("hello", 2) ); DataStream<Tuple2<String, Integer>> wordCounts = words .keyBy(0) .fold(new Tuple2<>("", 0), (acc, value) -> new Tuple2<>(value.f0, acc.f1 + value.f1));
Aggregations
Flink 提供了多种聚合操作,如 sum
、min
、max
等。例如:
DataStream<Tuple2<String, Integer>> words = env.fromElements( new Tuple2<>("hello", 1), new Tuple2<>("hello", 2) ); DataStream<Tuple2<String, Integer>> wordCounts = words .keyBy(0) .sum(1);
Window
Window
操作对 KeyedStream 中的元素进行窗口操作。例如:
DataStream<Tuple2<String, Integer>> words = env.fromElements( new Tuple2<>("hello", 1), new Tuple2<>("hello", 2) ); DataStream<Tuple2<String, Integer>> wordCounts = words .keyBy(0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .sum(1);
Union
Union
操作将两个或多个 DataStream 合并为一个 DataStream。例如:
DataStream<Integer> stream1 = env.fromElements(1, 2, 3); DataStream<Integer> stream2 = env.fromElements(4, 5, 6); DataStream<Integer> mergedStream = stream1.union(stream2);
Connect
Connect
操作将两个 DataStream 连接起来,生成一个 ConnectedStream。例如:
DataStream<Integer> stream1 = env.fromElements(1, 2, 3); DataStream<String> stream2 = env.fromElements("a", "b", "c"); ConnectedStreams<Integer, String> connectedStream = stream1.connect(stream2);
Split
Split
操作将一个 DataStream 拆分为多个 DataStream。例如:
-- -------------------- ---- ------- ------------------- ------- - ------------------- -- -- --- -------------------- ----------- - --------------------------------------- ----- -- - ------------ ------ - --- -------------- -- ------ - - -- -- - ------------------- - ---- - ------------------ - ------ ------- ---
Select
Select
操作从 Split
操作生成的多个 DataStream 中选择一个或多个 DataStream。例如:
DataStream<Integer> evenNumbers = splitStream.select("even"); DataStream<Integer> oddNumbers = splitStream.select("odd");
Project
Project
操作从 DataStream 中选择特定的字段,生成一个新的 DataStream。例如:
DataStream<Tuple3<Integer, String, Double>> data = env.fromElements( new Tuple3<>(1, "a", 1.0), new Tuple3<>(2, "b", 2.0) ); DataStream<Tuple2<Integer, Double>> projectedData = data.project(0, 2);
Iterate
Iterate
操作对 DataStream 进行迭代操作。例如:
DataStream<Long> numbers = env.generateSequence(1, 10); IterativeStream<Long> iteration = numbers.iterate(); DataStream<Long> iterationBody = iteration.map(x -> x - 1); iteration.closeWith(iterationBody.filter(x -> x > 0));
CoMap, CoFlatMap
CoMap
和 CoFlatMap
操作对 ConnectedStream
中的两个流进行联合处理。例如:
ConnectedStreams<Integer, String> connectedStream = stream1.connect(stream2); DataStream<String> result = connectedStream.map( (Integer value) -> "Integer: " + value, (String value) -> "String: " + value );