Real-time hot-word Top-N: HanLP segmentation written to Kafka, word-frequency counting in Flink, results written to MySQL


  • HanLP segmentation, data written to Kafka
      • HanLP usage: see the official site below
      • Sample input for this article
      • Read a text file and segment it with HanLP
          • Emoji and URLs are stripped from the text with a regex; segmentation with part-of-speech analysis requires downloading the HanLP data package into resources, the standard segmenter does not
      • Write to Kafka
      • Sample data in Kafka
  • Flink processing of the Kafka data
      • Local web UI: add the dependency plus the local-start configuration to access the page
      • Result screenshot
      • pom file

HanLP segmentation, data written to Kafka. For HanLP usage, see the official site:

https://www.hanlp.com/
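A minimal segmentation call with the portable jar looks roughly like this (a sketch; the class name is hypothetical and the sample sentence is the one used below):

import com.hankcs.hanlp.HanLP;
import com.hankcs.hanlp.seg.common.Term;

import java.util.List;

public class HanlpQuickStart {
    public static void main(String[] args) {
        // standard segmenter; works with the portable jar, no extra data package required
        List<Term> terms = HanLP.segment("这是个文本样例啊");
        System.out.println(terms);
    }
}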

Sample input for this article:
{"text":"这是个文本样例啊"}
{"text":"这是个文本样例啊"}
Read a text file and segment it with HanLP. Emoji and URLs in the text are removed with a regular expression; segmentation with part-of-speech analysis requires downloading the HanLP data package and placing it under resources, while the standard segmenter needs no extra data.
public void run() {
    // load the stop-word dictionary
    CoreStopWordDictionary.load(HanLP.Config.CoreStopWordDictionaryPath, true);
    // do not show part-of-speech tags in the segmentation result
    HanLP.Config.ShowTermNature = false;
    HanLP.Config.Normalization = true;
    StringBuilder sb = new StringBuilder();
    try {
        File file = new File(filePath);
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(file), "utf-8"));
        String str = null;
        while ((str = bufferedReader.readLine()) != null) {
            String inputStr = JSONObject.parseObject(str).getString("text");
            // strip URLs and emoji with a regex
            String urlPattern = "((https?|ftp|gopher|telnet|file|Unsure|http):((//)|(\\\\))+[\\w\\d:#@%/;$()~_?\\+-=\\\\\\.&]*)|[\ud83c\udc00-\ud83c\udfff]|[\ud83d\udc00-\ud83d\udfff]|[\u2600-\u27ff]|[\ud83e\udd00-\ud83e\uddff]|[\u2300-\u23ff]|[\u2500-\u25ff]|[\u2100-\u21ff]|[\u0000-\u00ff]|[\u2b00-\u2bff]|[\u2d06]|[\u3030]|\\pP|\\pS";
            Pattern p = Pattern.compile(urlPattern, Pattern.CASE_INSENSITIVE);
            Matcher m = p.matcher(inputStr);
            inputStr = m.replaceAll("");
            sb.append(inputStr);
        }
        bufferedReader.close();
        // segment the cleaned text and remove stop words
        List<Term> segment = NLPTokenizer.segment(sb.toString());
        CoreStopWordDictionary.apply(segment);
        // write to Kafka
        writeToKf(segment);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
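For reference, a sketch of the enclosing class and the imports that run() above and writeToKf() below rely on (the class name HotWordProducer and its constructor are assumptions; only the two methods appear in the original):

import com.alibaba.fastjson.JSONObject;
import com.hankcs.hanlp.HanLP;
import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary;
import com.hankcs.hanlp.seg.common.Term;
import com.hankcs.hanlp.tokenizer.NLPTokenizer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HotWordProducer {              // hypothetical class name
    private final String filePath;          // path of the JSON-lines input file read by run()

    public HotWordProducer(String filePath) {
        this.filePath = filePath;
    }

    // run() above and writeToKf() below go here
}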
Write to Kafka
    public void writeToKf(List<Term> segment) {
        // write the segmentation result to Kafka
        Properties ps = new Properties();
        ps.setProperty("bootstrap.servers", "ip:9092"); // cluster address
//      ps.setProperty("group.id", "hotwords");
        ps.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // key serializer
        ps.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(ps);
        // one message per file: the List<Term> rendered via toString()
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("hotwords", segment.toString());
        kafkaProducer.send(record);
        kafkaProducer.close();
        try {
            Thread.sleep(1000 * 20);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
Sample data in Kafka:
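Since writeToKf sends segment.toString(), each message is the List rendered by toString(): the segmented words inside square brackets, separated by ", ", roughly (illustrative):

[word1, word2, word3, ...]

This comma-separated form is what the LineSplitter in the Flink job below splits on.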

Flink processing of the Kafka data

The experiment follows the article below:

https://blog.csdn.net/m0_49834705/article/details/115023005

Local web UI: add the required dependency plus the local-start configuration shown below, and the page becomes accessible.
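The dependency referred to here is presumably flink-runtime-web (an assumption: it is not listed in the pom at the end of this article), e.g.:

        <!-- assumed dependency for the local Flink web UI -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>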

class App {
        public static void main(String[] args) throws Exception {
            // local start with web UI
            Configuration config = new Configuration();
            config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
            config.setInteger(RestOptions.PORT, 2841);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8, config);

//            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
            Properties properties=new Properties();
            properties.setProperty("bootstrap.servers","ip:9092");
            properties.setProperty("group.id", "hotwords");
//          properties.setProperty("auto.offset.reset", "latest");
//          properties.setProperty("enable.auto.commit", "true")
//          properties.setProperty("auto.commit.interval.ms", "15000")
            // consume topic hotwords; commit offsets automatically on checkpoints
            DataStreamSource<String> text = env.addSource(
                    new FlinkKafkaConsumer<String>("hotwords",
                            new SimpleStringSchema(),properties)
                            .setStartFromLatest()
//                            .setStartFromEarliest()
                            // auto-commit offsets on checkpoints
                            .setCommitOffsetsOnCheckpoints(true));

            DataStream<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter());

            DataStream<Tuple2<String, Integer>> wcount = ds
                    .keyBy(0) // key by the first field of the Tuple2, i.e. the word
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(60),Time.seconds(20)))
                    // keyed elements enter a sliding window 60s long that slides forward every 20s
                    .sum(1); // sum the count (second field) of elements with the same key

//            wcount.print("wc: ");

            // all keys go into one 20s window (20s is chosen because the upstream window emits a result every 20s, so each Top-N computation covers exactly one round of upstream output)
            DataStream<Tuple2<String, Integer>> ret =
                    wcount.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20))).
                            process(new TopNAllFunction(20)); // compute the Top-N of this window

            // save to MySQL
            ret.addSink(new MySQLSink());
            // print
//            ret.print("ret: ");

//            wcount.keyBy(new TupleKeySelectorByStart()) // group by first character
//                    .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 20s window over upstream results
//                    .process(new TopNFunction(5)); // per-group Top-N

            env.execute("Window WordCount");
        }

    /**
     * Split each incoming line into (word, 1) pairs.
     */
    private static final class LineSplitter implements   FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.split(",");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    /**
     * Use the first character as the key.
     */
    private static class TupleKeySelectorByStart implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return value.f0.substring(0, 1); // first character as the key
        }
    }
}
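A note on the keyBy(0) call above: the positional keyBy(int) variant is deprecated in Flink 1.13. An equivalent form using a KeySelector lambda would be (a sketch; the behavior is the same):

            DataStream<Tuple2<String, Integer>> wcount = text
                    .flatMap(new LineSplitter())
                    .keyBy(value -> value.f0)  // key by the word itself instead of the tuple index
                    .window(SlidingProcessingTimeWindows.of(Time.seconds(60), Time.seconds(20)))
                    .sum(1);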
public class TopNAllFunction  extends  ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> {
    private int topSize = 10;

    public TopNAllFunction(int topSize) {
        this.topSize = topSize;
    }


    @Override
    public void process(Context context, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
        TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
                new Comparator<Integer>() {
                    @Override
                    public int compare(Integer y, Integer x) {
                        return (x < y) ? -1 : 1;
                    }
                }); // TreeMap ordered by key (the count) in descending order; the comparator never returns 0, so equal counts do not overwrite each other
        for (Tuple2<String, Integer> element : input) {
            treemap.put(element.f1, element);
            if (treemap.size() > topSize) { // keep only the Top-N elements
                treemap.pollLastEntry();
            }
        }

        for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap.entrySet()) {
            out.collect(entry.getValue());
        }
    }
}
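The comparator above sorts counts in descending order and never returns 0, so two words with the same count are kept as separate entries instead of one overwriting the other, and pollLastEntry() always drops the current smallest count. A standalone sketch of that behavior (hypothetical words and counts):

import java.util.Map;
import java.util.TreeMap;

public class TreeMapTopNDemo {
    public static void main(String[] args) {
        // descending by count; never returns 0, so equal counts coexist as separate entries
        TreeMap<Integer, String> top = new TreeMap<>((y, x) -> (x < y) ? -1 : 1);
        top.put(3, "flink");
        top.put(3, "kafka");   // same count as "flink", kept rather than replacing it
        top.put(1, "hanlp");
        top.put(5, "mysql");
        while (top.size() > 3) {
            top.pollLastEntry(); // drop the smallest count, keeping the Top-3
        }
        for (Map.Entry<Integer, String> e : top.entrySet()) {
            System.out.println(e.getValue() + " -> " + e.getKey());
        }
        // prints: mysql -> 5, flink -> 3, kafka -> 3 ("hanlp" with count 1 was dropped)
    }
}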
public class MySQLSink extends RichSinkFunction<Tuple2<String, Integer>>  {
    Connection connection = null;
    PreparedStatement insertSmt = null;


    @Override
    public void open(Configuration parameters) throws Exception {
        String url = "jdbc:mysql://ip:3306/test-flink?autoReconnect=true&useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&useSSL=false";
        connection = DriverManager.getConnection(url, "root", "Founder123");
        insertSmt = connection.prepareStatement("insert into tmp(word,num,cdate) values (?,?,?)");
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        // execute the insert directly
//        System.out.println("value.toString()-------" + value.toString());
        insertSmt.setString(1,value.f0);
        insertSmt.setInt(2,value.f1);
        insertSmt.setString(3, LocalDateTime.now().toString());
        insertSmt.execute();
    }

    @Override
    public void close() throws Exception {
        if (insertSmt != null){
            insertSmt.close();
        }
        if (connection != null){
            connection.close();
        }
    }

    //jdbc
//            JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
//                    .setDrivername("com.mysql.cj.jdbc.Driver")
//                    .setDBUrl("jdbc:mysql://localhost:3306/test?user=root&password=123456")
//                    .setQuery("insert into  words (word,count) values (?,?) ")
//                    // flush a batch every 2 records
//                    .setBatchInterval(2)
//                    .finish();
}
CREATE TABLE `tmp` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `word` varchar(255) COLLATE utf8mb4_bin DEFAULT NULL,
  `num` int(11) DEFAULT NULL,
  `cdate` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2416 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
Result screenshot

pom file
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <scala.version>2.11</scala.version>
        <guava.version>23.0</guava.version>
        <flink.version>1.13.2</flink.version>
        <async.client.version>2.10.4</async.client.version>
        <typesafe.config.version>1.3.3</typesafe.config.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.6</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>

        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.8.3</version>
        </dependency>

    </dependencies>
