spark大数据工程实战:实时数据流处理

spark大数据工程实战:实时数据流处理,第1张

spark大数据工程实战:实时数据流处理

大数据工程实战:实时数据流处理

一、配置环境

1、hadoop伪分布配置2、hbase伪分布配置。创建Flume日志文件。。hbase建表。。mysql建表

低版本mysql可能出现的问题 二、后端项目三、前端项目四、最后结果

一、配置环境

小tips:
删除hadoop

cd /usr/local
sudo rm -rf hadoop

删除hbase

cd /usr/local
sudo rm -rf hbase
1、hadoop伪分布配置
$ cd ~/Downloads     # 进入下载目录
$ wget -c http://res.aihyzh.com/大数据技术原理与应用3/02/hadoop-3.1.3.tar.gz #下载
资源
$ sudo tar -zxf ~/Downloads/hadoop-3.1.3.tar.gz -C /usr/local   # 解压
到/usr/local中
$ cd /usr/local/
$ sudo mv ./hadoop-3.1.3/ ./hadoop     # 将文件夹名改为hadoop
$ sudo chown -R stu:stu ./hadoop       # 修改文件权限

需要配置相关文件,才能够让Hadoop在伪分布式模式下顺利运行。Hadoop的配置文件位于 /usr/local/hadoop/etc/hadoop/ 中,进行伪分布式模式配置时,需要修改3个配置文件,即hadoop-env.sh , core-site.xml 和 hdfs-site.xml
1.使用vim编辑器打开 hadoop-env.sh 文件,进行修改 JAVA_HOME 配置。

export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162

2.使用vim编辑器打开 core-site.xml 文件


   
       hadoop.tmp.dir
       file:/usr/local/hadoop/tmp
       Abase for other temporary directories.
   
   
       fs.defaultFS
       hdfs://localhost:9000
   

3.使用vim编辑器打开 hdfs-site.xml 文件


   
       dfs.replication
       1
   
   
       dfs.namenode.name.dir
       file:/usr/local/hadoop/tmp/dfs/name
   
   
       dfs.datanode.data.dir
       file:/usr/local/hadoop/tmp/dfs/data
   

4.执行名称节点格式化

cd /usr/local/hadoop
./bin/hdfs namenode -format

5.启动hadoop

cd /usr/local/hadoop
./sbin/start-dfs.sh

出现NameNode、DataNode、SecondaryNameNode就算成功。

2、hbase伪分布配置
cd ~/Downloads     # 进入下载目录
wget -c http://res.aihyzh.com/大数据技术原理与应用3/04/hbase-2.2.2-bin.tar.gz #下载资源
sudo tar -zxf ~/Downloads/hbase-2.2.2-bin.tar.gz -C /usr/local
sudo mv /usr/local/hbase-2.2.2 /usr/local/hbase

1.配置环境变量:

vim ~/.bashrc

和之前hadoop写一起

export PATH=$PATH:/usr/local/hadoop/sbin:/usr/local/hadoop/bin:/usr/local/hbase/bin

添加后,执行如下命令使设置生效:

 source ~/.bashrc

2.添加用户权限

sudo chown -R stu:stu /usr/local/hbase

不要直接查看hbase版本,会报错,配好伪分布再看。

3.配置hbase-env.sh文件

vim /usr/local/hbase/conf/hbase-env.sh
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export Hbase_CLASSPATH=/usr/local/hadoop/conf
export Hbase_MANAGES_ZK=true

配置hbase-site.xml文件

vim /usr/local/hbase/conf/hbase-site.xml

       
               hbase.rootdir
               hdfs://localhost:9000/hbase
       
       
               hbase.cluster.distributed
               true
       
       
         hbase.unsafe.stream.capability.enforce
         false
       

4.查看Hbase版本信息,以确认Hbase已经安装成功:

/usr/local/hbase/bin/hbase version

先启动hadoop,再启动hbase。
以进入 Hbase Shell 模式,命令如下

 bin/hbase shell
。创建Flume日志文件

在/usr/local/flume/conf下新建一个配置文件 streaming_project.conf,配置source、sink、Channel的
各项参数,连接各个组件,其配置内容如下:
第五行是一行!!!

exec-memory-kafka.sources = exec-source 
exec-memory-kafka.sinks = kafka-sink 
exec-memory-kafka.channels = memory-channel 
exec-memory-kafka.sources.exec-source.type = exec 
exec-memory-kafka.sources.exec-source.command = tail -F /home/pabu/data/click.log 
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c 

exec-memory-kafka.channels.memory-channel.type = memory
 
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink 
exec-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092 
exec-memory-kafka.sinks.kafka-sink.topic = streamtopic 
exec-memory-kafka.sinks.kafka-sink.batchSize = 10 
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1 

exec-memory-kafka.sources.exec-source.channels = memory-channel 
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
。。hbase建表

、、、、、、

。。mysql建表



低版本mysql可能出现的问题

mysql插入数据无法插入中文
使用: show variables like “%character%”;
查看编码,发现数据库默认编码时Latin,该编码无法使用中文。

解决办法,创建表时直接设置默认编码,使用utf-8后能正常插入中文。

二、后端项目

目录结构:


        
            org.scala-lang
            scala-library
            2.11.8
        
        
            org.apache.hadoop
            hadoop-client
            2.5.1
        
        
            org.apache.hbase
            hbase-client
            2.2.2
        
        
            org.apache.spark
            spark-streaming-kafka-0-8_2.11
            2.1.1
        
        
            com.fasterxml.jackson.module
            jackson-module-scala_2.11
            2.6.5
        
        
            net.jpountz.lz4
            lz4
            1.3.0
        
        
        	org.apache.spark
        	spark-streaming_2.11
        	2.1.1
        
        
            org.apache.commons
            commons-lang3
            3.6
        

配置maven前最好修改成国内镜像源

settings.xml

在图示位置添加阿里云

	
		alimaven
		central 
		aliyun maven
		http://maven.aliyun.com/nexus/content/groups/public/ 
	

在util包下创建Java类HbaseUtils,用于连接Hbase,存储处理的结果,类HbaseUtils完整代码如下:
HbaseUtils.java
注意!!!下面这行要改成localhost
configuration.set("hbase.zookeeper.quorum", "localhost:2181");

package com.spark.streaming.project.utils;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;


public class HbaseUtils {
    private Configuration configuration = null;
    private Connection connection = null;
    private static HbaseUtils instance = null;
    
    private HbaseUtils(){
        try {
            configuration = new Configuration();//指定要访问的zk服务器
            configuration.set("hbase.zookeeper.quorum", "localhost:2181");
//得到Hbase连接
            connection = ConnectionFactory.createConnection(configuration);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    public static synchronized HbaseUtils getInstance(){
        if(instance == null){
            instance = new HbaseUtils();
        }
        return instance;
    }
    
    public HTable getTable(String tableName) {
        HTable hTable = null;
        try {
            hTable = (HTable)connection.getTable(TableName.valueOf(tableName));
        }catch (Exception e){
            e.printStackTrace();
        }
        return hTable;
    }
}

测试是否写入Hbase

    启动hadoopcd /usr/local/hadoop ./sbin/start-dfs.sh启动hbasecd /usr/local/hbase ./bin/start-hbase.sh启动kafkacd /usr/local/kafka ./bin/kafka-server-start.sh config/server.properties利用crontab -e实现数据流的产生启动Flume,监控存储日志的文件cd /usr/local/flume/bin ./flume-ng agent --conf conf --name exec-memory-kafka --conf-file /usr/local/flume/conf/streaming_project.conf运行本项目的核心类 CountByStreaming ,因为我们设置main方法要接收许多参数,因此在运行前需要配置一下该类:

    新建application

    选择运行的主类

    配置main方法接收的参数

    开启所有的服务。

    开启ZooKeeper时提示地址已在使用,无法开启。

    使用语句:sudo lsof -i:2181
    查看2181端口,发现被ipv6占用,直接kill掉进程,能正常启动。

运行后端项目:

打开hbase shell
输入

scan 'ns1:courses_search_clickcount'
scan 'ns1:courses_clickcount'

可以看到数据成功写入hbase。


可能会出现的情况:注意!!一定要先等后端程序运行完,再开启hbase shell读数据,不然会报错 。。。is not online on xxx-virtual-machine

或者拒绝连接

推测是不能两边同时读写hbase,会报错。解决不了,建议重头再来吧。

三、前端项目

目录结构


配置maven


        
            org.apache.hbase
            hbase-client
            2.2.2
        
        
            junit
            junit
            4.11
        
        
            javax.servlet
            javax.servlet-api
            3.1.0
        
        
            net.sf.json-lib
            json-lib
            2.4
            jdk15
        
        
            com.alibaba
            fastjson
            1.2.78
        
        
            mysql
            mysql-connector-java
            8.0.25
        
    

推荐用maven安装MySQL的jdbc的驱动

代码略

idea配置mysql要花点时间。

测试mysql是否连接成功

测试hbase是否连接成功

有问题再写

四、最后结果

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5702164.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存