Spark [1]: Basic Concepts and Using the Python Interface


I. RDD

RDD stands for Resilient Distributed Datasets.

An RDD is partitioned based on the number of nodes in the cluster.
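
A quick sketch (assuming a SparkContext named sc, as created in the setup section below) of how partitioning can be controlled and inspected:

# Sketch only: sc is an assumed, already-created SparkContext
rdd = sc.parallelize(range(100), 4)   # request 4 partitions explicitly
print(rdd.getNumPartitions())         # -> 4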

1. RDD Transformations

A transformation creates a new RDD from the current one and is evaluated lazily: the results are only computed when they are needed by an action.

For example, map() is a transformation: it applies a given function to each element of an RDD to produce a new RDD.
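
A minimal sketch of this laziness (assuming a SparkContext named sc, as created in the setup section below):

nums = sc.parallelize([1, 2, 3, 4])

# map() returns a new RDD immediately; the lambda has not run yet
doubled = nums.map(lambda x: x * 2)

# Only an action such as collect() forces the computation
print(doubled.collect())   # [2, 4, 6, 8]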

2. RDD Actions

Actions return a value to the driver program after running a computation.

For example, reduce() is an action that aggregates all the elements of an RDD.
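
A minimal sketch (again assuming a SparkContext named sc):

# reduce() is an action: it aggregates all elements and returns a plain
# Python value to the driver program
nums = sc.parallelize([1, 2, 3, 4, 5])
total = nums.reduce(lambda a, b: a + b)
print(total)   # 15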

3. DAG

DAG stands for Directed Acyclic Graph.

Spark relies on DAGs for fault tolerance: when a node fails, Spark uses the recorded DAG to recompute the lost data on a healthy node.
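
You can inspect the lineage (the DAG of parent RDDs) that Spark records for an RDD; a rough sketch, again assuming a SparkContext named sc:

# Inspect the DAG/lineage of a chained RDD; it is this recorded graph
# that lets Spark recompute lost partitions after a failure
rdd = sc.parallelize(range(10)).map(lambda x: x + 1).filter(lambda x: x % 2 == 0)
print(rdd.toDebugString().decode())   # prints the chain of parent RDDs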

II. Basic Operations

This part covers creating a SparkContext and a SparkSession, creating an RDD, and using DataFrames and Spark SQL.

Setup

# Installing required packages
!pip install pyspark
!pip install findspark

import findspark
findspark.init()

# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

SparkContext is the entry point for a Spark application. It provides a number of functions, such as parallelize() for creating RDDs.

A SparkSession is required for Spark SQL and DataFrame operations.

Create instances of SparkContext and SparkSession:

# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark Dataframes basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Create an RDD and apply transformations:

# create an RDD containing the integers from 1 to 29
data = range(1, 30)
xrangeRDD = sc.parallelize(data, 4)

# transformations
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x: x < 10)
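
At this point no data has actually been processed; as a small follow-on sketch, calling an action forces Spark to evaluate the chained transformations above:

# Actions trigger the actual computation of the transformations above
print(filteredRDD.collect())   # [0, 1, 2, ..., 9]
print(filteredRDD.count())     # 10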

Create a DataFrame, query the data in several ways, and finally stop the SparkSession.

# Read the dataset into a spark dataframe using the `read.json()` function
df = spark.read.json("***.json").cache()
# Print the dataframe as well as the data schema
df.show()
df.printSchema()
# Register the Dataframe as a SQL temporary view
df.createTempView("people")

# Select and show basic data columns
df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people").show()


# Perform basic filtering
df.filter(df["age"] > 21).show()
spark.sql("SELECt age, name FROM people WHERe age > 21").show()

# Operate on a single column: create a new column 'old' whose values are 3 times the age
df.withColumn('old', df['age']*3).show()

# Perform basic aggregation of data
df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()


# close the SparkSession
spark.stop()
