Big Data Engineering Practice Reference Manual

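Contents

Big Data Engineering Practice Reference Manual
    Copy/paste and file drag-and-drop still fail after installing VMware Tools
    Passwordless SSH login
    Unable to init server: Could not connect
    Installing Hadoop
    Installing HBase
    Python script
    Setting up a scheduled task on Ubuntu
    Collecting log data with Flume and Kafka
        Consumer fails to receive data
        Error when creating HBase tables
    Building the back-end project
        Installing IntelliJ
        Code
            Dependencies
            HbaseUtils
            DateUtils
            CourseClickCountDao
            CourseSearchClickCountDao
            CountByStreaming
        Run configuration
    Building the front-end project
        Configuration
        Cannot connect to MySQL
        Code
            testSQL
            JdbcUtils
            HbaseUtils
        MySQL data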

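Copy/paste and file drag-and-drop still fail after installing VMware Tools

Reference: "Fixing copy/paste between the host and an Ubuntu guest running in VMware", LELELED's blog on CSDN

Run the following commands in order, then restart the virtual machine: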

sudo apt-get autoremove open-vm-tools
sudo apt-get install open-vm-tools
sudo apt-get install open-vm-tools-desktop

Passwordless SSH login

Reference: "How to configure passwordless SSH login on Ubuntu", 未分配微服务, cnblogs.com

First check whether SSH is installed:

sudo ps -e |grep ssh

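If both SSH processes are listed (typically ssh-agent and sshd), SSH is installed. Otherwise install it with:

sudo apt-get install openssh-server

It is recommended to delete the .ssh directory first and configure it from scratch. Note that this removes any existing keys.

rm -r  ~/.ssh

Generate the public and private keys with the following command, pressing Enter at every prompt:

ssh-keygen -t rsa -P ""
# -t selects the key algorithm; -P sets the passphrase, and "" means no passphrase

Append the public key to the authorized_keys file:

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Finally, test by connecting to the local machine over SSH. The first connection asks you to type yes.

ssh localhost   # or: ssh 127.0.0.1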
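
If the login still asks for a password, one thing worth checking is whether the public key really ended up in authorized_keys. The following is a minimal Python sketch of that check; the function name key_is_authorized is made up for illustration, and the default file names assume the RSA key generated above.

```python
from pathlib import Path


def key_is_authorized(pub_key_path, authorized_keys_path):
    """Return True if the public key appears as a full line in authorized_keys."""
    pub_key_file = Path(pub_key_path)
    auth_file = Path(authorized_keys_path)
    if not pub_key_file.exists() or not auth_file.exists():
        return False
    pub_key = pub_key_file.read_text().strip()
    if not pub_key:
        return False
    # Compare against each stripped line so trailing newlines do not matter
    return any(line.strip() == pub_key for line in auth_file.read_text().splitlines())


if __name__ == "__main__":
    ssh_dir = Path.home() / ".ssh"
    print(key_is_authorized(ssh_dir / "id_rsa.pub", ssh_dir / "authorized_keys"))
```

The script only reads the two files, so it is safe to run repeatedly.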
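
Unable to init server: Could not connect

Reference: "[Error handling] Unable to init server: Could not connect: Connection refused", Gloriiiaaa's blog on CSDN

Run the following command:

$ xhost local:gedit

If this error appears:

xhost: unable to open display ""

run:

$ export DISPLAY=:0

and then run the first command again:

$ xhost local:gedit

If the output is

non-network local connections being added to access control list

the change has taken effect.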

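Installing Hadoop

Reference: "Hadoop 3.1.3 installation tutorial: standalone and pseudo-distributed configuration, Hadoop 3.1.3 / Ubuntu 18.04 (16.04)", Xiamen University Database Lab blog (xmu.edu.cn). Creating a separate hadoop user is optional.

In addition to the pseudo-distributed configuration described in that post, also edit the hadoop-env.sh file:

 vim /usr/local/hadoop/etc/hadoop/hadoop-env.sh

Add the following line, adjusting the path to your own JDK location:

export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162

Setting JAVA_HOME here prevents errors when Hadoop starts.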

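Installing HBase

Reference: "HBase 2.2.2 installation and programming practice guide", Xiamen University Database Lab blog (xmu.edu.cn)

Python script

Change the username in the log path to your own (s0109 in the code below) and create the data folder beforehand. Save the script in the data folder; the log is written to a path such as /home/s0109/data/click.log.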

import random
import time

url_paths = ["class/112.html",
             "class/128.html",
             "class/145.html",
             "class/146.html",
             "class/500.html",
             "class/250.html",
             "class/131.html",
             "class/130.html",
             "class/271.html",
             "class/127.html",
             "learn/821",
             "learn/823",
             "learn/987",
             "learn/500",
             "course/list"]

ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 46,
             55, 63, 72, 87, 98, 168, 192, 134, 111, 54, 64, 110, 43]

http_referers = ["http://www.baidu.com/s?wd={query}", "https://www.sogou.com/web?query={query}",
                 "http://cn.bing.com/search?q={query}", "https://search.yahoo.com/search?p={query}", ]

search_keyword = ["Spark SQL实战", "Hadoop基础", "Storm实战",
                  "Spark Streaming实战", "10小时入门大数据", "SpringBoot实战", "Linux进阶", "Vue.js"]

status_codes = ["200", "404", "500", "403"]


def sample_url():
    return random.sample(url_paths, 1)[0]


def sample_ip():
    slice = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in slice])


def sample_referer():
    if random.uniform(0, 1) > 0.5:
        return "-"
    refer_str = random.sample(http_referers, 1)
    query_str = random.sample(search_keyword, 1)
    return refer_str[0].format(query=query_str[0])


def sample_status_code():
    return random.sample(status_codes, 1)[0]


def generate_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # Fields are tab-separated; the request field keeps its surrounding double quotes
    log_format = '{ip}\t{local_time}\t"GET /{url} HTTP/1.1"\t{status_code}\t{referer}'
    # "w+" truncates the file on every run; the with block closes it afterwards
    with open("/home/s0109/data/click.log", "w+") as f:
        while count >= 1:
            query_log = log_format.format(url=sample_url(), ip=sample_ip(), referer=sample_referer(),
                                          status_code=sample_status_code(), local_time=time_str)
            f.write(query_log + "\n")
            count = count - 1


if __name__ == '__main__':
    generate_log(100)
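
To make the log format concrete, here is a small sketch that splits one generated line back into its five tab-separated fields, which is the same shape the streaming job later relies on. The function name parse_click_log and the dictionary keys are illustrative choices, not part of the original project.

```python
def parse_click_log(line):
    """Split one tab-separated click-log line into a dict, or return None if malformed."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 5:
        return None
    ip, local_time, request, status_code, referer = fields
    # The request field looks like "GET /class/112.html HTTP/1.1", quotes included
    url = request.strip('"').split(" ")[1]
    return {
        "ip": ip,
        "time": local_time,
        "url": url,
        "status": int(status_code),
        "referer": referer,
    }


if __name__ == "__main__":
    sample = '132.156.124.10\t2021-12-29 10:00:00\t"GET /class/112.html HTTP/1.1"\t200\t-'
    print(parse_click_log(sample))
```

Running the parser over a few lines of click.log is a quick way to confirm that the tabs and the newline were written correctly.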

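Setting up a scheduled task on Ubuntu

When crontab -e asks which editor to use for the new task, option 2 is recommended, provided vim is installed. Option 1 uses the nano editor (Ctrl+O saves, Ctrl+X exits). To change the choice later, run the select-editor command.

The paths in the scheduled task must also be changed to match your system.

Collecting log data with Flume and Kafka

In the configuration file, remember to change the location of the log file.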

exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel

exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /home/s0109/data/click.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c

exec-memory-kafka.channels.memory-channel.type = memory

exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamtopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 10
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1

exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

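When starting ZooKeeper and Kafka, you can append & to the command so that it runs in the background.

Consumer fails to receive data

Check the Flume configuration file for mistakes.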
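
A common mistake in such a file is a source or sink bound to a channel name that was never declared. The sketch below, under the assumption that the file uses plain key = value lines like the configuration above, reports such mismatches. The helper names parse_properties and check_flume_wiring are made up for this example, and the check covers only channel wiring, not every Flume setting.

```python
def parse_properties(text):
    """Parse 'key = value' lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_flume_wiring(props, agent):
    """Return a list of channel-wiring problems for the given agent (empty if none)."""
    problems = []
    channels = set(props.get(agent + ".channels", "").split())
    for source in props.get(agent + ".sources", "").split():
        bound = props.get("{}.sources.{}.channels".format(agent, source), "").split()
        if not bound:
            problems.append("source {} has no channels".format(source))
        for name in bound:
            if name not in channels:
                problems.append("source {} uses undeclared channel {}".format(source, name))
    for sink in props.get(agent + ".sinks", "").split():
        name = props.get("{}.sinks.{}.channel".format(agent, sink), "")
        if name not in channels:
            problems.append("sink {} uses undeclared channel {!r}".format(sink, name))
    return problems
```

Calling check_flume_wiring(parse_properties(open(path).read()), "exec-memory-kafka") on the file above should return an empty list; any returned message points at a line to fix.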

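Error when creating HBase tables

Restart HBase:

stop-hbase.sh
start-hbase.sh

After the restart, open hbase shell and run the list command first. Create the table only if list returns without hanging; otherwise keep restarting.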

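Building the back-end project

Note: from this step onward, save a virtual machine snapshot before running any code that operates on HBase. Running faulty code can crash the HBase environment!

Installing IntelliJ

Reference: "Detailed tutorial for installing IDEA 2020.2 on Ubuntu 20.04", liutao43's blog on CSDN. Adjust the commands to the version you downloaded.

In short there are two steps: extract and run.

# Change to the directory that holds the downloaded archive before extracting.
# /opt can be replaced with any target directory, and the archive name with the version you downloaded.
sudo tar -zxvf ideaIU-2020.2.3.tar.gz -C /opt
/opt/ideaIU-2020.2.3/bin/idea.sh

After installation, open Plugins, search for scala and install the plugin. The download is slow, so be patient.

For older versions, refer to this.

Then create a project: select Scala and click Create.

Select version 2.11.12 and download it. The download is extremely slow even though the file is only about 40 MB; if that is too slow, look for an offline installation method.

When setting up the Maven environment, create the repository directory yourself if it does not exist.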

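Code

Dependencies

    <repositories>
        <repository>
            <id>alimaven</id>
            <name>Maven Aliyun Mirror</name>
            <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.6</version>
        </dependency>
    </dependencies>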

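In older IDE versions, click Enable Auto-Import after pasting the dependencies so that they are downloaded.

HbaseUtils

Update the ZooKeeper setting: replace s0109 in the code below with your own account name.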

package com.spark.streaming.project.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;

public class HbaseUtils {
    private Configuration configuration = null;
    private Connection connection = null;
    private static HbaseUtils instance = null;

    private HbaseUtils(){
        try {
            configuration = new Configuration();
            // ZooKeeper server to connect to
            configuration.set("hbase.zookeeper.quorum", "s0109:2181");
            // Open the HBase connection
            connection = ConnectionFactory.createConnection(configuration);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    

    public static synchronized HbaseUtils getInstance(){
        if(instance == null){
            instance = new HbaseUtils();
        }
        return instance;
    }

    
    public HTable getTable(String tableName) {
        HTable hTable = null; 
        try {
            hTable = (HTable)connection.getTable(TableName.valueOf(tableName));
        }catch (Exception e){
            e.printStackTrace();
        }
        return hTable;
    }
}

DateUtils

package com.spark.streaming.project.utils

import org.apache.commons.lang3.time.FastDateFormat


object DateUtils {
  // Input format of the log timestamp (HH = 24-hour clock, matching %H in the Python script)
  val YYYYMMDDHMMSS_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  // Output format
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")

  // Parse a timestamp string and return it as epoch milliseconds (Long)
  def getTime(time: String) = {
    YYYYMMDDHMMSS_FORMAT.parse(time).getTime
  }

  // Reformat a log timestamp into the compact output form
  def parseToMinute(time: String) = {
    TARGET_FORMAT.format(getTime(time))
  }
}
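
For comparison, the same timestamp conversion can be sketched in Python with the standard datetime module. This is only an illustration of what DateUtils.parseToMinute is meant to produce, assuming a 24-hour clock as written by the log generator; the function name parse_to_minute is chosen to mirror the Scala method.

```python
from datetime import datetime


def parse_to_minute(time_str):
    """Convert 'YYYY-MM-DD HH:MM:SS' into the compact 'YYYYMMDDHHMMSS' form."""
    parsed = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
    return parsed.strftime("%Y%m%d%H%M%S")


if __name__ == "__main__":
    print(parse_to_minute("2021-12-29 13:05:09"))  # prints 20211229130509
```

The first eight characters of the result are the date, which is the part the streaming job uses as the row-key prefix.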

CourseClickCountDao

package com.spark.streaming.project.dao

import com.spark.streaming.project.domain.CourseClickCount
import com.spark.streaming.project.utils.HbaseUtils

import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ListBuffer

object CourseClickCountDao {
  val tableName = "ns1:courses_clickcount" // table name
  val cf = "info" // column family
  val qualifer = "click_count" // column qualifier


  def save(list: ListBuffer[CourseClickCount]): Unit = {
    // Get the HBase table instance through HbaseUtils
    val table = HbaseUtils.getInstance().getTable(tableName)
    for (item <- list) {
      // Use HBase's atomic increment so counts accumulate across batches
      table.incrementColumnValue(Bytes.toBytes(item.day_course),
        Bytes.toBytes(cf), Bytes.toBytes(qualifer),
        item.click_count) // passed as a Long (converted automatically)
    }
  }
}

CourseSearchClickCountDao

package com.spark.streaming.project.dao

import com.spark.streaming.project.domain.CourseSearchClickCount
import com.spark.streaming.project.utils.HbaseUtils
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ListBuffer

object CourseSearchClickCountDao {
  val tableName = "ns1:courses_search_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  
  def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
    val table = HbaseUtils.getInstance().getTable(tableName)
    for (item <- list) {
      table.incrementColumnValue(Bytes.toBytes(item.day_serach_course),
        Bytes.toBytes(cf), Bytes.toBytes(qualifer),
        item.click_count
      ) // passed as a Long (converted automatically)
    }
  }
}

CountByStreaming

package com.spark.streaming.project.application

import com.spark.streaming.project.domain.ClickLog
import com.spark.streaming.project.domain.CourseClickCount
import com.spark.streaming.project.domain.CourseSearchClickCount
import com.spark.streaming.project.utils.DateUtils
import com.spark.streaming.project.dao.CourseClickCountDao
import com.spark.streaming.project.dao.CourseSearchClickCountDao
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

object CountByStreaming {
  def main(args: Array[String]): Unit = {

    if (args.length != 4) {
      System.err.println("Error: you need to input: <zkAddress> <group> <topics> <threadNum>")
      System.exit(1)
    }
    // Unpack the command-line arguments passed to main
    val Array(zkAdderss, group, toplics, threadNum) = args


    val sparkConf = new SparkConf()
      .setAppName("CountByStreaming")
      .setMaster("local[4]")

    // Create the streaming context with a 60-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    // Use Kafka as the data source: map each topic to its number of consumer threads
    val topicsMap = toplics.split(",").map((_, threadNum.toInt)).toMap
    // Create the Kafka DStream; each 60-second batch consumes data from the Kafka cluster
    val kafkaInputDS = KafkaUtils.createStream(ssc, zkAdderss, group, topicsMap)
    // Keep only the message value, i.e. the raw log line
    val logResourcesDS = kafkaInputDS.map(_._2)

    val cleanDataRDD = logResourcesDS.map(line => {
      val splits = line.split("\t")
      if (splits.length != 5) {
        // Malformed records get placeholder values; the filter below drops them
        ClickLog("", "", 0, 0, "")
      }
      else {
        val ip = splits(0) // client IP
        val time = DateUtils.parseToMinute(splits(1)) // access time, reformatted by DateUtils
        val status = splits(3).toInt // HTTP status code
        val referer = splits(4)
        val url = splits(2).split(" ")(1) // requested URL
        var courseId = 0
        if (url.startsWith("/class")) {
          val courseIdHtml = url.split("/")(2)
          courseId = courseIdHtml.substring(0, courseIdHtml.lastIndexOf(".")).toInt
        }
        ClickLog(ip, time, courseId, status, referer) // wrap the cleaned record in a ClickLog
      }
    }).filter(x => x.courseId != 0) // drop records that are not course pages
    
    cleanDataRDD.map(line => {
      // This defines the RowKey of the HBase table "ns1:courses_clickcount":
      // 'date_courseId', i.e. the number of visits to a course on a given day
      (line.time.substring(0, 8) + "_" + line.courseId, 1) // map to a (key, 1) tuple
    }).reduceByKey(_ + _) // aggregate
      .foreachRDD(rdd => { // a DStream contains many RDDs
        rdd.foreachPartition(partition => { // an RDD contains many partitions
          val list = new ListBuffer[CourseClickCount]
          partition.foreach(item => { // a partition contains many records
            list.append(CourseClickCount(item._1, item._2))
          })
          CourseClickCountDao.save(list) // save to HBase
        })
      })


    cleanDataRDD.map(line => {
      val referer = line.referer
      val time = line.time.substring(0, 8)
      var url = ""
      if (referer == "-") { // no referer: leave url empty so the filter drops it
        (url, time)
      }
      else {
        // Extract the search engine host name
        url = referer.replaceAll("//", "/").split("/")(1)
        (url, time)
      }
    }).filter(x => x._1 != "").map(line => {
      // This defines the RowKey of the HBase table "ns1:courses_search_clickcount":
      // 'date_searchEngine', i.e. the number of course visits arriving from a search engine on a given day
      (line._2 + "_" + line._1, 1) // map to a (key, 1) tuple
    }).reduceByKey(_ + _) // aggregate
      .foreachRDD(rdd => {
        rdd.foreachPartition(partition => {
          val list = new ListBuffer[CourseSearchClickCount]
          partition.foreach(item => {
            list.append(CourseSearchClickCount(item._1, item._2))
          })
          CourseSearchClickCountDao.save(list)
        })
      })
    ssc.start()
    ssc.awaitTermination()
  }
}
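
The two aggregations in CountByStreaming boil down to building a row key per record and counting how often each key occurs. The Python sketch below reproduces only that key logic on plain values so it can be tried without Spark or HBase. The function names are illustrative, and since the ClickLog class is not listed in this manual, the inputs here are simple strings and integers.

```python
from collections import Counter


def course_row_key(compact_time, course_id):
    """Build 'date_courseId' from a yyyyMMddHHmmss timestamp and a course id."""
    return "{}_{}".format(compact_time[:8], course_id)


def search_row_key(compact_time, referer):
    """Build 'date_searchHost', or return None when there is no referer."""
    if referer == "-":
        return None
    # Collapse '//' so that the host is the second '/'-separated element
    host = referer.replace("//", "/").split("/")[1]
    return "{}_{}".format(compact_time[:8], host)


def count_keys(keys):
    """Count occurrences per row key, ignoring None (the reduceByKey step)."""
    return Counter(key for key in keys if key is not None)


if __name__ == "__main__":
    keys = [
        search_row_key("20211229130509", "http://www.baidu.com/s?wd=Vue.js"),
        search_row_key("20211229130509", "-"),
        search_row_key("20211229140000", "http://www.baidu.com/s?wd=Storm"),
    ]
    print(count_keys(keys))
```

Each resulting key and count pair corresponds to one incrementColumnValue call in the DAO objects.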
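
Run configuration

Click Run first. In the dialog that pops up, select CountByStreaming (the entry without the $). Then follow the environment configuration in the PDF and fill in the four arguments under Program arguments, in this order: ZooKeeper address, consumer group, topics, and thread count.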

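Building the front-end project

Configuration

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <classifier>jdk15</classifier>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
    </dependencies>
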
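Cannot connect to MySQL

First make sure that your MySQL instance contains a database named spark.

Error 1

The exact error message may differ between versions.

Cause: MySQL is not listening on port 3306.

sudo vim /etc/mysql/mysql.conf.d/mysqld.cnf
Comment out the skip-grant-tables line, then restart MySQL.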
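
To confirm whether anything is listening on port 3306 before and after the change, a plain TCP connection attempt is enough. The following is a minimal sketch using only the Python standard library; the function name port_is_open is made up for this example, and a successful connection shows only that the port accepts connections, not that it is MySQL.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and unreachable hosts
        return False


if __name__ == "__main__":
    print(port_is_open("localhost", 3306))
```

If this prints False after the restart, MySQL is still not accepting network connections and the configuration should be checked again.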

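Error 2

The JDBC driver version is incompatible.

If you installed MySQL with the command from the PDF without specifying a version, download the latest JDBC driver from the official site. Reference: "Downloading and using the MySQL JDBC jar (Windows)", 苍凉温暖, cnblogs.com

Code

testSQL

Note: change the date in testHbase() to the date on which you ran the back-end code, for example from 20211001 to 20211229. Take a virtual machine snapshot before running, and check that the ZooKeeper server address in HbaseUtils is correct.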

import com.test.utils.HbaseUtils;
import com.test.utils.JdbcUtils;
import org.junit.Test;
import java.sql.*;
import java.util.Map;

public class testSQL {
    @Test
    public void testjdbc() throws ClassNotFoundException {
        Class.forName("com.mysql.cj.jdbc.Driver");
        String url = "jdbc:mysql://localhost:3306/spark";
        String username = "root";
        String password = "root";
        // try-with-resources closes the result set, statement and connection in reverse order
        try (Connection conn = DriverManager.getConnection(url, username, password);
             Statement stmt = conn.createStatement();
             ResultSet res = stmt.executeQuery("select * from course")) {
            while (res.next())
                System.out.println(res.getString(1)+" "+res.getString(2));
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    @Test
    public void testJdbcUtils() throws ClassNotFoundException {
        System.out.println(JdbcUtils.getInstance().getCourseName("128"));
        System.out.println(JdbcUtils.getInstance().getCourseName("112"));
    }
    @Test
    public void testHbase() {
        Map<String, Long> clickCount = HbaseUtils.getInstance().getClickCount("ns1:courses_clickcount", "20211001");
        for (String x : clickCount.keySet())
            System.out.println(x + " " +clickCount.get(x));
    }
}

JdbcUtils

package com.test.utils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
public class JdbcUtils {
    private Connection connection = null;
    private static JdbcUtils jdbcUtils = null;
    Statement stmt = null;
    private JdbcUtils() throws ClassNotFoundException {
        // Driver class name used by MySQL Connector/J 8, matching testSQL above
        Class.forName("com.mysql.cj.jdbc.Driver");
        String url = "jdbc:mysql://localhost:3306/spark?useSSL=false";
        String username = "root";
        String password = "root";
        try {
            connection = DriverManager.getConnection(url, username, password);
            stmt = connection.createStatement();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    
    public static synchronized JdbcUtils getInstance() throws
            ClassNotFoundException {
        if(jdbcUtils == null){
            jdbcUtils = new JdbcUtils();
        }
        return jdbcUtils;
    }
    
    public String getCourseName(String id){
        try {
            ResultSet res = stmt.executeQuery("select * from course where id ='" + id + "'");
            while (res.next())
                return res.getString(2);
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }
    
    public Map getClickCount(String tableName, String date){
        Map map = new HashMap();
        try {
        }catch (Exception e){
            e.printStackTrace();
            return null;
        }
        return map;
    }
}

HbaseUtils

Change localhost in the ZooKeeper server address to your own username.

package com.test.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.HashMap;
import java.util.Map;
public class HbaseUtils {
    private Configuration configuration = null;
    private Connection connection = null;
    private static HbaseUtils hbaseUtil = null;
    private HbaseUtils(){
        try {
            configuration = new Configuration();
            // ZooKeeper server address
            configuration.set("hbase.zookeeper.quorum","localhost:2181");
            connection = ConnectionFactory.createConnection(configuration);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    
    public static synchronized HbaseUtils getInstance(){
        if(hbaseUtil == null){
            hbaseUtil = new HbaseUtils();
        }
        return hbaseUtil;
    }
    
    public HTable getTable(String tableName){
        try {
            HTable table = null;
            table = (HTable)connection.getTable(TableName.valueOf(tableName));
            return table;
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }
    
    public Map<String, Long> getClickCount(String tableName, String date){
        Map<String, Long> map = new HashMap<>();
        // Column family and column qualifier that hold the counter
        String cf = "info";
        String qualifier = "click_count";
        // Prefix filter: scan only the rows whose key starts with the given date
        Filter filter = new PrefixFilter(Bytes.toBytes(date));
        Scan scan = new Scan();
        scan.setFilter(filter);
        // try-with-resources closes the table handle and the scanner when done
        try (HTable table = getInstance().getTable(tableName);
             ResultScanner results = table.getScanner(scan)) {
            for (Result result : results) {
                // Row key, e.g. date_courseId
                String rowKey = Bytes.toString(result.getRow());
                // Click count stored as an 8-byte long
                Long clickCount =
                        Bytes.toLong(result.getValue(Bytes.toBytes(cf), Bytes.toBytes(qualifier)));
                map.put(rowKey, clickCount);
            }
        }catch (Exception e){
            e.printStackTrace();
            return null;
        }
        return map;
    }
}
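
The effect of the prefix filter in getClickCount can be pictured with an ordinary dictionary: only rows whose key starts with the requested date are returned. The sketch below is an in-memory analogy in Python, not HBase client code; the function name scan_by_prefix and the sample data are invented for illustration.

```python
def scan_by_prefix(rows, prefix):
    """Return the rows whose key starts with prefix, in sorted key order."""
    return {key: value for key, value in sorted(rows.items()) if key.startswith(prefix)}


if __name__ == "__main__":
    # Hypothetical table contents: row key 'date_courseId' -> click count
    table = {"20211228_112": 4, "20211229_112": 7, "20211229_128": 3}
    print(scan_by_prefix(table, "20211229"))
```

This is why the date passed to testHbase() has to match the day on which the back-end job wrote its rows: a prefix with no matching keys yields an empty result.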

MySQL data

Log in to MySQL from the shell, then run:

use spark;
DROP TABLE IF EXISTS `course`;
CREATE TABLE `course`  (
  `id` int NOT NULL,
  `course` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
INSERT INTO `course` VALUES (112, 'Spark');
INSERT INTO `course` VALUES (127, 'Hbase');
INSERT INTO `course` VALUES (128, 'Flink');
INSERT INTO `course` VALUES (130, 'Hadoop');
INSERT INTO `course` VALUES (145, 'Linux');
INSERT INTO `course` VALUES (146, 'Python');

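The tutorial ends here; the remaining steps are left for you to work out.

Feel free to share. When reposting, please credit the source: 内存溢出

Original address: http://outofmemory.cn/zaji/5701142.html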
