Spark中的dataFrame如何实现Python中dataFrame的describe功能

Spark中的dataFrame如何实现Python中dataFrame的describe功能,第1张

#!/usr/bin/env python3# -- coding: utf-8 --"""

Created on Fri Jun 8 16:27:57 2018

@author: luogan

"""import pandas as pdfrom pysparksql import SparkSession

spark= SparkSession\

builder \

appName("dataFrame") \

getOrCreate()# Loads datall3=pdDataFrame([[1,2],[3,4]],columns=['a','b'])

cc=ll3valuestolist()

dd=list(ll3columns)#df=sparkcreateDataFrame(ll3)#turn pandasDataFrame to sparkdataFramespark_df = sparkcreateDataFrame(cc, dd)

print('sparkdataFram=',spark_dfshow())#turn sparkdataFrame to pandasDataFrame pandas_df = spark_df toPandas()

print('pandasDataFrame=',pandas_df)

,客户端和虚拟集群中hadoop、spark、scala的安装目录是一致的,这样开发的spark应用程序的时候不需要打包spark开发包和scala的库文件,减少不必要的网络IO和磁盘IO。当然也可以不一样,不过在使用部署工具spark-submit的时候需要参数指明classpath。

1:IDEA的安装

官网jetbrainscom下载IntelliJ IDEA,有Community Editions 和& Ultimate Editions,前者免费,用户可以选择合适的版本使用。

根据安装指导安装IDEA后,需要安装scala插件,有两种途径可以安装scala插件:

启动IDEA -> Welcome to IntelliJ IDEA -> Configure -> Plugins -> Install JetBrains plugin -> 找到scala后安装。

启动IDEA -> Welcome to IntelliJ IDEA -> Open Project -> File -> Settings -> plugins -> Install JetBrains plugin -> 找到scala后安装。

随着互联网的不断发展,我们对python编程开发技术的学习和掌握程度也在不断的提高。下面我们就通过案例分析来了解和学习一下,关于程序扩展都有哪些 *** 作方法。

必要的概念

传统编程依赖于两个核心概念:函数和类。使用这些构建块就可以构建出无数的应用程序。

但是,当我们将应用程序迁移到分布式环境时,这些概念通常会发生变化。

一方面,OpenMPI、Python多进程和ZeroMQ等工具提供了用于发送和接收消息的低级原语。这些工具非常强大,但它们提供了不同的抽象,因此要使用它们就必须从头开始重写单线程应用程序。

另一方面,我们也有一些特定领域的工具,例如用于模型训练的TensorFlow、用于数据处理且支持SQL的Spark,以及用于流式处理的Flink。这些工具提供了更高级别的抽象,如神经网络、数据集和流。但是,因为它们与用于串行编程的抽象不同,所以要使用它们也必须从头开始重写应用程序。

用于分布式计算的工具

Ray占据了一个独特的中间地带。它并没有引入新的概念,而是采用了函数和类的概念,并将它们转换为分布式的任务和actor。Ray可以在不做出重大修改的情况下对串行应用程序进行并行化。

开始使用Ray

rayinit()命令将启动所有相关的Ray进程。在切换到集群时,这是需要更改的行(我们需要传入集群地址)。java课程培训机构发现这些过程包括:

有很多worker进程并行执行Python函数(大概是每个CPU核心对应一个worker)。

用于将“任务”分配给worker(以及其他计算机)的调度程序进程。任务是Ray调度的工作单元,对应于一个函数调用或方法调用。

就个人体会来说,Scala相对于Java的优势是巨大的。熟悉Scala之后再看Java代码,有种读汇编的感觉……

如果仅仅是写Spark应用,并非一定要学Scala,可以直接用Spark的Java API或Python API。但因为语言上的差异,用Java开发Spark应用要罗嗦许多。好在带lambda的Java 8出来之后有所改善。

在Spark应用开发上,学Scala主要好处有二:

开发效率更高,代码更精简;

使用Spark过程中出现异常情况,在排查时如果对Spark源码比较熟悉,可以事半功倍。

使用Spark,Scala不是必须的,Spark有Java、Python、R的API。

但是要想深入学习,还是建议学习Scala的。

如果你会Scala,Spark都不用怎么学。因为一般的Scala程序就是这么写的,用Spark的区别就是换了一套API而已

博主项目实践中,经常需要用Spark从Hbase中读取数据。其中,spark的版本为16,hbase的版本为098。现在记录一下如何在spark中 *** 作读取hbase中的数据。

对于这种 *** 作型的需求,没有什么比直接上代码更简单明了的了。so,show me the code!

object Demo extends Logging{

val CF_FOR_FAMILY_USER = BytestoBytes("U");

val CF_FOR_FAMILY_DEVICE = BytestoBytes("D")

val QF_FOR_MODEL = BytestoBytes("model")

val HBASE_CLUSTER = "hbase://xxx/"

val TABLE_NAME = "xxx";

val HBASE_TABLE = HBASE_CLUSTER + TABLE_NAME

def genData(sc:SparkContext) = {

//20161229的数据,rowkey的设计为9999-yyyyMMdd

val filter_of_1229 = new RowFilter(CompareFilterCompareOpEQUAL, new SubstringComparator("79838770"))

//得到qf为w:00-23的数据

val filter_of_qf = new QualifierFilter(CompareFilterCompareOpEQUAL,new SubstringComparator("w"))

val all_filters = new utilArrayList[Filter]()

all_filtersadd(filter_of_1229)

all_filtersadd(filter_of_qf)

//hbase多个过滤器

val filterList = new FilterList(all_filters)

val scan = new Scan()addFamily(CF_FOR_FAMILY_USER)

scansetFilter(filterList)

scansetCaching(1000)

scansetCacheBlocks(false)

val conf = HBaseConfigurationcreate()

confset(TableInputFormatINPUT_TABLE,HBASE_TABLE )

confset(TableInputFormatSCAN, Base64encodeBytes(ProtobufUtiltoScan(scan)toByteArray()))

scnewAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

//后面是针对hbase查询结果的具体业务逻辑

map()

def main(args: Array[String]): Unit = {

val Array(output_path) = args

val sparkConf = new SparkConf()setAppName("demo")

sparkConfset("sparkserializer", "orgapachesparkserializerKryoSerializer")

val sc = new SparkContext(sparkConf)

genUuidWifi(sc)saveAsTextFile(output_path)

scstop()

}

}1234567891011121314151617181920212223242526272829303132333435363738394041424344454612345678910111213141516171819202122232425262728293031323334353637383940414243444546

需要注意的一个小点就是如果hbase里有多个过滤器,注意需要使用FilterList。

以上就是关于Spark中的dataFrame如何实现Python中dataFrame的describe功能全部的内容,包括:Spark中的dataFrame如何实现Python中dataFrame的describe功能、intellij idea 怎么编写python程序打包发送到spark、python编程开发关于程序扩展 *** 作等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/9876163.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-02
下一篇 2023-05-02

发表评论

登录后才能评论

评论列表(0条)

保存