- 数据分析
- 数据读取
- 准备工作
- 读取文件
- 数据清洗
- 数据类型转换
- 解决报错问题
- 剪除异常数据
- 完整代码显示
数据集结构
其中有几点需要注意
- hack_license 是出租车执照, 可以唯一标识一辆出租车
- pickup_datetime 和 dropoff_datetime 分别是上车时间和下车时间, 通过这个时间, 可以获知行车时间
- pickup_longitude 和 dropoff_longitude 是经度, 经度所代表的是横轴, 也就是 X 轴
- pickup_latitude 和 dropoff_latitude 是纬度, 纬度所代表的是纵轴, 也就是 Y 轴
跳转顶部
数据读取 准备工作
配置pom文件
4.0.0 cn.itcast taxi0.0.1 2.11.8 2.2.0 2.7.5 1.7.16 1.2.17 5.1.35 2.2.2 3.6.6 org.scala-lang scala-library${scala.version} org.scala-lang.modules scala-xml_2.111.0.6 org.apache.spark spark-core_2.11${spark.version} org.apache.spark spark-sql_2.11${spark.version} org.apache.hadoop hadoop-client${hadoop.version} com.esri.geometry esri-geometry-api${esri.version} org.json4s json4s-native_2.11${json4s.version} org.json4s json4s-jackson_2.11${json4s.version} org.slf4j jcl-over-slf4j${slf4j.version} org.slf4j slf4j-api${slf4j.version} org.slf4j slf4j-log4j12${slf4j.version} log4j log4j${log4j.version} src/main/scala org.apache.maven.plugins maven-compiler-plugin3.0 1.8 UTF-8 net.alchim31.maven scala-maven-plugin3.2.0 compile testCompile -dependencyfile ${project.build.directory}/.scala_dependencies
创建scala目录,并且将这个目录设置为Source Root
目录结构如下
创建SarkSession与读取文件
//创建SparkSession val spark = SparkSession.builder() .master("local[6]") .appName("taxi") .getOrCreate() //导入隐式转换 import spark.implicits._ import org.apache.spark.sql.functions._ //数据读取 val taxiRow = spark.read .option("header", true) .csv("dataset/half_trip.csv") .show()
结果展示
跳转顶部
数据清洗 数据类型转换
剪去多余列
- 现在数据集中包含了一些多余的列, 在后续的计算中并不会使用到, 如果让这些列参与计算的话, 会影响整体性能, 浪费集群资源
类型转换
- 可以看到, 现在的数据集中, 所有列类型都是 String, 而在一些统计和运算中, 不能使用 String 来进行, 所以要将这些数据转为对应的类型
我们可以自定义一个样例类将Row类型数据转换成对象类
case class Trip( license: String, pickUpTime: Long, dropOffTime: Long, pickUpX: Double, pickUpY: Double, dropOffX: Double, dropOffY: Double )
我们在逐行转换数据类型时不知道数据是否为空,因此我们应该新建一个类来判断数据是否为空
因为在针对 Row 类型对象进行数据转换时, 需要对一列是否为空进行判断和处理, 在 Scala中为空的处理进行一些支持和封装, 叫做 Option, 所以在读取 Row 类型对象的时候, 要返回 Option 对象, 通过一个包装类, 可以轻松做到这件事
class RichRow(row: Row) { // 为了返回空值,提醒外面进行处理 def getAs[T](field: String): Option[T] = { //判断是否为空 if (row.isNullAt(row.fieldIndex(field))) { None } else { Some(row.getAs[T](field)) } } }
RichRow类的解释与分析
- 该类的输入时Row就是行
- getAs方法输入的参数field是列名,输出的是每列的数据
- 之所将数据包装成option类是因为,,此类有个方法getOrElse,使用该方法,不为空是时输出数据,为空时输出0或者指定的数据
我们需要的数据只有三种数据类型,分别是:String、Long、Double。,字符类型在数据读取时,默认就是,但是后两者类型需要我们自己手动转换
将数据转换成Long类型
def parseTime(row: RichRow, field: String): Long = { //表示时间类型的格式 val pattern = "yyyy-MM-dd HH:mm:ss" val formatter = new SimpleDateFormat(pattern, Locale.ENGLISH) //执行转换,获取时间戳 val time: Option[String] = row.getAs[String](field) val timeOption = time.map(time => formatter.parse(time).getTime) timeOption.getOrElse(0l) }
读取Double类型数据
def parseLocation(row: RichRow, field: String): Double = { //获取数据 val location: Option[String] = row.getAs[String](field) //转换数据 val locationOption: Option[Double] = location.map(local => local.toDouble) locationOption.getOrElse(0.0) }
此时我们可以将数据一起转换成自定义的类型
def parse(row: Row): Trip = { val richRow = new RichRow(row) val license = richRow.getAs[String]("hack_license").orNull val pickUpTime = parseTime(richRow, "pickup_datetime") val pickOffTime = parseTime(richRow, "dropoff_datetime") val pickUpX = parseLocation(richRow, "pickup_longitude") val pickUpY = parseLocation(richRow, "pickup_latitude") val pickOffX = parseLocation(richRow, "dropoff_longitude") val pickOffY = parseLocation(richRow, "dropoff_latitude") Trip(license, pickUpTime, pickOffTime, pickUpX, pickUpY, pickOffX, pickOffY) }解决报错问题
我们上面在对数据进行类型转换时,可能会因为数据的错误而报错,那我们改输入解决?
def safe[P, R](f: P => R): P => Either[R, (P, Exception)] = { new Function[P, Either[R, (P, Exception)]] with Serializable { override def apply(param: P): Either[R, (P, Exception)] = { try { Left(f(param)) } catch { case e: Exception => Right((param, e)) } } } }
解释与分析
- 我们为了保证数据处理过程中的安全性,可以在rdd.map的时候调用函数safe
- 但是map输入的是一个函数parse,所以我们应该也是输入函数p
- safe接受的参数也是函数p,但是这个函数p会返回一个函数r
- 所以safe现在是有两种输出的结果,一个是输入的函数变换之后的r
- 一个是输入函数和报错信息
针对异常值进行处理
val taxiParsed: RDD[Either[Trip, (Row, Exception)]] = taxiRow.rdd.map(safe(parse)) //现在result里面全是有问题的row val result: RDD[Row] = taxiParsed.filter(e => e.isRight) .map(e => e.right.get._1) //结果 val taxiGood = taxiParsed.map(either => either.left.get).toDS()
剪除异常数据
val hours = (pickupTime: Long, dropoffTime: Long) => { val duration = dropoffTime - pickupTime val hour = TimeUnit.HOURS.convert(duration, TimeUnit.MICROSECONDS) hour } val hoursUDF = udf(hours) spark.udf.register("hours",hours) val taxiClean = taxiGood.where("hours(pickUpTime,dropOffTime) between 0 and 3") .show()
结果展示
跳转顶部
完整代码显示
package taxi import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SparkSession} import java.text.SimpleDateFormat import java.util.Locale import java.util.concurrent.TimeUnit object TaxiAnalysisRunner { def main(args: Array[String]): Unit = { //创建SparkSession val spark = SparkSession.builder() .master("local[6]") .appName("taxi") .getOrCreate() //导入隐式转换 import spark.implicits._ import org.apache.spark.sql.functions._ //数据读取 val taxiRow = spark.read .option("header", true) .csv("dataset/half_trip.csv") //转换 *** 作 val taxiParsed: RDD[Either[Trip, (Row, Exception)]] = taxiRow.rdd.map(safe(parse)) //现在result里面全是有问题的row // val result: RDD[Row] = taxiParsed.filter(e => e.isRight) // .map(e => e.right.get._1) //结果 val taxiGood = taxiParsed.map(either => either.left.get).toDS() //绘制时长直方图 //统计时长 val hours = (pickupTime: Long, dropoffTime: Long) => { val duration = dropoffTime - pickupTime val hour = TimeUnit.HOURS.convert(duration, TimeUnit.MICROSECONDS) hour } val hoursUDF = udf(hours) spark.udf.register("hours",hours) val taxiClean = taxiGood.where("hours(pickUpTime,dropOffTime) between 0 and 3") .show() } def safe[P, R](f: P => R): P => Either[R, (P, Exception)] = { new Function[P, Either[R, (P, Exception)]] with Serializable { override def apply(param: P): Either[R, (P, Exception)] = { try { Left(f(param)) } catch { case e: Exception => Right((param, e)) } } } } def parse(row: Row): Trip = { val richRow = new RichRow(row) val license = richRow.getAs[String]("hack_license").orNull val pickUpTime = parseTime(richRow, "pickup_datetime") val pickOffTime = parseTime(richRow, "dropoff_datetime") val pickUpX = parseLocation(richRow, "pickup_longitude") val pickUpY = parseLocation(richRow, "pickup_latitude") val pickOffX = parseLocation(richRow, "dropoff_longitude") val pickOffY = parseLocation(richRow, "dropoff_latitude") Trip(license, pickUpTime, pickOffTime, pickUpX, pickUpY, pickOffX, pickOffY) } def parseTime(row: RichRow, field: String): Long = { //表示时间类型的格式 val pattern = "yyyy-MM-dd HH:mm:ss" val formatter = new SimpleDateFormat(pattern, Locale.ENGLISH) //执行转换,获取时间戳 val time: Option[String] = row.getAs[String](field) val timeOption = time.map(time => formatter.parse(time).getTime) timeOption.getOrElse(0l) } def parseLocation(row: RichRow, field: String): Double = { //获取数据 val location: Option[String] = row.getAs[String](field) //转换数据 val locationOption: Option[Double] = location.map(local => local.toDouble) locationOption.getOrElse(0.0) } } class RichRow(row: Row) { // 为了返回空值,提醒外面进行处理 def getAs[T](field: String): Option[T] = { //判断是否为空 if (row.isNullAt(row.fieldIndex(field))) { None } else { Some(row.getAs[T](field)) } } } case class Trip( license: String, pickUpTime: Long, dropOffTime: Long, pickUpX: Double, pickUpY: Double, dropOffX: Double, dropOffY: Double )
跳转顶部
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)