Hive和Spark当中对小文件的处理

Hive和Spark当中对小文件的处理,第1张

数仓面试高频考点:

【在Hive中如何解析小文件过多问题,指定的是:处理表中数据时,有很多小文件】

| Table Parameters: | NULL | NULL |

| | bucketing_version | 2 |

| | numFiles | 1 |

| | numRows| 0 |

| | rawDataSize| 0 |

| | totalSize | 656 |

| | transient_lastDdlTime | 1631525001|

如果没有显示表的统计信息,执行如下命令,再次查看表信息

ANALYZE TABLE db_hive.emp COMPUTE STATISTICS

| Table Parameters: | NULL | NULL|

| | COLUMN_STATS_ACCURATE | {"BASIC_STATS":"true"} |

| | bucketing_version | 2 |

| | numFiles | 1 |

| | numRows| 14 |

| | rawDataSize| 643 |

| | totalSize | 656 |

| | transient_lastDdlTime | 1655113125 |

| | NULL | NULL|

第一种,将小文件合并成一个大文件

第二种,使用SparkContext中提供: wholeTextFiles 方法,专门读取小文件数据。

将每个文件作为一条KV存储在RDD中, K:文件名的绝对路径,V:文件的内容

用于解决小文件的问题,可以将多个小文件变成多个KV,自由指定分区个数

spark进入txt文件的命令

1、首先启动spark-shell进入Spark-shell模式:(进入spark目录下后 输入命令 bin/spark-shell启动spark-shell模式)

2、加载text文件(spark创建sc,可以加载本地文件和HDFS文件创建RDD)

val textFile = sc.textFile("file:///home/hadoop/test1.txt") #注意file:后是三个“/”

注意:加载HDFS文件和本地文件都是使用textFile,区别是添加前缀(hdfs://和file://)进行标识。

3、获取RDD文件textFile所有项(文本文件即总共行数)的计数(还有很多其他的RDD *** 作,自行百度)

textFile.count() #统计结果显示 1 行

二、在 spark-shell 中读取 HDFS 系统文件“/home/hadoop/test.csv(也可以是txt文件)”(如果该文件不存在, 请先创建),然后,统计出文件的行数:

方法一:

1、加载text文件(spark创建sc,可以加载本地文件和HDFS文件创建RDD)

val textFile = sc.textFile("hdfs:///home/hadoop/test.csv") #注意hdfs:后是三个“/”

注意:加载HDFS文件和本地文件都是使用textFile,区别是添加前缀(hdfs://和file://)进行标识。

2、获取RDD文件textFile所有项的计数

textFile.count() #统计结果显示 1 行

方法二:(Spark shell 默认是读取 HDFS 中的文件,需要先上传文件到 HDFS 中,否则会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/README.md”的错误。)

1、省去方法一中第一步的命令(1)中的“hdfs://”,其他部分相同,命令如下:

三、编写独立应用程序,读取 HDFS 系统文件“/user/hadoop/test.txt”(如果该文件不存在, 请先创建),然后,统计出文件的行数;通过 sbt 工具将整个应用程序编译打包成 JAR 包, 并将生成的 JAR 包通过 spark-submit 提交到 Spark 中运行命令:

1、首先输入:quit 命令退出spark-shell模式:

2、在终端中执行如下命令创建一个文件夹 sparkapp3 作为应用程序根目录:

cd ~ # 进入用户主文件夹

mkdir ./sparkapp3 # 创建应用程序根目录

mkdir -p ./sparkapp3/src/main/scala # 创建所需的文件夹结构

3、在 ./sparkapp3/src/main/scala 下建立一个名为 SimpleApp.scala 的文件(vim ./sparkapp3/src/main/scala/SimpleApp.scala),添加代码如下:

/* SimpleApp.scala */

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

import org.apache.spark.SparkConf

object SimpleApp {

def main(args: Array[String]) {

val logFile = "hdfs://localhost:9000/home/hadoop/test.csv"

val conf = new SparkConf().setAppName("Simple Application")

val sc = new SparkContext(conf)

val logData = sc.textFile(logFile, 2)

val num = logData.count()

println("这个文件有 %d 行!".format(num))

}

}

4、该程序依赖 Spark API,因此我们需要通过 sbt 进行编译打包。 ./sparkapp3 中新建文件 simple.sbt(vim ./sparkapp3/simple.sbt),添加内容如下,声明该独立应用程序的信息以及与 Spark 的依赖关系:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0-preview2"

使用Apache Spark可以方便地读取并处理日志文件中的记录内容。

下面是一个使用Spark读取日志文件中的记录内容的示例代码:

# 导入Spark相关的库

from pyspark import SparkContext, SparkConf

# 创建SparkContext对象

sc = SparkContext(appName="Log Processing")

# 读取日志文件

log_file = sc.textFile("/path/to/log/file.log")

# 按行解析日志记录

log_records = log_file.map(lambda line: line.split(" "))

# 过滤出指定类型的日志记录

filtered_records = log_records.filter(lambda record: record[2] == "ERROR")

# 对日志记录进行处理,如统计数量或分析日志信息等

# ...

# 关闭SparkContext

sc.stop()

上面的示例代码首先使用Spark的textFile()方法读取日志文件,然后使用map()方法将日志文件的每一行按空格分割成一个数组,得到一个日志记录的RDD。接着使用filter()方法过滤出指定类型的日志记录,最后对日志记录进行处理。

使用Spark处理日志文件的优点在于,可以利用Spark的分布式计算能力,对大量的日志文件进行快速的处理。例如,可以使用Spark的MapReduce算法快速统计日志文件中各种类型的记录数量,或者使用Spark SQL快速查询日志文件中的特定信息。

此外,Spark还提供了丰富的API和算法库,可以方便地进行数据清洗、数据分析和特征提取等复杂的数据处理任务。例如,可以使用Spark MLlib库进行机器学习,或者使用Spark GraphX库进行图计算等。

总之,使用Spark可以方便地读取并处理日志文件中的记录内容,是一种高效的数据处理方式。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/12112033.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-21
下一篇 2023-05-21

发表评论

登录后才能评论

评论列表(0条)

保存