Hazelcast Jet Pipeline详解

Hazelcast Jet Pipeline详解,第1张

Hazelcast Jet Pipeline详解 前言

pipeline只有两种stage:stream和batch,主要看它的数据源是哪种,如果是StreamSource那就用StreamStage,如果是BatchSource那就用BatchStage。也可以通过 addTimestamps来把batch模拟成无边界流。

1. 避免事件乱序

为了避免乱序可以如下配置

Pipeline p = Pipeline.create();
p.setPreserveOrder(true);

这样配置后,Jet会给这些消息打上同样的partitioning key。会影响性能。如果你无意中改了partitioning key就会影响事件的顺序。比如你从带有分区的kafka消费,然后用了groupingKey(这个key不是kafka的分区key),那么你就潜在的打乱了事件的顺序。

2.多数据源

如下:

Pipeline p = Pipeline.create();

BatchSource leftSource = TestSources.items("the", "quick", "brown", "fox");
BatchSource rightSource = TestSources.items("jumps", "over", "the", "lazy", "dog");

BatchStage left = p.readFrom(leftSource);
BatchStage right = p.readFrom(rightSource);

left.merge(right)
    .writeTo(Sinks.logger());
3. 转换Transforms

Jet在数据源和目标数据中间有一层转换层,分为两类:
无状态转换:map,filter,flatMap,hashJoin
有状态转换:aggregate,rollingAggregate,distinct,window

4. 分布式内存数据结构

IMap: Hazelcast内置的,支持索引、查询、持久化,可以用在Jet的跑批和流式。
可以通过配置event journal来支持流式处理(支持Exactly-Once):

hazelcast:
  map:
    name_of_map:
      event-journal:
        enabled: true
        capacity: 100000
        time-to-live-seconds: 10

然后代码如下:

IMap userCache = jet.getMap("usersCache")
Pipeline p = Pipeline.create();
p.readFrom(Sources.mapJournal(userCache, START_FROM_OLDEST))
 .withIngestionTimestamps()
 .writeTo(Sinks.logger()));

默认数据源只会输出ADDED和UPDATeD事件。
详细的例子: IMap Change Stream
注意,IMap是一个分布式对象,下面的写法是有潜在bug的:

IMap userCache = jet.getMap("users");
User user = userCache.get("user-id");
user.setAccessCount(user.getAccessCount() + 1);
userCache.put("user-id", user);

为了保证一致性,下面的写法才对:

static class IncrementEntryProcessor implements EntryProcessor {
    @Override
    public User process(Entry entry) {
        User value = entry.getValue();
        value.setAccessCount(value.getAccessCount() + 1);
        return entry.setValue(value);
    }
}

IMap userCache = jet.getMap("users");
userCache.executeOnEntry("user-id", new IncrementEntryProcessor());
5. CDC

如下配置监听Mysql的binlog:

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(
    MySqlCdcSources.mysql("customers")
            .setDatabaseAddress("127.0.0.1")
            .setDatabasePort(3306)
            .setDatabaseUser("debezium")
            .setDatabasePassword("dbz")
            .setClusterName("dbserver1")
            .setDatabaseWhitelist("inventory")
            .setTableWhitelist("inventory.customers")
            .build())
    .withNativeTimestamps(0)
    .writeTo(Sinks.logger());

CDC只能支持At-Least-Once。数据源部分间歇性保存保存WAL offset,当发生故障或者重启的时候,offset之后的所有事件都会被重放。所以不是Exactly-Once。
因为数据库配置binlog的大小的原因,如果想回放很久以前的CDC可能发生数据丢失。所以配置合理的话,还是可以做到At-Least-Once的。

6. Pipeline的序列化

标准的Pipeline写法是用lambda,因为Pipeline是要序列化传到计算集群的,所以这些表达式都要支持序列化,又因为java.util.function是没有继承Serializable接口,因此Hazelcast的工程师实现了等同的ExXXX系列的接口,比如java.util.function.Function等同于com.hazelcast.function.FunctionEx。
除了表达式要支持Serializable,引用到的参数类也要支持Serializable。或者在buildPipeline方法内部声明内部变量。
还有一种情况,像DateTimeFormatter这种不可序列化的对象也是不能用的,但是我们可以用JDK预置的DateTimeFormatter.ISO_LOCAL_TIME在集群的目标机器上直接生成jvm对象。这个原理是,Jet集群支持java的static final,所以本质上你也可以自己创建static final变量。
最后一种情况是mapUsingService():

Pipeline p = Pipeline.create();
BatchStage src = p.readFrom(Sources.list("input"));
ServiceFactory serviceFactory = nonSharedService(
        pctx -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
                              .withZone(ZoneId.systemDefault()));
// 这里
src.mapUsingService(serviceFactory,
        (formatter, tstamp) -> formatter.format(Instant.ofEpochMilli(tstamp)));

但是默认的Java序列化Serializable性能并不高,看下面的benchmark:

Strategy                                        Number of Bytes  Overhead %
java.io.Serializable                                        162         523
java.io.Externalizable                                       87         234
com.hazelcast.nio.serialization.Portable                    104         300
com.hazelcast.nio.serialization.StreamSerializer             26           0
7. spring集成

Hazelcast增加了一个新的注解@SpringAware,它的作用是:
可以 *** 作bean属性
可以监听callback,例如ApplicationContextAware, BeanNameAware
可以 *** 作bean的post-processing注解,例如InitializingBean, @PostConstruct
例如:

@SpringAware
public class SourceContext {

    @Resource(name = "my-source-map")
    IMap sourceMap;
}

@SpringAware
public class SinkContext {

    @Resource(name = "my-sink-map")
    IMap sinkMap;
}

还有一种方式是用JetSpringServiceFactories,但是需要配置xml:


代码如下:

Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.list("list"))
        .mapUsingService(JetSpringServiceFactories.bean("my-bean"), (myBean, item) -> myBean.enrich(item))
        .writeTo(Sinks.logger());
8. springboot集成

Hazelcast提供的springboot starter 暂时找不到了。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5687003.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存