Flink 1.14将数据写入InfluxDB 2.1.1

Flink 1.14将数据写入InfluxDB 2.1.1,第1张

Flink 1.14将数据写入InfluxDB 2.1.1

InfluxDB作为时序数据库,在与时间相关的数据记录中,发挥着巨大的作用。下文以flink为例,通过参考Flink第三方扩展(https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb2).

自定义source将数据写入influxDB 2.1.1中。

在完成以下工作时,请确保您已经安装并配置了InfluxDB 2.1.1,如果您还未安装配置,可参考以下文章(https://lrting-top.blog.csdn.net/article/details/122270992):

代码修改

当前版本的 bahir-flink对influxdb的支持为2.0.0,如果直接使用该版本,则会出现认证不通过的情况,此时需要修改部分代码,使用token的认证方式。

具体为,InfluxDBSinkBuilder类中的getInfluxDBClient方法,修改为:

    public static InfluxDBClient getInfluxDBClient(final Configuration configuration) {
        final String url = configuration.getString(INFLUXDB_URL);
        final String bucket = configuration.getString(INFLUXDB_BUCKET);
        final String organization = configuration.getString(INFLUXDB_ORGANIZATION);
        final String token = configuration.getString(INFLUXDB_TOKEN);
        final InfluxDBClientOptions influxDBClientOptions =
                InfluxDBClientOptions.builder()
                        .url(url)
                        .authenticateToken(token.toCharArray())
                        .bucket(bucket)
                        .org(organization)
                        .build();
        return InfluxDBClientFactory.create(influxDBClientOptions);
    }

完整代码可参考(https://git.lrting.top/xiaozhch5/drfix):

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5690190.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存