Created: April 1, 2022 2:37 PM
Tags: Work
Topic: Apache nifi
日期: April 1, 2022
魔改官方处理器PutDistributedMapCache 首先要理清PutDistributedMapCache 处理器如何使用.使用nifi验证官方提供的处理器,只支持put-也就是set方式.对其他方式目前支持.也不知道为啥
官方的处理器使用方式
- 注册连接池
- 封装redis存储方法,发布成服务
- 提供处理器使用
-
因为要完全重新创建一个一模一样的处理器,连接池和服务也都要创建,不然读取不到;
-
连接池服务是
RedisConnectionPoolService
- 新建一个自定义项目,如果不知道怎么创建,请看如果新建自定义项目
- 原封不动的代码都拷贝,就改个名字就好. 把该引入的包都引入,
- 如果还有爆红,说明nifi的依赖包.没有下好.这个我也是踩了很多坑
-
封装redis的方法服务是
RedisDistributedMapCacheClientService
-
需要复制的清单
//用到了3个接口,2个服务,2个工具类 //接口 RedisConnectionPool DistributedMapCacheClient 改名 RedisAction //服务 RedisConnectionPoolService 改名 RedisDistributedMapCacheClientService 改名 //工具类 RedisUtils RedisType
-
新建一个处理器
PutDistributedHashMapRedis
-
开始改造
-
连接池原封不动,就能用
-
redis实现服务需要重新实现一个支持hashMap的redis方法
-
处理调用hashMap的方法即可;
-
大致思路确定
-
对DistributedMapCacheClient 接口新增一个hset方法
public interface DistributedMapCacheClient2 extends ControllerService { void hset(String hkey,String key, byte[] value) throws IOException; }
-
对RedisDistributedMapCacheClientService 同时实现hset方法
public class RedisDistributedMapCacheClientService2 extends AbstractControllerService implements DistributedMapCacheClient2 { @Override public void hset(String hkey, String key, byte[] value) throws IOException { withConnection(redisConnection -> { // final TupleC kv = new TupleC(hkey.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), value); // redisConnection.hSet(kv.getKey(), kv.getField(), kv.getValue()); //验证不需要对象实例化可以存储,所以就去除了实体对象 redisConnection.hSet(hkey.getBytes(StandardCharsets.UTF_8), key.getBytes(), value); return null; }); } }
-
PutDistributedHashMapRedis
处理器public class PutDistributedHashMapRedis extends AbstractProcessor { //新增一个key的选项 public static final PropertyDescriptor HASHKEY_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder() .name("Hash cache Entry Identifier") .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the cache key") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); //其他不动.但是Distributed Cache Service 选项卡需要改变一下接口 // Identifies the distributed map cache client public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() .name("Distributed Cache Service") .description("The Controller Service that is used to cache flow files") .required(true) .identifiesControllerService(**DistributedMapCacheClient2**.class) .build(); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> descriptors = new ArrayList<>(); **descriptors.add(HASHKEY_CACHE_ENTRY_IDENTIFIER);** descriptors.add(CACHE_ENTRY_IDENTIFIER); descriptors.add(DISTRIBUTED_CACHE_SERVICE); descriptors.add(CACHE_UPDATE_STRATEGY); descriptors.add(CACHE_ENTRY_MAX_BYTES); return descriptors; } //实现方法 public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { //从原处理器把代码都复制过来即可,原处理器是:PutDistributedMapCache if (updateStrategy.equals(CACHE_UPDATE_REPLACE.getValue())) { **cache.hset(hashKey, cacheKey, cacheValue);** cached = true; } else if (updateStrategy.equals(CACHE_UPDATE_KEEP_ORIGINAL.getValue())) { // final byte[] oldValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer); // if (oldValue == null) { // cached = true; // } } } }
-
改完把pom的jar改成nar,打包放到nifi项目的lib中,启动即可
-
如果有条件的,可以观看我的原文
https://spice-pony-728.notion.site/NiFi-Redis-HashMap-0d9384e4291441c7a63509fbcf43c234
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)