Hanlp分词器(通过spark)

Hanlp分词器(通过spark),第1张

Hanlp分词器(通过spark)

这里主要是对内容数据进行标签处理

这里我们是用分词器是HanLP

HanLP是哈工大提供的一种中文分词的工具,因为他支持Java API

这里我们使用spark + hanlp进行中文分词

1、准备工作

##1. 在hdfs创建目录用于存放hanlp的数据
[root@hadoop ~]# hdfs dfs -mkdir -p /common/nlp/data

##2. 将hanlp的工具上传到服务器的指定位置
##3. 解压到当前目录
[root@hadoop soft]# tar -zxvf hanlp.dictionary.tgz

##4. 将语料库上传到hdfs的指定位置
[root@hadoop soft]# hdfs dfs -put ./dictionary/ /common/nlp/data

##5. 将这个hanlp.properties拷贝到当前工程下的resources目录下
使用Hanlp必须继承IIOAdapter,因为是使用我们自定义的分词库
package com.fuwei.bigdata.profile.nlp.hanlp

import com.hankcs.hanlp.corpus.io.{IIOAdapter, IOUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import java.io.{FileInputStream, InputStream, OutputStream}
import java.net.URI



class HadoopFileIoAdapter extends IIOAdapter{

    
    override def open(path: String): InputStream = {
        //1、获取 *** 作hdfs的文件系统对象
        val configuration = new Configuration()
        val fs: FileSystem = FileSystem.get(URI.create(path), configuration)
        //2、判断路径是否存在
        if (fs.exists(new Path(path))){//此时说明存在
            fs.open(new Path(path))
        }else{
            if (IOUtil.isResource(path)){
                //判断这个资源路径是否为hanlp需要的资源路径
                IOUtil.getResourceAsStream("/"+path)
            }else{
                new FileInputStream(path)
            }
        }
    }

    
    override def create(path: String): OutputStream = {
        val configuration = new Configuration()
        val fs: FileSystem = FileSystem.get(URI.create(path),configuration)
        fs.create(new Path(path))
    }

}

package com.fuwei.bigdata.profile

import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizer
import com.fuwei.bigdata.profile.conf.Config
import com.fuwei.bigdata.profile.utils.SparkUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Dataframe, SaveMode, SparkSession}
import org.slf4j.LoggerFactory

import java.util
import scala.collection.{JavaConversions, mutable}






object NewsContentSegment {
    private val logger = LoggerFactory.getLogger(NewsContentSegment.getClass.getSimpleName)

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.WARN)
        //1、解析参数
        val params: Config = Config.parseConfig(NewsContentSegment, args)
        System.setProperty("HADOOP_USER_NAME",params.proxyUser)
        logger.warn("job is running please wait for a moment ......")

        //2、获取SparkSession
        val spark: SparkSession = SparkUtils.getSparkSession(params.env, NewsContentSegment.getClass.getSimpleName)
        import spark.implicits._

        //3、如果是做本地测试,没有必要显示所有代码,测试10行数据即可
        var limitData = ""
        if (params.env.equalsIgnoreCase("dev")){
            limitData = "limit 10"
        }

        //4、读取源数据
        val sourceArticleDataSQL =
            s"""
               |select
               |""".stripMargin

        val sourceDF: Dataframe = spark.sql(sourceArticleDataSQL)
        sourceDF.show()

        //5、分词
        val termsDF: Dataframe = sourceDF.mapPartitions(partition => {
            //5.1存放结果的集合
            var resTermList: List[(String, String)] = List[(String, String)]()

            //5.2遍历分区数据
            partition.foreach(row => {
                //5.3获取到字段信息
                val article_id: String = row.getAs("").toString
                val context: String = row.getAs("").toString

                //5.4分词
                val terms: util.List[Term] = StandardTokenizer.segment(context)
                //5.5去除停用词
                val stopTerms: util.List[Term] = CoreStopWordDictionary.apply(terms) //去除terms中的停用词

                //5.6转换为scala的buffer
                val stopTermsAsScalaBuffer: mutable.Buffer[Term] = JavaConversions.asScalaBuffer(stopTerms)

                //5.7保留名词,去除单个汉字,单词之间使用逗号隔开
                val convertTerms: String = stopTermsAsScalaBuffer.filter(term => {
                    term.nature.startsWith("n") && term.word.length != 1
                }).map(term => term.word).mkString(",")

                //5.8构建单个结果
                var res = (article_id, convertTerms)

                //5.9去除空值
                if (convertTerms.length != 0) {
                    resTermList = res :: resTermList //向结果中追加
                }
            })
            resTermList.iterator
        }).toDF("article_id", "context_terms")

        termsDF.show()

        //6、写入到hive
        termsDF.write.mode(SaveMode.Overwrite).format("ORC").saveAsTable("dwd_news.news_article_terms")

        //7、释放资源
        spark.close()
        logger.info(" job has success.....")
    }

}

spark自定义jar包测试

${SPARK_HOME}/bin/spark-submit 
--jars /data/apps/hive-1.2.1/auxlib/hudi-spark-bundle_2.11-0.5.2-incubating.jar 
--conf spark.sql.hive.convertmetastoreParquet=false 
--conf spark.executor.heartbeatInterval=120s 
--conf spark.network.timeout=600s 
--conf spark.sql.catalogImplementation=hive 
--conf spark.yarn.submit.waitAppCompletion=false 
--name user_profile_terms 
--conf spark.task.cpus=1 
--conf spark.executor.cores=4 
--conf spark.sql.shuffle.partitions=50 
--master yarn 
--deploy-mode cluster 
--driver-memory 1G 
--executor-memory 3G 
--num-executors 1 
--class com.fuwei.bigdata.profile.NewsContentSegment 
/data/jar/user-profile.jar 
-e prod -x root

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5701838.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存