配置Confluent使Kafka支持Rest方式请求

配置Confluent使Kafka支持Rest方式请求,第1张

配置Confluent使Kafka支持Rest方式请求

配置Confluent使Kafka支持Rest方式请求
      • 安装confluent
      • 配置
      • 启动
      • 测试kafka-rest

安装confluent

Confulent 下载地址:https://www.confluent.io/installation


这里选择tar格式进行下载。下载完成后上传到自己的linux自定义安装目录里面,解压并安装

wget https://packages.confluent.io/archive/7.0/confluent-6.0.1.tar.gz?_ga=2.201784529.1316457547.1639380267-1161245413.1639380267

tar -zxvf confluent-6.0.1.tar.gz

confluent已经为您准备好了kafka与kafka-rest,所以你可以直接使用它准备的kafka进行安装,当然您也可以单独下载进行安装,这里演示的时候kafka已经提前安装好了。

配置

安装好后,需要对/etc/confluent-6.0.1/etc/kafka-rest/kafka-rest.properties进行部分配置才可使用,具体如下:

bootstrap.servers=PLAINTEXT://localhost:9092
listeners=http://0.0.0.0:6082

bootstrap.servers为kafka服务地址,listeners为kafka-rest监听的地址.

启动

进入Confluent安装目录使用以下命令进行安装,如果不进入对应目录则需要输入全路径。

/etc/confluent-6.0.1/bin/kafka-rest  /etc/kafka-rest/kafka-rest.properties

上面的这种方式是前台启动,也可以使用nohup方式进行后台启动,并设置日志输出。

nohup /etc/confluent-6.0.1/bin/kafka-rest-start /etc/confluent-6.0.1/etc/kafka-rest/kafka-rest.properties > /etc/confluent-6.0.1/logs/kafka-rest/kafka-rest.log 2>&1 &

当看到日志输出以下内容则表示启动成功!

测试kafka-rest

kafka-rest只能接收json格式的数据,具体格式如下

public class Test {

	public static void main(String[] args){
		 //这里测试就不填充数据了。
		 List> testData = new ArrayList<>();
		 String topic = "inner.clickhouse.test";
		 String url = "http://192.168.1.7:6082/topics/"+topic;
		 String data = "{"records":[{"value":"+JacksonUtil.toJsonString(testData )+"}]}";
		 String header = "application/vnd.kafka.json.v2+json";
		 post(url, data, header);
	}

    public static String post(String url,String param,String header){
        logger.error("Send to:{} , params:{}",url,param);
        HttpClient client = buildClient();
        HttpRequest request = buildPostRequest(url,param,header);
        CompletableFuture future = client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body);
        String result = "";
        try {
            result = future.get();
        } catch (InterruptedException e) {
            logger.error("{} request fail,reason:{}",url,e.getMessage());
            return "-99";
        } catch (ExecutionException e) {
            logger.error("{} request fail,reason:{}",url,e.getMessage());
            return "-88";
        }
        return result;
    }

    public static HttpRequest buildPostRequest(String url, String param,String header){
        //创建 builder
        HttpRequest.Builder reBuilder = HttpRequest.newBuilder();
        //链式调用
        HttpRequest request = reBuilder
                //存入消息头
                //消息头是保存在一张 TreeMap 里的
                .header("Content-Type", header)
                //http 协议版本
                .version(HttpClient.Version.HTTP_2)
                //url 地址
                .uri(URI.create(url))
                //超时时间
                .timeout(Duration.ofMillis(5000))
                //发起一个 post 消息,需要存入一个消息体
                .POST(HttpRequest.BodyPublishers.ofString(param))
                //发起一个 get 消息,get 不需要消息体
                //创建完成
                .build();
        return request;
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5665232.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存