RDD stands for Resilient Distributed Datasets.
An RDD is partitioned across the nodes of the cluster.
1. RDD Transformations create a new RDD from the current one. They are lazily evaluated: the results are only computed when evaluated by actions.
For example, map() is a transformation that produces a new RDD by applying a function to every element of an existing RDD.
2. RDD Actions return a value to the driver program after running a computation (see the short sketch after this list).
For example, reduce() is an action that aggregates all RDD elements.
3. DAG is short for Directed Acyclic Graph.
Spark relies on DAGs for fault tolerance: when a node fails, Spark replays the recorded DAG to recompute the lost data and recover that node's work.
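As a short sketch of the transformation/action distinction (it assumes a SparkContext named sc is already available, created as in the setup below): map() only records lineage, reduce() triggers the computation, and toDebugString() prints the lineage (the DAG) that Spark would replay after a failure.

# Minimal sketch, assuming a SparkContext `sc` already exists (see the setup below)
nums = sc.parallelize([1, 2, 3, 4, 5])

# map() is a transformation: nothing runs yet, Spark only records the lineage
doubled = nums.map(lambda x: x * 2)

# reduce() is an action: it runs the computation and returns a value to the driver
total = doubled.reduce(lambda a, b: a + b)
print(total)                     # 30

# the lineage (DAG) Spark would replay to rebuild lost partitions
print(doubled.toDebugString())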
- Creating a SparkContext and a SparkSession, creating RDDs, and using DataFrames and Spark SQL
Setup
# Installing required packages
!pip install pyspark
!pip install findspark

import findspark
findspark.init()

# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
SparkContext is the entry point of a Spark application. It provides a number of functions, such as parallelize() for creating RDDs.
SparkSession is required for working with Spark SQL and DataFrames.
Create instances of SparkContext and SparkSession:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark Dataframes basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
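A quick hedged sanity check (these are standard PySpark attributes, not specific to this lab): printing the version confirms that the context and session are up.

# Optional sanity check that the context and session were created successfully
print(sc.version)        # Spark version reported by the SparkContext
print(spark.version)     # same version, reported by the SparkSession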
Create an RDD and apply transformations:
# create an RDD containing the integers from 1 to 29
data = range(1, 30)
xrangeRDD = sc.parallelize(data, 4)

# transformations
subRDD = xrangeRDD.map(lambda x: x - 1)
filteredRDD = subRDD.filter(lambda x: x < 10)
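Nothing has actually been computed yet, because transformations are lazy. A minimal follow-up sketch, using the RDDs defined above, adds actions to trigger the evaluation:

# Actions trigger evaluation of the lazy transformation chain above
print(filteredRDD.collect())   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print(filteredRDD.count())     # 10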
Create a DataFrame, query the data in several ways, and finally close the session.
# Read the dataset into a spark dataframe using the `read.json()` function
df = spark.read.json("***.json").cache()

# Print the dataframe as well as the data schema
df.show()
df.printSchema()

# Register the Dataframe as a SQL temporary view
df.createTempView("people")

# Select and show basic data columns
df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people").show()

# Perform basic filtering
df.filter(df["age"] > 21).show()
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

# Operate on a single column: add a new column `old` whose value is 3 times `age`
df.withColumn('old', df['age'] * 3).show()

# Perform basic aggregation of data
df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()

# close the SparkSession
spark.stop()
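Since the JSON path above is elided in this post, here is a hedged, self-contained sketch that builds a tiny people DataFrame in memory and runs one of the same queries. The rows and the app name are invented purely for illustration; run it before the spark.stop() call above, or let getOrCreate() start a fresh session.

# Self-contained sketch: build a small DataFrame in memory instead of reading a JSON file
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("In-memory people example").getOrCreate()

# Invented rows, purely for illustration; any `name`/`age` columns work with the queries above
people_df = spark.createDataFrame(
    [("Alice", 29), ("Bob", 22), ("Carol", 19)],
    ["name", "age"],
)
people_df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
spark.stop()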