spark怎样并发的从mysql查询数据

spark怎样并发的从mysql查询数据,第1张

在已有的 MySQL 服务器之上使用 Apache Spark (无需将数据导出到 Spark 或者 Hadoop 平台上),这样至少可以提升 10 倍的查询性能。使用多个 MySQL 服务器(复制或者 Percona XtraDB Cluster)可以让我们在某些查询上得到额外的性能提升。你也可以使用 Spark 的缓存功能来缓存整个 MySQL 查询结果表。

思路很简单:Spark 可以通过 JDBC 读取 MySQL 上的数据,也可以执行 SQL 查询,因此我们可以直接连接到 MySQL 并执行查询。那么为什么速度会快呢?对一些需要运行很长时间的查询(如报表或者BI),由于 Spark 是一个大规模并行系统,因此查询会非常的快。MySQL 只能为每一个查询分配一个 CPU 核来处理,而 Spark 可以使用所有集群节点的所有核。在下面的例子中,我们会在 Spark 中执行 MySQL 查询,这个查询速度比直接在 MySQL 上执行速度要快 5 到 10 倍。

另外,Spark 可以增加“集群”级别的并行机制,在使用 MySQL 复制或者 Percona XtraDB Cluster 的情况下,Spark 可以把查询变成一组更小的查询(有点像使用了分区表时可以在每个分区都执行一个查询),然后在多个 Percona XtraDB Cluster 节点的多个从服务器上并行的执行这些小查询。最后它会使用map/reduce 方式将每个节点返回的结果聚合在一起形成完整的结果。

支持mysql的,下面是示例

spark streaming使用数据源方式插入mysql数据

import java.sql.{Connection, ResultSet}

import com.jolbox.bonecp.{BoneCP, BoneCPConfig}

import org.slf4j.LoggerFactory

object ConnectionPool {

val logger = LoggerFactory.getLogger(this.getClass)

private val connectionPool = {

try{

Class.forName("com.mysql.jdbc.Driver")

val config = new BoneCPConfig()

config.setJdbcUrl("jdbc:mysql://192.168.0.46:3306/test")

config.setUsername("test")

config.setPassword("test")

config.setMinConnectionsPerPartition(2)

config.setMaxConnectionsPerPartition(5)

config.setPartitionCount(3)

config.setCloseConnectionWatch(true)

config.setLogStatementsEnabled(true)

Some(new BoneCP(config))

} catch {

case exception:Exception=>

logger.warn("Error in creation of connection pool"+exception.printStackTrace())

None

}

}

def getConnection:Option[Connection] ={

connectionPool match {

case Some(connPool) =>Some(connPool.getConnection)

case None =>None

}

}

def closeConnection(connection:Connection): Unit = {

if(!connection.isClosed) connection.close()

}

}

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.streaming.kafka.KafkaUtils

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.{SparkConf, SparkContext}

import org.slf4j.LoggerFactory

/**

* 记录最近五秒钟的数据

*/

object RealtimeCount1{

case class Loging(vtime:Long,muid:String,uid:String,ucp:String,category:String,autoSid:Int,dealerId:String,tuanId:String,newsId:String)

case class Record(vtime:Long,muid:String,uid:String,item:String,types:String)

val logger = LoggerFactory.getLogger(this.getClass)

def main(args: Array[String]) {

val argc = new Array[String](4)

argc(0) = "10.0.0.37"

argc(1) = "test-1"

argc(2) = "test22"

argc(3) = "1"

val Array(zkQuorum, group, topics, numThreads) = argc

val sparkConf = new SparkConf().setAppName("RealtimeCount").setMaster("local[2]")

val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(5))

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(x=>x._2)

val sql = "insert into loging_realtime1(vtime,muid,uid,item,category) values (?,?,?,?,?)"

val tmpdf = lines.map(_.split("\t")).map(x=>Loging(x(9).toLong,x(1),x(0),x(3),x(25),x(18).toInt,x(29),x(30),x(28))).filter(x=>(x.muid!=null &&!x.muid.equals("null") &&!("").equals(x.muid))).map(x=>Record(x.vtime,x.muid,x.uid,getItem(x.category,x.ucp,x.newsId,x.autoSid.toInt,x.dealerId,x.tuanId),getType(x.category,x.ucp,x.newsId,x.autoSid.toInt,x.dealerId,x.tuanId)))

tmpdf.filter(x=>x.types!=null).foreachRDD{rdd =>

//rdd.foreach(println)

rdd.foreachPartition(partitionRecords=>{

val connection = ConnectionPool.getConnection.getOrElse(null)

if(connection!=null){

partitionRecords.foreach(record=>process(connection,sql,record))

ConnectionPool.closeConnection(connection)

}

})

}

ssc.start()

ssc.awaitTermination()

}

def getItem(category:String,ucp:String,newsId:String,autoSid:Int,dealerId:String,tuanId:String):String = {

if(category!=null &&!category.equals("null")){

val pattern = ""

val matcher = ucp.matches(pattern)

if(matcher) {

ucp.substring(33,42)

}else{

null

}

}else if(autoSid!=0){

autoSid.toString

}else if(dealerId!=null &&!dealerId.equals("null")){

dealerId

}else if(tuanId!=null &&!tuanId.equals("null")){

tuanId

}else{

null

}

}

def getType(category:String,ucp:String,newsId:String,autoSid:Int,dealerId:String,tuanId:String):String = {

if(category!=null &&!category.equals("null")){

val pattern = "100000726100000730\\d{9}\\d{9}"

val matcher = category.matches(pattern)

val pattern1 =""

val matcher1 = ucp.matches(pattern1)

if(matcher1 &&matcher) {

"nv"

}else if(newsId!=null &&!newsId.equals("null") &&matcher1){

"ns"

}else if(matcher1){

"ne"

}else{

null

}

}else if(autoSid!=0){

"as"

}else if(dealerId!=null &&!dealerId.equals("null")){

"di"

}else if(tuanId!=null &&!tuanId.equals("null")){

"ti"

}else{

null

}

}

def process(conn:Connection,sql:String,data:Record): Unit ={

try{

val ps : PreparedStatement = conn.prepareStatement(sql)

ps.setLong(1,data.vtime)

ps.setString(2,data.muid)

ps.setString(3,data.uid)

ps.setString(4,data.item)

ps.setString(5,data.types)

ps.executeUpdate()

}catch{

case exception:Exception=>

logger.warn("Error in execution of query"+exception.printStackTrace())

}

}

}

我知道的有两种:

一种:

单独写个jdbc.properties,在里面配置

jdbc.driverClassName=com.mysql.jdbc.Driver

jdbc.url=jdbc:mysql://localhost:3306/school

jdbc.username=root

jdbc.password=root

然后,在applicationContext中设置你的jdbc.properties路径:

<bean id="propertyConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">

<property name="locations">

<list>

<value>classpath:jdbc.properties</value>

</list>

</property>

</bean>

在dataSource bean中把你配置中的参数引用:

<bean id="dataSource"

class="org.springframework.jdbc.datasource.DriverManagerDataSource">

<property name="driverClassName">

<value>${jdbc.driverClassName}</value>

</property>

<property name="url">

<value>${jdbc.url}</value>

</property>

<property name="username">

<value>${jdbc.username}</value>

</property>

<property name="password">

<value>${jdbc.password}</value>

</property>

</bean>

二种,这种比较简单点儿,就直接在datasource bean中把jdbc.properties中的值在里面对应的地方配置就可以了。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/6132116.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-15
下一篇 2023-03-15

发表评论

登录后才能评论

评论列表(0条)

保存