涉及技术项目通过spark和kafka构建实时分析平台,设计消息预处理、消息队列发送、接收消息、数据实时处理、数据实时推送和实时展示等数据处理全流程
实现思路linux: *** 作系统
Spark:专为大规模数据处理而设计的快速通用搜索引擎,由scala语言编写 Kafka:是一种高吞吐量的分布式订阅消息分发系统,由scala和Java编写,处理大量的用户访问流记录
Flask.socketIO:是一个消息双工即使模块
Flask:python编写的轻量级web框架,主要包括两个核心库:werkzeug和jinja2,功能分别是处理业务和负责安全
Highchart.js:是一个由javascript编写的图标库
Pycharm:python编译器
项目体系 <项目实现> 1、部署spark 1.1、部署hadoop部署hadoop–>安装spark–>编写scala独立应用程序–>安装kafka–>安装python–>python *** 作kafka数据预处理–>编写sparksteaming程序和producer和consumer程序处理数据
//创建用户并设置权限
~$: sudo useradd -m hadoop -s /bin/bash ~$: sudo passwd hadoop ~$: sudo adduser hadoop sudo //授权
//更新apt
~$: sudo apt-get updata
//配置SSH免密登录
ubuntu默认有client,这里还需要安装ssh server
~$: sudo apt-get install openssh-server ~$: ssh localhost //看完后记得exit一下 ~$: cd ~/.ssh/ ~$: ssh-keygen -t rsa //-t指秘钥类型 rsa指SSH2,,提示时,直接回车 ~$: cat ./id_rsa.pub >> ./authroized_keys1.2、部署java环境
官网链接: https://www.oracle.com/java/technologies/downloads/
//下载好对应的压缩包后解压到相应目录并设置环境变量
~$: sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm ~$: cd ~ //cd /进入跟目录 cd ~进入用户家目录 ~$: cat ~/.bashrc export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=${JAVA_HOME}/bin:$PATH ~$: source ~/.bashrc //环境变量生效 ~$: java -version1.3、安装hadoop2
hadoop2下载地址: http://mirrors.cnnic.cn/apache/hadoop/common/
//有的同学下载包时会遇到两个都感觉可以下载的情况,一脸懵逼不知下载哪个????这里解释一下,binary是编译好的可以直接使用,source是源码,需要编译之后使用。
chown 修改文件和文件夹的用户和用户组属性
chmod 修改文件和文件夹读写执行属性
~$: sudo tar -zxvf /hadoop-2.6.0.tar.gz -C /usr/local //解压到local ~$: cd /usr/local ~$: sudo mv ./hadoop-2.6.0/ ./hadoop //为了方便后期使用,改个短一点的命 ~$: sudo chown -R hadoop ./hadoop //chown 修改文件和文件夹的用户和用户组属性 chmod 修改文件和文件夹读写执行属性 ~$: cd /usr/local/hadoop/bin //bin目录下存放二进制可执行文件 ~$: ./hadoop version1.4、hadoop伪分布式
//修改配置文件,文件位于hadoop包的/etc/hadoop下
(注意:configuration 标签原文已有,不可重复)
core-site.xml
hadoop.tmp.dir file:/usr/local/hadoop/tmp Abase for other temporary directories. fs.defaultFS hdfs://localhost:9000
hdfs-site.xml
dfs.replication 1 dfs.namenode.name.dir file:/usr/local/hadoop/tmp/dfs/name dfs.datanode.data.dir file:/usr/local/hadoop/tmp/dfs/data
//到hadoop的bin 目录下执行namenode格式化
~$: ./hdfs namenode -format
格式化过程中出现y/n询问,输y回车即可
// 开启namenode和datanode。
NameNode:是Master节点,有点类似Linux里的根目录。管理数据块映射;处理客户端的读写请求;配置副本策略;管理HDFS的名称空间;
DataNode:负责存储client发来的数据块block;执行数据块的读写 *** 作。是NameNode的小弟。
~$: cd /usr/local/hadoop ~$: ./sbin/start-dfs.sh ~$: ./sbin/stop-dfs.sh
下面图片有很多进程,是我做实验开着后没关的,启动start-dfs.sh只要有
jps + secondarynamenode + namenode + datanode即可
访问浏览器页面出现下图即可
(拓展:localhost:9000后面跟着一个词active,这括号后面有两种状态,一种是active状态,一种是standby状态,active状态意味着提供服务,standby状态意味着处于休眠状态,只进行数据同步,时刻准备着提供服务,两者可以切换。)
至此,我们的铺垫,有就是hadoop就部署完成了,接下来就开始部署spark
+++++++++++++++++++++++++++++++++++++++++++++++++++++++
spark官网: http://spark.apache.org/downloads.html
choose a package type选择如图所示,这个是属于hadoop free版本,下载到的saprk可以用到任何一个hadoop版本
spark四种安装模式:
1.local模式(单机模式)(本次使用的模式)
2.standlone模式(使用spark自带的集群管理器)
3.yarn模式(使用yarn作为集群管理器)
4.mesos模式(使用mesos作为集群管理器)
// 接着走一遍与安装hadoop2一样的步骤
(解压到指定目录 + cd 到解压目录 + 改名 + 授权)
~$: sudo tar -zxvf spark-2.1.0-bin-without-hadoop.tgz -C /usr/local/ ~$: cd /usr/local ~$: sudo mv ./spark-2.1.0-bin-without-hadoop/ ./spark ~$: sudo chown -R hadoop:hadoop ./spark
// 修改spark配置文件spark-env.sh.template
配置作用:配置后Spark就可以把数据存储到Hadoop分布式文件系统HDFS中,也可以从HDFS中读取数据
~$: cd /usr/local/spark ~$: cat ./conf/spark-env.sh export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
// 验证spark是否安装成功----/spark目录下
~$: bin/run-example SparkPi 2>&1 | grep "Pi is"2.2、运行spark程序
需要退出时输入quit即可
~$: bin/spark-shell
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
首先先安装sbt,sbt是scala的编译工具
sbt下载链接: http://www/scala-sbt.org
~$: sudo mkdir /usr/local/sbt //创建安装目录 ~$: sudo tar -zxvf sbt-1.3.8.tgz -C /usr/local ~$: cd /usr/local/sbt ~$: sudo chown -R hadoop /usr/local/sbt ~$: cp ./bin/sbt-launch.jar ./ //将bin下的launch.jar复制到当下目录 ~$: vi /usr/local/sbt/sbt //创建shell脚本用于启动sbt #!/bin/bash SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M" java $SBT_OPTS -jar `dirname~$: chmod u+x /usr/lcoal/sbt/sbt //u指所有者,+x值赋予可执行权限`/sbt-launch.jar "$@"
// 为shell脚本授予权限
~$: vi ./sparkapp/simple.sbt name := "Simple Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"3.2、编写并打包scala应用程序
~$: cd ~ ~$: mkdir ./sparkapp //创建应用程序根目录 ~$: mkdir -p ./sparkapp/src/main/scala //-p值递归创建,也就是同时创建层层嵌套文件夹 ~$: cd ./sparkapp/src/main/scala ~$: vi SimpleApp.scala import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "file:///usr/local/spark/README.md" // Should be some file on your system val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } }
// 打包scala程序
注意:每次编写脚本时都要注意版本差异
~$: /usr/local/sbt/sbt package
// 首次运行需要下载依赖包
~$: sudo tar -zxvf kafka_2.11-0.10.1.0.tgz -C /usr/local ~$: cd /usr/local ~$: sudo mv afka_2.11-0.10.1.0/ ./kafka ~$: sudo chown -R hadoop ./kafka ~$: cd /usr/lcoal/kafka ~$: bin/zookeeper-server-start.sh config/zookeeper.properties //运行实例
// 通过submit运行程序
~$: /usr/local/spark/bin/spark-submit --class "SimpleApp" ~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar 2>&1 | grep "Lines with a:"4、部署kafka 4.1、安装kafka
kafka官网: https://kafka.apache.org/downloads
核心概念
- Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker - Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处) - Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition. - Producer
负责发布消息到Kafka broker - Consumer
消息消费者,向Kafka broker读取消息的客户端。 - Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
// 解压安装包
~$: cd /usr/lcoal/kafka ~$: bin/kafka-server-start.sh config/server.properties4.2、测试
注意:启动完后会保持上图状态,一定不要关闭他,启动新的终端(终端2)
~$: cd /usr/local/kafka ~$: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
// 上图已经完成kafka服务端的启动了,不要关闭他,启动终端3输入命令如下
~$: bin/kafka-topics.sh --list --zookeeper localhost:2181 ~$: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
用list列出所有创建的topics,来查看刚才创建的主题是否存在。
~$: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning
CTRL+C退出,然后使用consumer来接收数据,输入如下命令:
~$: sudo apt-get install python3-pip ~$: pip3 install flask //安装需要的库flask和socketio ~$: pip3 install flask-socketio ~$: pip3 install kafka-python
至此,kafka就安装成功了!
+++++++++++++++++++++++++++++++++++++++++++++++++++++++
案例用到的python第三方库,flask和socketio,pip管理第三方库
注意自己下载到哪个路径下
~$: cd ~$: sudo tar -zxvf pycharm-community-2021.2.3.tar.gz -C /usr/local ~$: cd /usr/local ~$: sudo mv pycharm-community-2021.2.3 pycharm ~$: sudo chown -R hadoop ./pycharm ~$: cd /usr/local/pycharm ~$: ./bin/pycharm.sh //启动pycharm
pycharm的安装
pycharm官网: https://www.jetbrains.com/pycharm/download/#section=linux
接下来就是万年不变的解压安装步骤
~$: sudo apt-get install zlib1g-dev libbz2-dev libssl-dev libncurses5-dev libsqlite3-dev libreadline-dev tk-dev libgdbm-dev libdb-dev libpcap-dev xz-utils libexpat1-dev liblzma-dev libffi-dev libc6-dev ~$: wget https://www.python.org/ftp/python/3.7.5/Python-3.7.5.tgz ~$: tar -xzvf Python-3.7.5.tgz ~$: cd Python3.7.5 ~$: sudo mkdir -p /usr/local/python3 ~$: ./configure --prefix=/usr/local/python3 --enable-optimizations ~$: make ~$: sudo make install ~$: ln -s /usr/local/python3/bin/python3.7 /usr/bin/python ~$: ln -s /usr/local/python3/bin/pip3.7 /usr/bin/pip ~$: python -V4.2、更换python版本(可选)
ubuntu16.04自带有python2.7和python3.5,直接使用python3.5即可
若需要更换python 版本,例如3.7,则 *** 作如下:
# coding: utf-8 import csv import time from kafka import KafkaProducer # 实例化一个KafkaProducer示例,用于向Kafka投递消息 producer = KafkaProducer(bootstrap_servers='localhost:9092') # 打开数据文件 csvfile = open("../data/user_log.csv","r") # 生成一个可用于读取csv文件的reader reader = csv.reader(csvfile) for line in reader: gender = line[9] # 性别在每行日志代码的第9个元素 if gender == 'gender': continue # 去除第一行表头 time.sleep(0.1) # 每隔0.1秒发送一行数据 # 发送数据,topic为'sex' producer.send('sex',line[9].encode('utf8'))4.3、pycharm目录结构:
目录介绍
data目录存放的是用户日志数据;
scripts目录存放的是Kafka生产者和消费者;
static/js目录存放的是前端所需要的js框架;
templates目录存放的是html页面;
app.py为web服务器,接收Spark Streaming处理后的结果,并推送实时数据给浏览器;
External Libraries是本项目所依赖的Python库,是PyCharm自动生成。
至此,开发环境就部署完成了
++++++++++++++++++++++++++++++++++++++++++++++++++++++++
数据集: https://pan.baidu.com/s/1cs02Nc
该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个案例中只是用user_log.csv
pycharm中按照上面工程项目图定位新建一个文件名为producer.py
from kafka import KafkaConsumer consumer = KafkaConsumer('sex') for msg in consumer: print((msg.value).decode('utf8'))
pycharm中按照上面工程项目图定位新建一个文件名为consumer.py
~$: cd /usr/local/kafka ~$: bin/zookeeper-server-start.sh config/zookeeper.properties & ~$: bin/kafka-server-start.sh config/server.properties
运行上面两个文件前先开启kafka
~$: sudo mv ~/下载/spark-streaming-kafka-0-8_2.11-2.1.0.jar /usr/local/spark/jars ~$: cd /usr/local/spark/jars ~$: mkdir kafka ~$: cd kafka ~$: cp /usr/local/kafka/libs object StreamingExamples extends Logging { def setStreamingLogLevels() { val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) { // We first log something to initialize Spark's default logging, then we override the // logging level. logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.") Logger.getRootLogger.setLevel(Level.WARN) } } }5.2、测试
kafka开启后直接在pycharm中右键运行producer.py和consumer.py,运行出来是数据流则正确
将下载到的代码库放到/usr/local/spark/jars目录下
name := "Simple Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0" libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0" libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11"
package org.apache.spark.examples.streaming import java.util.HashMap import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.json4s._ import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization.write import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.Interval import org.apache.spark.streaming.kafka._ object KafkaWordCount { implicit val formats = DefaultFormats//数据格式化时需要 def main(args: Array[String]): Unit={ if (args.length < 4) { System.err.println("Usage: KafkaWordCount") System.exit(1) } StreamingExamples.setStreamingLogLevels() val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(".") //这里表示把检查点文件写入分布式文件系统HDFS,所以要启动Hadoop // 将topics转换成topic-->numThreads的哈稀表 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap // 创建连接Kafka的消费者链接 val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" "))//将输入的每行用空格分割成一个个word // 对每一秒的输入数据进行reduce,然后将reduce后的数据发送给Kafka val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).foreachRDD(rdd => { if(rdd.count !=0 ){ val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092") props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") // 实例化一个Kafka生产者 val producer = new KafkaProducer[String, String](props) // rdd.colect即将rdd中数据转化为数组,然后write函数将rdd内容转化为json格式 val str = write(rdd.collect) // 封装成Kafka消息,topic为"result" val message = new ProducerRecord[String, String]("result", null, str) // 给Kafka发送消息 producer.send(message) } }) ssc.start() ssc.awaitTermination() } }
注意:如果没有启动hdfs就运行的话,会报错“拒绝连接”,所以运行前先启动hdfs,命令:cd /usr/local/hadoop
./sbin/start-dfs.sh(前面有一点)
// 在/usr/local/spark/mycode/kafka目录下新建simple.sbt打包,代码如下
/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/*:/usr/local/spark/jars/kafka/* --class "org.apache.spark.examples.streaming.KafkaWordCount" /usr/local/spark/mycode/kafka/target/scala-2.11/simple-project_2.11-1.0.jar 127.0.0.1:2181 1 sex 1
// 编译打包程序:/usr/local/sbt/sbt packag
接着在在/usr/local/spark/mycode/kafka目录下新建startup.sh文件,代码如下
import json from flask import Flask, render_template from flask_socketio import SocketIO from kafka import KafkaConsumer #因为第一步骤安装好了flask,所以这里可以引用 app = Flask(__name__) app.config['SECRET_KEY'] = 'secret!' socketio = SocketIO(app) thread = None # 实例化一个consumer,接收topic为result的消息 consumer = KafkaConsumer('result') # 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器 def background_thread(): girl = 0 boy = 0 for msg in consumer: data_json = msg.value.decode('utf8') data_list = json.loads(data_json) for data in data_list: if '0' in data.keys(): girl = data['0'] elif '1' in data.keys(): boy = data['1'] else: continue result = str(girl) + ',' + str(boy) print(result) socketio.emit('test_message',{'data':result}) # 客户端发送connect事件时的处理函数 @socketio.on('test_connect') def connect(message): print(message) global thread if thread is None: # 单独开启一个线程给客户端发送数据 thread = socketio.start_background_task(target=background_thread) socketio.emit('connected', {'data': 'Connected'}) # 通过访问http://127.0.0.1:5000/访问index.html @app.route("/") def handle_mes(): return render_template("index.html") # main函数 if __name__ == '__main__': socketio.run(app,debug=True)6.4、测试
运行:sh startup.sh
Consumer中接收的topic改为result
运行producer.py和consumer.py
利用Flask-SocketIO实时推送数据
socket.io.js实时获取数据
highlights.js展示数据
创建app.py文件
// 创建index页面
Girl: Boy:欢迎分享,转载请注明来源:内存溢出
评论列表(0条)