#!/usr/bin/env python3# -- coding: utf-8 --"""
Created on Fri Jun 8 16:27:57 2018
@author: luogan
"""import pandas as pdfrom pysparksql import SparkSession
spark= SparkSession\
builder \
appName("dataFrame") \
getOrCreate()# Loads datall3=pdDataFrame([[1,2],[3,4]],columns=['a','b'])
cc=ll3valuestolist()
dd=list(ll3columns)#df=sparkcreateDataFrame(ll3)#turn pandasDataFrame to sparkdataFramespark_df = sparkcreateDataFrame(cc, dd)
print('sparkdataFram=',spark_dfshow())#turn sparkdataFrame to pandasDataFrame pandas_df = spark_df toPandas()
print('pandasDataFrame=',pandas_df)
,客户端和虚拟集群中hadoop、spark、scala的安装目录是一致的,这样开发的spark应用程序的时候不需要打包spark开发包和scala的库文件,减少不必要的网络IO和磁盘IO。当然也可以不一样,不过在使用部署工具spark-submit的时候需要参数指明classpath。
1:IDEA的安装
官网jetbrainscom下载IntelliJ IDEA,有Community Editions 和& Ultimate Editions,前者免费,用户可以选择合适的版本使用。
根据安装指导安装IDEA后,需要安装scala插件,有两种途径可以安装scala插件:
启动IDEA -> Welcome to IntelliJ IDEA -> Configure -> Plugins -> Install JetBrains plugin -> 找到scala后安装。
启动IDEA -> Welcome to IntelliJ IDEA -> Open Project -> File -> Settings -> plugins -> Install JetBrains plugin -> 找到scala后安装。
随着互联网的不断发展,我们对python编程开发技术的学习和掌握程度也在不断的提高。下面我们就通过案例分析来了解和学习一下,关于程序扩展都有哪些 *** 作方法。
必要的概念
传统编程依赖于两个核心概念:函数和类。使用这些构建块就可以构建出无数的应用程序。
但是,当我们将应用程序迁移到分布式环境时,这些概念通常会发生变化。
一方面,OpenMPI、Python多进程和ZeroMQ等工具提供了用于发送和接收消息的低级原语。这些工具非常强大,但它们提供了不同的抽象,因此要使用它们就必须从头开始重写单线程应用程序。
另一方面,我们也有一些特定领域的工具,例如用于模型训练的TensorFlow、用于数据处理且支持SQL的Spark,以及用于流式处理的Flink。这些工具提供了更高级别的抽象,如神经网络、数据集和流。但是,因为它们与用于串行编程的抽象不同,所以要使用它们也必须从头开始重写应用程序。
用于分布式计算的工具
Ray占据了一个独特的中间地带。它并没有引入新的概念,而是采用了函数和类的概念,并将它们转换为分布式的任务和actor。Ray可以在不做出重大修改的情况下对串行应用程序进行并行化。
开始使用Ray
rayinit()命令将启动所有相关的Ray进程。在切换到集群时,这是需要更改的行(我们需要传入集群地址)。java课程培训机构发现这些过程包括:
有很多worker进程并行执行Python函数(大概是每个CPU核心对应一个worker)。
用于将“任务”分配给worker(以及其他计算机)的调度程序进程。任务是Ray调度的工作单元,对应于一个函数调用或方法调用。
就个人体会来说,Scala相对于Java的优势是巨大的。熟悉Scala之后再看Java代码,有种读汇编的感觉……
如果仅仅是写Spark应用,并非一定要学Scala,可以直接用Spark的Java API或Python API。但因为语言上的差异,用Java开发Spark应用要罗嗦许多。好在带lambda的Java 8出来之后有所改善。
在Spark应用开发上,学Scala主要好处有二:
开发效率更高,代码更精简;
使用Spark过程中出现异常情况,在排查时如果对Spark源码比较熟悉,可以事半功倍。
使用Spark,Scala不是必须的,Spark有Java、Python、R的API。
但是要想深入学习,还是建议学习Scala的。
如果你会Scala,Spark都不用怎么学。因为一般的Scala程序就是这么写的,用Spark的区别就是换了一套API而已
博主项目实践中,经常需要用Spark从Hbase中读取数据。其中,spark的版本为16,hbase的版本为098。现在记录一下如何在spark中 *** 作读取hbase中的数据。
对于这种 *** 作型的需求,没有什么比直接上代码更简单明了的了。so,show me the code!
object Demo extends Logging{
val CF_FOR_FAMILY_USER = BytestoBytes("U");
val CF_FOR_FAMILY_DEVICE = BytestoBytes("D")
val QF_FOR_MODEL = BytestoBytes("model")
val HBASE_CLUSTER = "hbase://xxx/"
val TABLE_NAME = "xxx";
val HBASE_TABLE = HBASE_CLUSTER + TABLE_NAME
def genData(sc:SparkContext) = {
//20161229的数据,rowkey的设计为9999-yyyyMMdd
val filter_of_1229 = new RowFilter(CompareFilterCompareOpEQUAL, new SubstringComparator("79838770"))
//得到qf为w:00-23的数据
val filter_of_qf = new QualifierFilter(CompareFilterCompareOpEQUAL,new SubstringComparator("w"))
val all_filters = new utilArrayList[Filter]()
all_filtersadd(filter_of_1229)
all_filtersadd(filter_of_qf)
//hbase多个过滤器
val filterList = new FilterList(all_filters)
val scan = new Scan()addFamily(CF_FOR_FAMILY_USER)
scansetFilter(filterList)
scansetCaching(1000)
scansetCacheBlocks(false)
val conf = HBaseConfigurationcreate()
confset(TableInputFormatINPUT_TABLE,HBASE_TABLE )
confset(TableInputFormatSCAN, Base64encodeBytes(ProtobufUtiltoScan(scan)toByteArray()))
scnewAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
//后面是针对hbase查询结果的具体业务逻辑
map()
def main(args: Array[String]): Unit = {
val Array(output_path) = args
val sparkConf = new SparkConf()setAppName("demo")
sparkConfset("sparkserializer", "orgapachesparkserializerKryoSerializer")
val sc = new SparkContext(sparkConf)
genUuidWifi(sc)saveAsTextFile(output_path)
scstop()
}
}1234567891011121314151617181920212223242526272829303132333435363738394041424344454612345678910111213141516171819202122232425262728293031323334353637383940414243444546
需要注意的一个小点就是如果hbase里有多个过滤器,注意需要使用FilterList。
以上就是关于Spark中的dataFrame如何实现Python中dataFrame的describe功能全部的内容,包括:Spark中的dataFrame如何实现Python中dataFrame的describe功能、intellij idea 怎么编写python程序打包发送到spark、python编程开发关于程序扩展 *** 作等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)