- hanlp处理数据流入kafka
- hanlp使用可以参考下面官网
- 本文样例
- 读文本文件,通过hanlp分词
- 进行了文本表情剔除,url剔除 正则处理,带词性分析需要下载hanlp的data放到resource下,使用标准的不用
- 写入kafka
- kafka中数据参考:
- flink处理kafka数据
- 本地webui 需要引入依赖,加上本地启动的配置即可访问页面
- 效果图
- pom文件
本文样例,HanLP 官网参考:https://www.hanlp.com/
{"text":"这是个文本样例啊"}
{"text":"这是个文本样例啊"}
读文本文件,通过hanlp分词
进行了文本表情剔除,url剔除 正则处理,带词性分析需要下载hanlp的data放到resource下,使用标准的不用
public void run() {
//停用词使用
CoreStopWordDictionary.load(HanLP.Config.CoreStopWordDictionaryPath, true);
// 分词结果不显示词性
HanLP.Config.ShowTermNature = false;
HanLP.Config.Normalization = true;
StringBuilder sb = new StringBuilder();
try{
File file = new File(filePath);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "utf-8"));
String str = null;
while ((str = bufferedReader.readLine()) != null) {
String inputStr = JSONObject.parseObject(str).getString("text");
//去除url 表情
String urlPattern = "((https?|ftp|gopher|telnet|file|Unsure|http):((//)|(\\))+[\w\d:#@%/;$()~_?\+-=\\\.&]*)[\ud83c\udc00-\ud83c\udfff]|[\ud83d\udc00-\ud83d\udfff]|[\u2600-\u27ff]|[\ud83e\udd00-\ud83e\uddff]|[\u2300-\u23ff]|[\u2500-\u25ff]|[\u2100-\u21ff]|[\u0000-\u00ff]|[\u2b00-\u2bff]|[\u2d06]|[\u3030]|\pP|\pS";
Pattern p = Pattern.compile(urlPattern, Pattern.CASE_INSENSITIVE);
Matcher m = p.matcher(inputStr);
inputStr = m.replaceAll("");
sb.append(inputStr);
}
//停用词使用
List<Term> segment = NLPTokenizer.segment(sb.toString());
CoreStopWordDictionary.apply(segment);
//写kafka
writeToKf(segment);
}catch (Exception e){
e.printStackTrace();
}
}
写入kafka
/**
 * Publishes the segmented terms (as a single list-formatted string) to the
 * Kafka topic "hotwords", then pauses 20 s before returning.
 *
 * @param segment HanLP segmentation result to publish
 */
public void writeToKf(List<Term> segment) {
    // Producer configuration; key and value are plain strings.
    Properties ps = new Properties();
    ps.setProperty("bootstrap.servers", "ip:9092"); // broker list
    // ps.setProperty("group.id", "hotwords");
    ps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    ps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // try-with-resources guarantees the producer is closed (and pending sends
    // flushed) even if send() throws — the original leaked it on failure.
    try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(ps)) {
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("hotwords", segment.toString());
        kafkaProducer.send(record);
    }
    try {
        Thread.sleep(1000 * 20);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing it.
        Thread.currentThread().interrupt();
    }
}
kafka中数据参考:
flink处理kafka数据
参照了下面文章做的实验
本地webui 需要引入依赖,加上本地启动的配置即可访问页面https://blog.csdn.net/m0_49834705/article/details/115023005
/**
 * Flink job: consumes segmented words from the "hotwords" Kafka topic,
 * counts them over a 60 s sliding window (20 s slide), computes a Top-N
 * per 20 s tumbling window and writes the result to MySQL.
 */
class App {
    public static void main(String[] args) throws Exception {
        // Local start with web UI on http://localhost:2841.
        Configuration config = new Configuration();
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        config.setInteger(RestOptions.PORT, 2841);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8, config);
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once checkpoints every 5 s; offsets commit on checkpoint (see below).
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        Properties properties=new Properties();
        properties.setProperty("bootstrap.servers","ip:9092");
        properties.setProperty("group.id", "hotwords");
        // properties.setProperty("auto.offset.reset", "latest");
        // properties.setProperty("enable.auto.commit", "true")
        // properties.setProperty("auto.commit.interval.ms", "15000")
        // Consume topic "hotwords", starting from the latest offsets.
        DataStreamSource<String> text = env.addSource(
                new FlinkKafkaConsumer<String>("hotwords",
                        new SimpleStringSchema(),properties)
                        .setStartFromLatest()
                        // .setStartFromEarliest()
                        // commit offsets back to Kafka when a checkpoint completes
                        .setCommitOffsetsOnCheckpoints(true));
        DataStream<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter());
        DataStream<Tuple2<String, Integer>> wcount = ds
                .keyBy(0) // key by the first tuple field, i.e. the word
                .window(SlidingProcessingTimeWindows.of(Time.seconds(60),Time.seconds(20)))
                // 60 s window sliding every 20 s (an earlier comment said 600 s; the code uses 60 s)
                .sum(1); // sum the count field for identical words
        // wcount.print("wc: ");
        // All keys enter one 20 s tumbling window; 20 s matches the upstream slide,
        // so each Top-N computation covers exactly one upstream emission round.
        DataStream<Tuple2<String, Integer>> ret =
                wcount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20))).
                        process(new TopNAllFunction(20)); // Top-N within the window
        // Persist to MySQL.
        ret.addSink(new MySQLSink());
        // Debug print.
        // ret.print("ret: ");
        // wcount.keyBy(new TupleKeySelectorByStart()) // group by first character
        //         .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 20 s tumbling window
        //         .process(new TopNFunction(5)); // grouped Top-N
        env.execute("Window WordCount");
    }
    /**
     * Splits a comma-separated line into (token, 1) pairs, skipping empty tokens.
     */
    private static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.split(",");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
    /**
     * Keys a tuple by the first character of its word (used by the
     * commented-out grouped Top-N variant above).
     */
    private static class TupleKeySelectorByStart implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0.substring(0, 1); // first character as key
        }
    }
}
/**
 * Emits the top-N (word, count) pairs of each window, ordered by count
 * descending; ties are all retained.
 */
public class TopNAllFunction extends ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {

    private int topSize = 10;

    public TopNAllFunction(int topSize) {
        this.topSize = topSize;
    }

    @Override
    public void process(Context context, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
        // Ranking map, largest count first. NOTE: the comparator deliberately
        // never returns 0, so two entries with the same count are treated as
        // distinct keys and neither overwrites the other. This violates the
        // Comparator contract, but TreeMap.put only needs the ordering here.
        TreeMap<Integer, Tuple2<String, Integer>> ranking =
                new TreeMap<Integer, Tuple2<String, Integer>>((a, b) -> (b < a) ? -1 : 1);
        for (Tuple2<String, Integer> pair : input) {
            ranking.put(pair.f1, pair);
            // Once more than topSize entries are held, drop the smallest.
            if (ranking.size() > topSize) {
                ranking.pollLastEntry();
            }
        }
        // Emit survivors in descending count order.
        for (Map.Entry<Integer, Tuple2<String, Integer>> slot : ranking.entrySet()) {
            out.collect(slot.getValue());
        }
    }
}
/**
 * Flink sink that inserts each (word, count) tuple into the MySQL table
 * {@code tmp} together with the current timestamp.
 */
public class MySQLSink extends RichSinkFunction<Tuple2<String, Integer>> {
    Connection connection = null;
    PreparedStatement insertSmt = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // SECURITY: credentials are hard-coded in source; move them to
        // external configuration before any real deployment.
        String url = "jdbc:mysql://ip:3306/test-flink?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false";
        // NOTE(review): the casts are redundant if Connection/PreparedStatement
        // are java.sql types; they suggest com.mysql.* imports — confirm and drop.
        connection = (Connection) DriverManager.getConnection(url,"root","Founder123");
        // One prepared statement, reused for every invoke() call.
        insertSmt = (PreparedStatement) connection.prepareStatement("insert into tmp(word,num,cdate) values (?,?,?)");
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        insertSmt.setString(1, value.f0);
        insertSmt.setInt(2, value.f1);
        // LocalDateTime.toString() yields ISO-8601 ("2021-01-01T10:00:00");
        // MySQL accepts 'T' as the date/time delimiter for DATETIME columns.
        insertSmt.setString(3, LocalDateTime.now().toString());
        insertSmt.execute();
    }

    @Override
    public void close() throws Exception {
        // Close both resources even if closing the statement throws —
        // the original leaked the connection in that case.
        try {
            if (insertSmt != null) {
                insertSmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
-- Target table for MySQLSink: one row per hot word per Top-N window.
CREATE TABLE `tmp` (
`id` int(11) NOT NULL AUTO_INCREMENT, -- surrogate primary key
`word` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL, -- segmented word
`num` int(11) DEFAULT NULL, -- count within the window
`cdate` datetime DEFAULT NULL, -- insert time, written as LocalDateTime.now().toString()
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2416 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
效果图
pom文件
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <scala.version>2.11</scala.version>
    <guava.version>23.0</guava.version>
    <flink.version>1.13.2</flink.version>
    <async.client.version>2.10.4</async.client.version>
    <typesafe.config.version>1.3.3</typesafe.config.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.5</version>
    </dependency>
    <dependency>
        <groupId>com.hankcs</groupId>
        <artifactId>hanlp</artifactId>
        <version>portable-1.8.3</version>
    </dependency>
</dependencies>
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)