Spark+Kafka构建Dashboard实训+踩坑笔记

Spark+Kafka构建Dashboard实训+踩坑笔记,第1张

Spark+Kafka构建Dashboard实训+踩坑笔记 项目简介

项目通过spark和kafka构建实时分析平台,设计消息预处理、消息队列发送、接收消息、数据实时处理、数据实时推送和实时展示等数据处理全流程

涉及技术

linux: *** 作系统
Spark:专为大规模数据处理而设计的快速通用搜索引擎,由scala语言编写 Kafka:是一种高吞吐量的分布式订阅消息分发系统,由scala和Java编写,处理大量的用户访问流记录
Flask.socketIO:是一个消息双工即使模块
Flask:python编写的轻量级web框架,主要包括两个核心库:werkzeug和jinja2,功能分别是处理业务和负责安全
Highchart.js:是一个由javascript编写的图标库
Pycharm:python编译器

实现思路

部署hadoop–>安装spark–>编写scala独立应用程序–>安装kafka–>安装python–>python *** 作kafka数据预处理–>编写sparksteaming程序和producer和consumer程序处理数据

项目体系

<项目实现> 1、部署spark 1.1、部署hadoop

//创建用户并设置权限

~$: sudo useradd -m hadoop -s /bin/bash
~$: sudo passwd hadoop
~$: sudo adduser hadoop sudo     //授权

//更新apt

~$: sudo apt-get updata

//配置SSH免密登录
ubuntu默认有client,这里还需要安装ssh server

~$: sudo apt-get install openssh-server
~$: ssh localhost    //看完后记得exit一下
~$: cd ~/.ssh/
~$: ssh-keygen -t rsa  //-t指秘钥类型 rsa指SSH2,,提示时,直接回车
~$: cat ./id_rsa.pub >> ./authroized_keys

1.2、部署java环境

官网链接: https://www.oracle.com/java/technologies/downloads/

//下载好对应的压缩包后解压到相应目录并设置环境变量

~$: sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm
~$: cd ~            //cd /进入跟目录    cd ~进入用户家目录
~$: cat ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
~$: source ~/.bashrc    //环境变量生效
~$: java -version

1.3、安装hadoop2

hadoop2下载地址: http://mirrors.cnnic.cn/apache/hadoop/common/
//有的同学下载包时会遇到两个都感觉可以下载的情况,一脸懵逼不知下载哪个????这里解释一下,binary是编译好的可以直接使用,source是源码,需要编译之后使用。
chown 修改文件和文件夹的用户和用户组属性
chmod 修改文件和文件夹读写执行属性

~$: sudo tar -zxvf /hadoop-2.6.0.tar.gz -C /usr/local  //解压到local
~$: cd /usr/local
~$: sudo mv ./hadoop-2.6.0/ ./hadoop   //为了方便后期使用,改个短一点的命
~$: sudo chown -R hadoop ./hadoop  //chown 修改文件和文件夹的用户和用户组属性  chmod 修改文件和文件夹读写执行属性
~$: cd /usr/local/hadoop/bin   //bin目录下存放二进制可执行文件
~$: ./hadoop version
1.4、hadoop伪分布式

//修改配置文件,文件位于hadoop包的/etc/hadoop下
(注意:configuration 标签原文已有,不可重复)
core-site.xml


    
        hadoop.tmp.dir
        file:/usr/local/hadoop/tmp
        Abase for other temporary directories.
    
    
        fs.defaultFS
        hdfs://localhost:9000
    

hdfs-site.xml


    
        dfs.replication
        1
    
    
        dfs.namenode.name.dir
        file:/usr/local/hadoop/tmp/dfs/name
    
    
        dfs.datanode.data.dir
        file:/usr/local/hadoop/tmp/dfs/data
    

//到hadoop的bin 目录下执行namenode格式化

~$: ./hdfs namenode -format


格式化过程中出现y/n询问,输y回车即可

// 开启namenode和datanode。
NameNode:是Master节点,有点类似Linux里的根目录。管理数据块映射;处理客户端的读写请求;配置副本策略;管理HDFS的名称空间;
DataNode:负责存储client发来的数据块block;执行数据块的读写 *** 作。是NameNode的小弟。

~$: cd /usr/local/hadoop
~$: ./sbin/start-dfs.sh
~$: ./sbin/stop-dfs.sh

下面图片有很多进程,是我做实验开着后没关的,启动start-dfs.sh只要有
jps + secondarynamenode + namenode + datanode即可

访问浏览器页面出现下图即可
(拓展:localhost:9000后面跟着一个词active,这括号后面有两种状态,一种是active状态,一种是standby状态,active状态意味着提供服务,standby状态意味着处于休眠状态,只进行数据同步,时刻准备着提供服务,两者可以切换。)

至此,我们的铺垫,有就是hadoop就部署完成了,接下来就开始部署spark
+++++++++++++++++++++++++++++++++++++++++++++++++++++++

2.部署spark 2.1、安装与修改配置

spark官网: http://spark.apache.org/downloads.html
choose a package type选择如图所示,这个是属于hadoop free版本,下载到的saprk可以用到任何一个hadoop版本
spark四种安装模式:
1.local模式(单机模式)(本次使用的模式)
2.standlone模式(使用spark自带的集群管理器)
3.yarn模式(使用yarn作为集群管理器)
4.mesos模式(使用mesos作为集群管理器)

// 接着走一遍与安装hadoop2一样的步骤
(解压到指定目录 + cd 到解压目录 + 改名 + 授权)

~$: sudo tar -zxvf spark-2.1.0-bin-without-hadoop.tgz -C /usr/local/
~$: cd /usr/local
~$: sudo mv ./spark-2.1.0-bin-without-hadoop/ ./spark
~$: sudo chown -R hadoop:hadoop ./spark 

// 修改spark配置文件spark-env.sh.template
配置作用:配置后Spark就可以把数据存储到Hadoop分布式文件系统HDFS中,也可以从HDFS中读取数据

~$: cd /usr/local/spark
~$: cat ./conf/spark-env.sh
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

// 验证spark是否安装成功----/spark目录下

~$: bin/run-example SparkPi 2>&1 | grep "Pi is"

2.2、运行spark程序

需要退出时输入quit即可

~$: bin/spark-shell


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++

3、scala独立应用程序编写 3.1、安装sbt

首先先安装sbt,sbt是scala的编译工具
sbt下载链接: http://www/scala-sbt.org

~$: sudo mkdir /usr/local/sbt    //创建安装目录
~$: sudo tar -zxvf sbt-1.3.8.tgz -C /usr/local
~$: cd /usr/local/sbt
~$: sudo chown -R hadoop /usr/local/sbt
~$: cp ./bin/sbt-launch.jar ./    //将bin下的launch.jar复制到当下目录
~$: vi /usr/local/sbt/sbt     //创建shell脚本用于启动sbt
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname 
~$: chmod u+x /usr/lcoal/sbt/sbt     //u指所有者,+x值赋予可执行权限
`/sbt-launch.jar "$@"

// 为shell脚本授予权限

~$: vi ./sparkapp/simple.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
3.2、编写并打包scala应用程序
~$: cd ~
~$: mkdir ./sparkapp   //创建应用程序根目录
~$: mkdir -p ./sparkapp/src/main/scala   //-p值递归创建,也就是同时创建层层嵌套文件夹
~$: cd ./sparkapp/src/main/scala
~$: vi SimpleApp.scala
 
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
 
    object SimpleApp {
        def main(args: Array[String]) {
            val logFile = "file:///usr/local/spark/README.md" // Should be some file on your system
            val conf = new SparkConf().setAppName("Simple Application")
            val sc = new SparkContext(conf)
            val logData = sc.textFile(logFile, 2).cache()
            val numAs = logData.filter(line => line.contains("a")).count()
            val numBs = logData.filter(line => line.contains("b")).count()
            println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
        }
    }

// 打包scala程序
注意:每次编写脚本时都要注意版本差异

~$: /usr/local/sbt/sbt package

// 首次运行需要下载依赖包

~$: sudo tar -zxvf kafka_2.11-0.10.1.0.tgz -C /usr/local
~$: cd /usr/local
~$: sudo mv afka_2.11-0.10.1.0/ ./kafka
~$: sudo chown -R hadoop ./kafka
~$: cd /usr/lcoal/kafka
~$: bin/zookeeper-server-start.sh config/zookeeper.properties //运行实例


// 通过submit运行程序

~$: /usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar 2>&1 | grep "Lines with a:"

4、部署kafka 4.1、安装kafka

kafka官网: https://kafka.apache.org/downloads

核心概念

  1. Broker
    Kafka集群包含一个或多个服务器,这种服务器被称为broker
  2. Topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  3. Partition
    Partition是物理上的概念,每个Topic包含一个或多个Partition.
  4. Producer
    负责发布消息到Kafka broker
  5. Consumer
    消息消费者,向Kafka broker读取消息的客户端。
  6. Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
    // 解压安装包
~$: cd /usr/lcoal/kafka
~$: bin/kafka-server-start.sh config/server.properties
4.2、测试


注意:启动完后会保持上图状态,一定不要关闭他,启动新的终端(终端2)

~$: cd /usr/local/kafka
~$: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab


// 上图已经完成kafka服务端的启动了,不要关闭他,启动终端3输入命令如下

~$: bin/kafka-topics.sh --list --zookeeper localhost:2181
~$: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab

用list列出所有创建的topics,来查看刚才创建的主题是否存在。

~$: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning 

CTRL+C退出,然后使用consumer来接收数据,输入如下命令:

~$: sudo apt-get install python3-pip
~$: pip3 install flask    //安装需要的库flask和socketio
~$: pip3 install flask-socketio
~$: pip3 install kafka-python


至此,kafka就安装成功了!
+++++++++++++++++++++++++++++++++++++++++++++++++++++++

4、安装python 4.1、python下载

案例用到的python第三方库,flask和socketio,pip管理第三方库
注意自己下载到哪个路径下

~$: cd 
~$: sudo tar -zxvf pycharm-community-2021.2.3.tar.gz -C /usr/local  
~$: cd /usr/local
~$: sudo mv pycharm-community-2021.2.3 pycharm 
~$: sudo chown -R hadoop ./pycharm 
~$: cd /usr/local/pycharm
~$: ./bin/pycharm.sh    //启动pycharm

pycharm的安装
pycharm官网: https://www.jetbrains.com/pycharm/download/#section=linux
接下来就是万年不变的解压安装步骤

~$: sudo apt-get install zlib1g-dev libbz2-dev libssl-dev libncurses5-dev libsqlite3-dev libreadline-dev tk-dev libgdbm-dev libdb-dev libpcap-dev xz-utils libexpat1-dev liblzma-dev libffi-dev libc6-dev
~$: wget https://www.python.org/ftp/python/3.7.5/Python-3.7.5.tgz
~$: tar -xzvf Python-3.7.5.tgz
~$: cd Python3.7.5
~$: sudo mkdir -p /usr/local/python3
~$: ./configure --prefix=/usr/local/python3  --enable-optimizations
~$: make
~$: sudo make install
~$: ln -s /usr/local/python3/bin/python3.7 /usr/bin/python
~$: ln -s /usr/local/python3/bin/pip3.7 /usr/bin/pip
~$: python -V
4.2、更换python版本(可选)

ubuntu16.04自带有python2.7和python3.5,直接使用python3.5即可
若需要更换python 版本,例如3.7,则 *** 作如下:

# coding: utf-8
import csv
import time
from kafka import KafkaProducer
 
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 打开数据文件
csvfile = open("../data/user_log.csv","r")
# 生成一个可用于读取csv文件的reader
reader = csv.reader(csvfile)
 
for line in reader:
    gender = line[9] # 性别在每行日志代码的第9个元素
    if gender == 'gender':
        continue # 去除第一行表头
    time.sleep(0.1) # 每隔0.1秒发送一行数据
    # 发送数据,topic为'sex'
    producer.send('sex',line[9].encode('utf8'))
4.3、pycharm目录结构:

目录介绍
data目录存放的是用户日志数据;
scripts目录存放的是Kafka生产者和消费者;
static/js目录存放的是前端所需要的js框架;
templates目录存放的是html页面;
app.py为web服务器,接收Spark Streaming处理后的结果,并推送实时数据给浏览器;
External Libraries是本项目所依赖的Python库,是PyCharm自动生成。

至此,开发环境就部署完成了
++++++++++++++++++++++++++++++++++++++++++++++++++++++++

5、python *** 作kafka

数据集: https://pan.baidu.com/s/1cs02Nc
该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个案例中只是用user_log.csv

5.1、创建工程文件

pycharm中按照上面工程项目图定位新建一个文件名为producer.py

from kafka import KafkaConsumer
 
consumer = KafkaConsumer('sex')
for msg in consumer:
    print((msg.value).decode('utf8'))

pycharm中按照上面工程项目图定位新建一个文件名为consumer.py

~$: cd /usr/local/kafka
~$: bin/zookeeper-server-start.sh config/zookeeper.properties &
~$: bin/kafka-server-start.sh config/server.properties

运行上面两个文件前先开启kafka

~$: sudo mv ~/下载/spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars
~$: cd /usr/local/spark/jars
~$: mkdir kafka
~$: cd kafka
~$: cp /usr/local/kafka/libs
object StreamingExamples extends Logging {
  
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
5.2、测试

kafka开启后直接在pycharm中右键运行producer.py和consumer.py,运行出来是数据流则正确

6、配置spark开发kafka环境 6.1、下载spark连接kafka的代码库

将下载到的代码库放到/usr/local/spark/jars目录下

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11"
package org.apache.spark.examples.streaming
 
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.json4s._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.Interval
import org.apache.spark.streaming.kafka._
 
object KafkaWordCount {
  implicit val formats = DefaultFormats//数据格式化时需要
  def main(args: Array[String]): Unit={
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount    ")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()
    
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")  //这里表示把检查点文件写入分布式文件系统HDFS,所以要启动Hadoop
    // 将topics转换成topic-->numThreads的哈稀表
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // 创建连接Kafka的消费者链接
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))//将输入的每行用空格分割成一个个word
    // 对每一秒的输入数据进行reduce,然后将reduce后的数据发送给Kafka
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).foreachRDD(rdd => {
          if(rdd.count !=0 ){
               val props = new HashMap[String, Object]()
               props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               "org.apache.kafka.common.serialization.StringSerializer")
               props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
               "org.apache.kafka.common.serialization.StringSerializer")
               // 实例化一个Kafka生产者
               val producer = new KafkaProducer[String, String](props)
               // rdd.colect即将rdd中数据转化为数组,然后write函数将rdd内容转化为json格式
               val str = write(rdd.collect)
               // 封装成Kafka消息,topic为"result"
               val message = new ProducerRecord[String, String]("result", null, str)
               // 给Kafka发送消息
               producer.send(message)
          }
      })
    ssc.start()
    ssc.awaitTermination()
  }
}

注意:如果没有启动hdfs就运行的话,会报错“拒绝连接”,所以运行前先启动hdfs,命令:cd /usr/local/hadoop
./sbin/start-dfs.sh(前面有一点)

6.3、打包以及编译

// 在/usr/local/spark/mycode/kafka目录下新建simple.sbt打包,代码如下

 /usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCount" /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar 127.0.0.1:2181 1 sex 1

// 编译打包程序:/usr/local/sbt/sbt packag
接着在在/usr/local/spark/mycode/kafka目录下新建startup.sh文件,代码如下

import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
#因为第一步骤安装好了flask,所以这里可以引用
 
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
# 实例化一个consumer,接收topic为result的消息
consumer = KafkaConsumer('result')
 
# 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器
def background_thread():
    girl = 0
    boy = 0
    for msg in consumer:
        data_json = msg.value.decode('utf8')
        data_list = json.loads(data_json)
        for data in data_list:
            if '0' in data.keys():
                girl = data['0']
            elif '1' in data.keys():
                boy = data['1']
            else:
                continue
        result = str(girl) + ',' + str(boy)
        print(result)
        socketio.emit('test_message',{'data':result})
 
# 客户端发送connect事件时的处理函数
@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        # 单独开启一个线程给客户端发送数据
        thread = socketio.start_background_task(target=background_thread)
    socketio.emit('connected', {'data': 'Connected'})
 
# 通过访问http://127.0.0.1:5000/访问index.html
@app.route("/")
def handle_mes():
    return render_template("index.html")
 
# main函数
if __name__ == '__main__':
    socketio.run(app,debug=True)
6.4、测试

运行:sh startup.sh

Consumer中接收的topic改为result
运行producer.py和consumer.py

7、步骤展示(可视化技术)

利用Flask-SocketIO实时推送数据
socket.io.js实时获取数据
highlights.js展示数据

7.1、创建工程文件

创建app.py文件

DashBoard

// 创建index页面




    
    
    
    
    
    
    Girl: 



    Boy: 
    					
										


					

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5610667.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存