Apache NiFi 存储redis HashMap格式的数据

Apache NiFi 存储redis HashMap格式的数据,第1张

NiFi Redis进行HashMap存储

Created: April 1, 2022 2:37 PM
Tags: Work
Topic: Apache nifi
日期: April 1, 2022

场景: redis存储hashmap格式

使用nifi验证官方提供的处理器,只支持put-也就是set方式.对其他方式目前支持.也不知道为啥
官方的处理器使用方式

魔改官方处理器PutDistributedMapCache 首先要理清PutDistributedMapCache 处理器如何使用.
  1. 注册连接池
  2. 封装redis存储方法,发布成服务
  3. 提供处理器使用
明白了步骤,接下来我们开始改造
  1. 因为要完全重新创建一个一模一样的处理器,连接池和服务也都要创建,不然读取不到;

  2. 连接池服务是RedisConnectionPoolService

    1. 新建一个自定义项目,如果不知道怎么创建,请看如果新建自定义项目
    2. 原封不动的代码都拷贝,就改个名字就好. 把该引入的包都引入,
    3. 如果还有爆红,说明nifi的依赖包.没有下好.这个我也是踩了很多坑
    💡 该连接池使用的底层是jedis连接池,使用的是springboot2.5.1的redis
  3. 封装redis的方法服务是 RedisDistributedMapCacheClientService

  4. 需要复制的清单

    //用到了3个接口,2个服务,2个工具类
    //接口
    RedisConnectionPool
    DistributedMapCacheClient  改名
    RedisAction
    //服务
    RedisConnectionPoolService  改名
    RedisDistributedMapCacheClientService  改名
    //工具类
    RedisUtils
    RedisType
    
  5. 新建一个处理器 PutDistributedHashMapRedis

  6. 开始改造

    1. 连接池原封不动,就能用

    2. redis实现服务需要重新实现一个支持hashMap的redis方法

    3. 处理调用hashMap的方法即可;

    4. 大致思路确定

    5. 对DistributedMapCacheClient 接口新增一个hset方法

      public interface DistributedMapCacheClient2 extends ControllerService {
      
          void hset(String hkey,String key, byte[] value) throws IOException;
      }
      
    6. 对RedisDistributedMapCacheClientService 同时实现hset方法

      public class RedisDistributedMapCacheClientService2 extends AbstractControllerService implements DistributedMapCacheClient2 {
      		@Override
          public void hset(String hkey, String key, byte[] value) throws IOException {
              withConnection(redisConnection -> {
      //            final TupleC kv = new TupleC(hkey.getBytes(StandardCharsets.UTF_8), key.getBytes(StandardCharsets.UTF_8), value);
      //            redisConnection.hSet(kv.getKey(), kv.getField(), kv.getValue());
      							//验证不需要对象实例化可以存储,所以就去除了实体对象
                  redisConnection.hSet(hkey.getBytes(StandardCharsets.UTF_8), key.getBytes(), value);
                  return null;
              });
          }
      }
      
    7. PutDistributedHashMapRedis 处理器

      public class PutDistributedHashMapRedis extends AbstractProcessor {
      		//新增一个key的选项
      		public static final PropertyDescriptor HASHKEY_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
                  .name("Hash cache Entry Identifier")
                  .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the cache key")
                  .required(true)
                  .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                  .build();
      	//其他不动.但是Distributed Cache Service 选项卡需要改变一下接口
      // Identifies the distributed map cache client
          public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
                  .name("Distributed Cache Service")
                  .description("The Controller Service that is used to cache flow files")
                  .required(true)
                  .identifiesControllerService(**DistributedMapCacheClient2**.class)
                  .build();
      	
      @Override
          protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
              final List<PropertyDescriptor> descriptors = new ArrayList<>();
              **descriptors.add(HASHKEY_CACHE_ENTRY_IDENTIFIER);**
              descriptors.add(CACHE_ENTRY_IDENTIFIER);
              descriptors.add(DISTRIBUTED_CACHE_SERVICE);
              descriptors.add(CACHE_UPDATE_STRATEGY);
              descriptors.add(CACHE_ENTRY_MAX_BYTES);
              return descriptors;
          }
      
      //实现方法
      public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
      	//从原处理器把代码都复制过来即可,原处理器是:PutDistributedMapCache 
      	if (updateStrategy.equals(CACHE_UPDATE_REPLACE.getValue())) {
                      **cache.hset(hashKey, cacheKey, cacheValue);**
                      cached = true;
                  } else if (updateStrategy.equals(CACHE_UPDATE_KEEP_ORIGINAL.getValue())) {
      //                final byte[] oldValue = cache.getAndPutIfAbsent(cacheKey, cacheValue, keySerializer, valueSerializer, valueDeserializer);
      //                if (oldValue == null) {
      //                    cached = true;
      //                }
                  }
      }
      }
      
    8. 改完把pom的jar改成nar,打包放到nifi项目的lib中,启动即可

如果有条件的,可以观看我的原文
https://spice-pony-728.notion.site/NiFi-Redis-HashMap-0d9384e4291441c7a63509fbcf43c234

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/563396.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-02
下一篇 2022-04-02

发表评论

登录后才能评论

评论列表(0条)

保存