Flink单元测试用法讲解

Flink单元测试用法讲解,第1张

本文根据flink官方提供的测试方法进行编写,重点在于展示如何实现Flink任务的单元测试。

测试是每个软件开发过程中不可或缺的一部分,单元测试的好处有很多,比如:

  • 确保单个方法正常运行;
  • 如果修改了方法代码,只需确保其对应的单元测试通过;
  • 测试代码本身就可以作为示例代码;
  • 可以自动化运行所有的测试并获得分析报告。

除上述好处之外,其本身也是规范化流程的一部分。那咱废话不多说,进入正题

测试用户自定义函数 对无状态、无时间限制的UDF进行单元测试

我们以两个无状态的MapFunction和FlatMapFunction为例,实现相同的累加功能,并对该功能进行测试,下面分别是两个Function的具体实现

public class IncrementMapFunction implements MapFunction<Integer, Integer> {
    @Override
    public Integer map(Integer value) throws Exception {
        return value + 1;
    }
}
public class IncrementFlatMapFunction implements FlatMapFunction<Integer, Integer> {
    @Override
    public void flatMap(Integer value, Collector out) throws Exception {
        out.collect(value + 1);
    }
}

MapFunction的单元测试比较简单,直接使用测试框架就可以进行测试,测试代码如下

@Test
public void testIncrement() throws Exception {
	// instantiate your function
	IncrementMapFunction incrementer = new IncrementMapFunction();
	// call the methods that you have implemented
	Assert.assertEquals(3, incrementer.map(2).intValue());
}

FlatMapFunction由于使用org.apache.flink.util.Collector收集结果,因此在进行测试时需要提供Collector的模拟对象,有两种方法可以提供模拟对象,一种是通过Mock,另一种是通过ListCollector。在使用Mockito时,非SpringBoot项目需要手动引入依赖

<dependency>
	<groupId>org.mockitogroupId>
	<artifactId>mockito-coreartifactId>
	<version>${mockito.version}version>
	<scope>testscope>
dependency>
// use Mock to simulate objects
@Test
public void testCustomFlatMapFunction() throws Exception {
    // instantiate your function
    IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
    Collector<Integer> collector = mock(Collector.class);
    // call the methods that you have implemented
    incrementer.flatMap(2, collector);
    //verify collector was called with the right output
    Mockito.verify(collector, times(1)).collect(3);
}

// use ListCollector to simulate objects
@Test
public void testCustomFlatMapFunction() throws Exception {
    // instantiate your function
    IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
    List<Integer> list = new ArrayList<>();
    ListCollector<Integer> collector = new ListCollector<>(list);
    // call the methods that you have implemented
    incrementer.flatMap(2, collector);
    //verify collector was called with the right output
    Assert.assertEquals(Collections.singletonList(3), list);
}
对有状态或及时UDF和用户自定义算子进行单元测试

对使用管理状态或定时器的用户自定义函数的功能测试会更加困难,因为它涉及到测试用户代码和 Flink 运行时的交互。 为此,Flink 提供了一组所谓的测试工具,可用于测试用户自定义函数和自定义算子:

  • OneInputStreamOperatorTestHarness (适用于 DataStream 上的算子)
  • KeyedOneInputStreamOperatorTestHarness (适用于 KeyedStream 上的算子)
  • TwoInputStreamOperatorTestHarness (f适用于两个 DataStream 的 ConnectedStreams 算子)
  • KeyedTwoInputStreamOperatorTestHarness (适用于两个 KeyedStream 上的 ConnectedStreams 算子)

要使用测试工具,还需要一组其他的依赖项,如果要为使用 DataStream API 构建的作业开发测试用例,则需要添加以下依赖项:

<dependency>
    <groupId>org.apache.flinkgroupId>
    <artifactId>flink-test-utilsartifactId>
    <version>1.15.0version>
    <scope>testscope>
dependency>

需要注意的是1.15以前版本依赖不同于上述依赖。早期版本添加以下依赖项

<dependency>
    <groupId>org.apache.flinkgroupId>
    <artifactId>flink-test-utils_${scala.version}artifactId>
    <version>${flink.version}version>
    <scope>testscope>
dependency>

该模块提供了 MiniCluster (一个可配置的轻量级 Flink 集群,能在 JUnit 测试中运行),可以直接执行作业。如果想本地测试 Table API 和 SQL 程序,除了前述提到的 flink-test-utils 之外,还要添加以下依赖项(该依赖是1.15.0版本中添加的,早期版本中没有该依赖):

<dependency>
    <groupId>org.apache.flinkgroupId>
    <artifactId>flink-table-test-utilsartifactId>
    <version>1.15.0version>
    <scope>testscope>
dependency>
测试Flink作业

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/906182.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-15
下一篇 2022-05-15

发表评论

登录后才能评论

评论列表(0条)

保存