Flink Consuming Kafka Messages in Practice
The parse method of the JSONHelper class converts a raw string into a SingleMessage object:

/**
 * Parse a raw string into a SingleMessage object.
 *
 * @param raw the raw JSON message
 * @return the parsed SingleMessage, or null if raw is null
 */
public static SingleMessage parse(String raw) {
    SingleMessage singleMessage = null;
    if (raw != null) {
        singleMessage = JSONObject.parseObject(raw, SingleMessage.class);
    }
    return singleMessage;
}
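// Note: the StreamingJob class below also calls JSONHelper.getTimeLongFromRawMessage(...),
// which is not included in this excerpt. A minimal sketch of it, assuming the event
// timestamp is carried in SingleMessage's timeLong field:
public static long getTimeLongFromRawMessage(String raw) {
    SingleMessage singleMessage = parse(raw);
    return null == singleMessage ? 0L : singleMessage.getTimeLong();
}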

}

  1. The definition of the SingleMessage class:

public class SingleMessage {

    private long timeLong;
    private String name;
    private String bizID;
    private String time;
    private String message;

    public long getTimeLong() {
        return timeLong;
    }

    public void setTimeLong(long timeLong) {
        this.timeLong = timeLong;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getBizID() {
        return bizID;
    }

    public void setBizID(String bizID) {
        this.bizID = bizID;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
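To make the expected message format concrete, here is a small, self-contained sketch of parsing a single message (assuming the fastjson dependency implied by JSONHelper's JSONObject.parseObject call; the field values are invented purely for illustration):

import com.alibaba.fastjson.JSONObject;

public class ParseDemo {
    public static void main(String[] args) {
        // Illustrative only: the field values below are made up for demonstration.
        String raw = "{\"timeLong\":1609473600000,\"name\":\"Jack\",\"bizID\":\"b0001\","
                + "\"time\":\"2021-01-01 12:00:00\",\"message\":\"hello\"}";
        SingleMessage msg = JSONObject.parseObject(raw, SingleMessage.class);
        // prints: Jack -> 1609473600000
        System.out.println(msg.getName() + " -> " + msg.getTimeLong());
    }
}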

  1. All of the real-time processing logic lives in the StreamingJob class; the key points in the source are already annotated with comments, so I won't repeat them here:

package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import javax.annotation.Nullable;
import java.util.Properties;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpointing must be enabled
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka1:9092");
        props.setProperty("group.id", "flink-group");

        // the data source: a consumer of Kafka messages
        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("topic001", new SimpleStringSchema(), props);

        // assign event timestamps and watermarks
        consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                return JSONHelper.getTimeLongFromRawMessage(element);
            }

            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
                if (lastElement != null) {
                    return new Watermark(JSONHelper.getTimeLongFromRawMessage(lastElement));
                }
                return null;
            }
        });

        env.addSource(consumer)
                // turn each raw message into a Tuple2 of (user name, visit count); every message counts as one visit
                .flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (s, collector) -> {
                    SingleMessage singleMessage = JSONHelper.parse(s);
                    if (null != singleMessage) {
                        collector.collect(new Tuple2<>(singleMessage.getName(), 1L));
                    }
                })
                // key by user name
                .keyBy(0)
                // 2-second time window
                .timeWindow(Time.seconds(2))
                // add up each user's visit count within the window
                .apply((WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>) (tuple, window, input, out) -> {
                    long sum = 0L;
                    for (Tuple2<String, Long> record : input) {
                        sum += record.f1;
                    }
                    Tuple2<String, Long> result = input.iterator().next();
                    result.f1 = sum;
                    out.collect(result);
                })
                // sink: print to STDOUT
                .print();

        env.execute("Flink-Kafka demo");
    }
}
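One caveat: because the flatMap and apply functions are passed as Java lambdas, some Flink 1.x releases cannot infer the Tuple2<String, Long> output type due to type erasure and fail at submission with an error like "The generic type parameters of 'Collector' are missing". If you hit that, appending an explicit type hint usually resolves it; a sketch of the same flatMap call with the hint added (Flink's Types utility lives in org.apache.flink.api.common.typeinfo.Types):

.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (s, collector) -> {
    SingleMessage singleMessage = JSONHelper.parse(s);
    if (null != singleMessage) {
        collector.collect(new Tuple2<>(singleMessage.getName(), 1L));
    }
})
// explicit output-type hint to work around lambda type erasure
.returns(Types.TUPLE(Types.STRING, Types.LONG))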

  1. Run the following command in the directory containing pom.xml to build the package (-Dmaven.test.skip=true skips the tests, and -U forces Maven to refresh dependencies):

mvn clean package -Dmaven.test.skip=true -U

  1. On success, the file flinkkafkademo-1.0-SNAPSHOT.jar is generated under the target directory; submit this file to the Flink server, as shown in the figure:

  1. Click the "Upload" button in the red box of the figure below:

  1. As shown in the figure, select the file you just uploaded, fill in the entry class name, and click the "Submit" button to start the job (a command-line alternative is sketched right after this list):

  1. As shown in the figure, the running job is now visible on the Overview page:
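If you prefer the command line to the web UI, the same jar can also be submitted with the flink CLI. A sketch, assuming the flink binary is on the PATH and the command is run from the target directory; -c names the entry class, which for this project is com.bolingcavalry.StreamingJob:

flink run -c com.bolingcavalry.StreamingJob flinkkafkademo-1.0-SNAPSHOT.jar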

All the services are now ready, and we can produce messages to verify the pipeline.

Launch a stress test on machine 192.168.1.104 to generate a large number of messages

  1. Log in to the machine where Apache Bench is deployed and execute the following command:

ab -n 10000 -c 2 http://192.168.1.101:8080/send/Jack/hello

192.168.1.101 is the address of the message producer's web service; the command above launches a stress test with a concurrency of 2, issuing 10,000 requests in total.

  1. When the stress test finishes, the real-time statistics computed by the job can be seen on the Stdout tab of Flink's Task Managers page, as shown in the figure:
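Because the job ends with print(), every stdout line is simply a Tuple2 rendered by Flink, one line per user per 2-second window. With the request above, the lines would look like (Jack,831), where the name comes from the URL path /send/Jack/hello; the count here is purely illustrative, since the actual numbers depend on how the 10,000 requests fall across the 2-second windows.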

At this point, the hands-on walkthrough of Flink consuming Kafka messages is complete: everything from message production to real-time processing has been implemented. I hope it provides a useful reference when you build your own Kafka-based real-time computing environment.

Welcome to follow my WeChat official account: 程序员欣宸

