大数据分析师要学:Ja-va、大数据基础、Hadoop体系、Scala、kafka、Spark等内容;数据分析与挖掘:Python、关系型数据库MySQL、文档数据库MongoDB、内存数据库Redis、数据处理、数据分析等。
大数据分析师的工作内容
1. 对数据进行处理
对数据处理的工具有很多,但是基本都绕不开两个核心 EXCEL + SQL。
2. 了解业务
想要辅助决策,首先要了解对方干什么。如何了解业务?通过数据看业务的表现,和需求方沟通,参与需求方的会议,到需求方进行轮岗等。
这些内容可以用流程图+文档记录,帮助自己理解业务流程及细节。
3. 可视化传递信息
需要将信息有效的传递到需求方中,需要使用合理的方式将信息传递。可视化是常见的且有效的方式,这里一般使用EXCEL就可以完成对大多数的需求,但是更建议掌握一个BI工具。
支持mysql的,下面是示例spark streaming使用数据源方式插入mysql数据
import java.sql.{Connection, ResultSet}
import com.jolbox.bonecp.{BoneCP, BoneCPConfig}
import org.slf4j.LoggerFactory
object ConnectionPool {
val logger = LoggerFactory.getLogger(this.getClass)
private val connectionPool = {
try{
Class.forName("com.mysql.jdbc.Driver")
val config = new BoneCPConfig()
config.setJdbcUrl("jdbc:mysql://192.168.0.46:3306/test")
config.setUsername("test")
config.setPassword("test")
config.setMinConnectionsPerPartition(2)
config.setMaxConnectionsPerPartition(5)
config.setPartitionCount(3)
config.setCloseConnectionWatch(true)
config.setLogStatementsEnabled(true)
Some(new BoneCP(config))
} catch {
case exception:Exception=>
logger.warn("Error in creation of connection pool"+exception.printStackTrace())
None
}
}
def getConnection:Option[Connection] ={
connectionPool match {
case Some(connPool) =>Some(connPool.getConnection)
case None =>None
}
}
def closeConnection(connection:Connection): Unit = {
if(!connection.isClosed) connection.close()
}
}
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
/**
* 记录最近五秒钟的数据
*/
object RealtimeCount1{
case class Loging(vtime:Long,muid:String,uid:String,ucp:String,category:String,autoSid:Int,dealerId:String,tuanId:String,newsId:String)
case class Record(vtime:Long,muid:String,uid:String,item:String,types:String)
val logger = LoggerFactory.getLogger(this.getClass)
def main(args: Array[String]) {
val argc = new Array[String](4)
argc(0) = "10.0.0.37"
argc(1) = "test-1"
argc(2) = "test22"
argc(3) = "1"
val Array(zkQuorum, group, topics, numThreads) = argc
val sparkConf = new SparkConf().setAppName("RealtimeCount").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(5))
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(x=>x._2)
val sql = "insert into loging_realtime1(vtime,muid,uid,item,category) values (?,?,?,?,?)"
val tmpdf = lines.map(_.split("\t")).map(x=>Loging(x(9).toLong,x(1),x(0),x(3),x(25),x(18).toInt,x(29),x(30),x(28))).filter(x=>(x.muid!=null &&!x.muid.equals("null") &&!("").equals(x.muid))).map(x=>Record(x.vtime,x.muid,x.uid,getItem(x.category,x.ucp,x.newsId,x.autoSid.toInt,x.dealerId,x.tuanId),getType(x.category,x.ucp,x.newsId,x.autoSid.toInt,x.dealerId,x.tuanId)))
tmpdf.filter(x=>x.types!=null).foreachRDD{rdd =>
//rdd.foreach(println)
rdd.foreachPartition(partitionRecords=>{
val connection = ConnectionPool.getConnection.getOrElse(null)
if(connection!=null){
partitionRecords.foreach(record=>process(connection,sql,record))
ConnectionPool.closeConnection(connection)
}
})
}
ssc.start()
ssc.awaitTermination()
}
def getItem(category:String,ucp:String,newsId:String,autoSid:Int,dealerId:String,tuanId:String):String = {
if(category!=null &&!category.equals("null")){
val pattern = ""
val matcher = ucp.matches(pattern)
if(matcher) {
ucp.substring(33,42)
}else{
null
}
}else if(autoSid!=0){
autoSid.toString
}else if(dealerId!=null &&!dealerId.equals("null")){
dealerId
}else if(tuanId!=null &&!tuanId.equals("null")){
tuanId
}else{
null
}
}
def getType(category:String,ucp:String,newsId:String,autoSid:Int,dealerId:String,tuanId:String):String = {
if(category!=null &&!category.equals("null")){
val pattern = "100000726100000730\\d{9}\\d{9}"
val matcher = category.matches(pattern)
val pattern1 =""
val matcher1 = ucp.matches(pattern1)
if(matcher1 &&matcher) {
"nv"
}else if(newsId!=null &&!newsId.equals("null") &&matcher1){
"ns"
}else if(matcher1){
"ne"
}else{
null
}
}else if(autoSid!=0){
"as"
}else if(dealerId!=null &&!dealerId.equals("null")){
"di"
}else if(tuanId!=null &&!tuanId.equals("null")){
"ti"
}else{
null
}
}
def process(conn:Connection,sql:String,data:Record): Unit ={
try{
val ps : PreparedStatement = conn.prepareStatement(sql)
ps.setLong(1,data.vtime)
ps.setString(2,data.muid)
ps.setString(3,data.uid)
ps.setString(4,data.item)
ps.setString(5,data.types)
ps.executeUpdate()
}catch{
case exception:Exception=>
logger.warn("Error in execution of query"+exception.printStackTrace())
}
}
}
【导读】时至今日,相信大家对大数据工程师一点也不陌生,作为时下比较热门的高薪职业,很多人想转行做大数据工程师,那么你知道大数据工程师的日常工作做什么?工作强度大不大呢?为此小编整理了以下内容,一起来看看吧!
1, 写 SQL :一般来说许多入职一两年的大数据工程师首要的工作就是写 SQL
2 ,为集群搭大数据环境(一般公司招大数据工程师环境都现已搭好了,公司内部会有现成的大数据途径)
3 ,维护大数据途径(这个应该是每个大数据工程师都做过的工作,或多或少会承担“运维”的工作)
4, 数据搬家(有部分公司需求把数据从传统的数据库 Oracle、MySQL 等数据搬家到大数据集群中,这个是比较繁琐的工作)
5 ,运用搬家(有部分公司需求把运用从传统的数据库 Oracle、MySQL
等数据库的存储进程程序或许SQL脚本搬家到大数据途径上,这个进程也是非常繁琐的工作,高度重复且杂乱)
6 ,数据收集(收集日志数据、文件数据、接口数据,这个触及到各种格式的转化,一般用得比较多的是 Flume 和 Logstash)
7, 数据处理
7.1 ,离线数据处理(这个一般就是写写 SQL 然后扔到 Hive 中跑,其实和首要点有点重复了)
7.2 ,实时数据处理(这个触及到音讯部队,Kafka,Spark,Flink 这些,组件,一般就是 Flume 收集到数据发给 Kafka 然后
Spark 消费 Kafka 的数据进行处理)
8 ,数据可视化(这个我司是用 Spring Boot 联接后台数据与前端,前端用自己魔改的 echarts)
9 ,大数据途径开发(偏Java方向的,大约就是把开源的组件整合起来整成一个可用的大数据途径这样,常见的是各种难用的 PaaS 途径)
10
,数据中台开发(中台需求支撑接入各种数据源,把各种数据源清洗转化为可用的数据,然后再根据原始数据建立起宽表层,一般为了节省开发本钱和服务器资源,都是根据宽表层查询出业务数据)
11 ,建立数据仓库(这儿的数据仓库的建立不是指 Hive ,Hive 是建立数仓的东西,数仓建立一般会分为三层 ODS、DW、DM
层,其间DW是最重要的,它又能够分为DWD,DWM,DWS,这个层级仅仅逻辑上的概念,类似于把表名按照层级差异隔来的 *** 作,分层的目的是防止开发数据运用的时分直接访问底层数据,能够减少资源,留意,减少资源开支是减少
内存 和 CPU
的开支,分层后磁盘占用会大大增加,磁盘不值钱所以没什么联络,分层能够使数据表的逻辑更加清楚,便当进一步的开发 *** 作,假定分层没有做好会导致逻辑紊乱,新来的员工难以接手业务,跋涉公司的运营本钱,还有这个建数仓也分为建离线和实时的)
以上就是小编今天给大家整理发送的关于“大数据工程师的日常工作做什么?”的相关内容,希望对大家有所帮助。想了解更多关于大数据工程师要求具备的能力,关注小编持续更新。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)