InfluxDB作为时序数据库,在与时间相关的数据记录中,发挥着巨大的作用。下文以flink为例,通过参考Flink第三方扩展(https://github.com/apache/bahir-flink/tree/master/flink-connector-influxdb2).
自定义source将数据写入influxDB 2.1.1中。
在完成以下工作时,请确保您已经安装并配置了InfluxDB 2.1.1,如果您还未安装配置,可参考以下文章(https://lrting-top.blog.csdn.net/article/details/122270992):
代码修改当前版本的 bahir-flink对influxdb的支持为2.0.0,如果直接使用该版本,则会出现认证不通过的情况,此时需要修改部分代码,使用token的认证方式。
具体为,InfluxDBSinkBuilder类中的getInfluxDBClient方法,修改为:
public static InfluxDBClient getInfluxDBClient(final Configuration configuration) { final String url = configuration.getString(INFLUXDB_URL); final String bucket = configuration.getString(INFLUXDB_BUCKET); final String organization = configuration.getString(INFLUXDB_ORGANIZATION); final String token = configuration.getString(INFLUXDB_TOKEN); final InfluxDBClientOptions influxDBClientOptions = InfluxDBClientOptions.builder() .url(url) .authenticateToken(token.toCharArray()) .bucket(bucket) .org(organization) .build(); return InfluxDBClientFactory.create(influxDBClientOptions); }
完整代码可参考(https://git.lrting.top/xiaozhch5/drfix):
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)