本文根据flink官方提供的测试方法进行编写,重点在于展示如何实现Flink任务的单元测试。
测试是每个软件开发过程中不可或缺的一部分,单元测试的好处有很多,比如:
- 确保单个方法正常运行;
- 如果修改了方法代码,只需确保其对应的单元测试通过;
- 测试代码本身就可以作为示例代码;
- 可以自动化运行所有的测试并获得分析报告。
除上述好处之外,其本身也是规范化流程的一部分。那咱废话不多说,进入正题
测试用户自定义函数 对无状态、无时间限制的UDF进行单元测试我们以两个无状态的MapFunction和FlatMapFunction为例,实现相同的累加功能,并对该功能进行测试,下面分别是两个Function的具体实现
public class IncrementMapFunction implements MapFunction<Integer, Integer> {
@Override
public Integer map(Integer value) throws Exception {
return value + 1;
}
}
public class IncrementFlatMapFunction implements FlatMapFunction<Integer, Integer> {
@Override
public void flatMap(Integer value, Collector out) throws Exception {
out.collect(value + 1);
}
}
MapFunction的单元测试比较简单,直接使用测试框架就可以进行测试,测试代码如下
@Test
public void testIncrement() throws Exception {
// instantiate your function
IncrementMapFunction incrementer = new IncrementMapFunction();
// call the methods that you have implemented
Assert.assertEquals(3, incrementer.map(2).intValue());
}
FlatMapFunction由于使用org.apache.flink.util.Collector
收集结果,因此在进行测试时需要提供Collector的模拟对象,有两种方法可以提供模拟对象,一种是通过Mock,另一种是通过ListCollector。在使用Mockito时,非SpringBoot项目需要手动引入依赖
<dependency>
<groupId>org.mockitogroupId>
<artifactId>mockito-coreartifactId>
<version>${mockito.version}version>
<scope>testscope>
dependency>
// use Mock to simulate objects
@Test
public void testCustomFlatMapFunction() throws Exception {
// instantiate your function
IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
Collector<Integer> collector = mock(Collector.class);
// call the methods that you have implemented
incrementer.flatMap(2, collector);
//verify collector was called with the right output
Mockito.verify(collector, times(1)).collect(3);
}
// use ListCollector to simulate objects
@Test
public void testCustomFlatMapFunction() throws Exception {
// instantiate your function
IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
List<Integer> list = new ArrayList<>();
ListCollector<Integer> collector = new ListCollector<>(list);
// call the methods that you have implemented
incrementer.flatMap(2, collector);
//verify collector was called with the right output
Assert.assertEquals(Collections.singletonList(3), list);
}
对有状态或及时UDF和用户自定义算子进行单元测试
对使用管理状态或定时器的用户自定义函数的功能测试会更加困难,因为它涉及到测试用户代码和 Flink 运行时的交互。 为此,Flink 提供了一组所谓的测试工具,可用于测试用户自定义函数和自定义算子:
- OneInputStreamOperatorTestHarness (适用于 DataStream 上的算子)
- KeyedOneInputStreamOperatorTestHarness (适用于 KeyedStream 上的算子)
- TwoInputStreamOperatorTestHarness (f适用于两个 DataStream 的 ConnectedStreams 算子)
- KeyedTwoInputStreamOperatorTestHarness (适用于两个 KeyedStream 上的 ConnectedStreams 算子)
要使用测试工具,还需要一组其他的依赖项,如果要为使用 DataStream API 构建的作业开发测试用例,则需要添加以下依赖项:
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-test-utilsartifactId>
<version>1.15.0version>
<scope>testscope>
dependency>
需要注意的是1.15以前版本依赖不同于上述依赖。早期版本添加以下依赖项
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-test-utils_${scala.version}artifactId>
<version>${flink.version}version>
<scope>testscope>
dependency>
该模块提供了 MiniCluster (一个可配置的轻量级 Flink 集群,能在 JUnit 测试中运行),可以直接执行作业。如果想本地测试 Table API 和 SQL 程序,除了前述提到的 flink-test-utils 之外,还要添加以下依赖项(该依赖是1.15.0
版本中添加的,早期版本中没有该依赖):
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-test-utilsartifactId>
<version>1.15.0version>
<scope>testscope>
dependency>
测试Flink作业
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)