Big Data Engineering Practice Reference Manual
Contents:
- vmtools installed but copy-paste and drag-and-drop still not working
- SSH passwordless login
- Unable to init server: cannot connect
- Hadoop installation
- HBase installation
- Python script
- Setting up an Ubuntu cron job
- Collecting log data with Flume and Kafka
- Consumer fails to receive data
- Error when creating the HBase table
- Building the backend project: installing IntelliJ; code (dependencies, HbaseUtils, DateUtils, CourseClickCountDao, CourseSearchClickCountDao, CountByStreaming); runtime configuration
- Building the frontend project: configuring the MySQL connection; code (testSQL, JdbcUtils, HbaseUtils, MySQL data)
vmtools installed but copy-paste and drag-and-drop still not working

Reference: 解决虚拟机VMware运行Ubuntu时无法和主机之间复制粘贴的问题 (LELELED, CSDN)
Run the following commands in order, then reboot the VM:
sudo apt-get autoremove open-vm-tools
sudo apt-get install open-vm-tools
sudo apt-get install open-vm-tools-desktop

SSH passwordless login
Reference: Ubuntu如何配置SSH免密登录 (未分配微服务, cnblogs.com)
First, check whether ssh is installed:

sudo ps -e | grep ssh

If the ssh processes show up in the output, ssh is already installed; otherwise install it with the following command:
sudo apt-get install openssh-server
It is best to delete the existing ssh directory first and reconfigure from scratch:
rm -r ~/.ssh
Generate the public/private key pair with the following command, then press Enter through the prompts:

ssh-keygen -t rsa -P ""   # -t selects the key algorithm; -P sets the passphrase ("" means no passphrase)
Append the public key to the authorized_keys file:
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
Finally, test by connecting to the local machine over ssh; the first connection asks you to type yes:

ssh localhost    # or: ssh 127.0.0.1

Unable to init server: cannot connect
Reference: 【错误处理】Unable to init server: 无法连接: 拒绝连接 (Gloriiiaaa, CSDN)
Run the following command:
$ xhost local:gedit
If you get the error

xhost: unable to open display ""

run

$ export DISPLAY=:0

and then repeat

$ xhost local:gedit

If the output is

non-network local connections being added to access control list

the change succeeded.
Hadoop installation

Reference: Hadoop3.1.3安装教程_单机/伪分布式配置_Hadoop3.1.3/Ubuntu18.04(16.04) (厦大数据库实验室博客, xmu.edu.cn). You do not need to create a dedicated hadoop user.

Besides the pseudo-distributed configuration in that guide, also edit the hadoop-env.sh file:

vim /usr/local/hadoop/etc/hadoop/hadoop-env.sh

and add

export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162

to prevent errors later.
HBase installation

Reference: Hbase2.2.2安装和编程实践指南 (厦大数据库实验室博客, xmu.edu.cn)
Python script

Change pabu to your username and create the data folder ahead of time; keep the script in the data folder. It writes its log to a path like /home/s0109/data/click.log.
import random
import time

url_paths = [
    "class/112.html", "class/128.html", "class/145.html", "class/146.html",
    "class/500.html", "class/250.html", "class/131.html", "class/130.html",
    "class/271.html", "class/127.html", "learn/821", "learn/823",
    "learn/987", "learn/500", "course/list"
]
ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 46, 55, 63, 72, 87,
             98, 168, 192, 134, 111, 54, 64, 110, 43]
http_referers = [
    "http://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}",
    "https://search.yahoo.com/search?p={query}",
]
search_keyword = ["Spark SQL实战", "Hadoop基础", "Storm实战",
                  "Spark Streaming实战", "10小时入门大数据",
                  "SpringBoot实战", "Linux进阶", "Vue.js"]
status_codes = ["200", "404", "500", "403"]

def sample_url():
    return random.sample(url_paths, 1)[0]

def sample_ip():
    slice = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in slice])

def sample_referer():
    if random.uniform(0, 1) > 0.5:
        return "-"
    refer_str = random.sample(http_referers, 1)
    query_str = random.sample(search_keyword, 1)
    return refer_str[0].format(query=query_str[0])

def sample_status_code():
    return random.sample(status_codes, 1)[0]

def generate_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    f = open("/home/s0109/data/click.log", "w+")
    while count >= 1:
        query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(
            url=sample_url(), ip=sample_ip(), referer=sample_referer(),
            status_code=sample_status_code(), local_time=time_str)
        f.write(query_log + "\n")
        count = count - 1

if __name__ == '__main__':
    generate_log(100)
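Each record the script writes is five tab-separated fields: ip, timestamp, the quoted request, status code, and referer. A quick sketch of that layout and how to split it back apart (the field names here are our own labels):

```python
# One record in the same layout click.log uses: five fields joined by tabs.
line = "132.156.124.10\t2021-12-29 10:00:00\t\"GET /class/112.html HTTP/1.1\"\t200\t-"

ip, local_time, request, status_code, referer = line.split("\t")
url = request.split(" ")[1]  # drop the quoted method and protocol around the URL
print(url)  # /class/112.html
```

This is the same splitting the Spark Streaming job performs later, so a line that unpacks cleanly here will also pass the job's length-5 check.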
Setting up an Ubuntu cron job

After running crontab -e to create a new task, you will be asked to choose an editor. Pick 2 (vim) if vim is installed; option 1 is the nano editor (Ctrl+O to save, Ctrl+X to exit). To change editors later, run select-editor and choose again.

Remember to change the paths in the cron entry to your own paths as well.
Collecting log data with Flume and Kafka

Configuration file (remember to change the log file location):
exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel
exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /home/s0109/data/click.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
exec-memory-kafka.channels.memory-channel.type = memory
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = localhost:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamtopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 10
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
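The agent above wires one exec source through a memory channel into a Kafka sink; events only flow if the source's channels property and the sink's channel property name the same channel. A small sketch that parses such "key = value" lines and checks the wiring (parse_flume_props is a hypothetical helper for this check, not part of Flume):

```python
def parse_flume_props(text):
    """Parse 'key = value' lines of a Flume agent config into a dict."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and non-property lines
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props

conf = """
exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel
exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
"""

props = parse_flume_props(conf)
# events only flow when source and sink share the same channel
assert props["exec-memory-kafka.sources.exec-source.channels"] == \
       props["exec-memory-kafka.sinks.kafka-sink.channel"]
```

A mismatch here (a typo in the channel name on either side) is one of the most common reasons the consumer later receives nothing.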
When starting ZooKeeper and Kafka, append & to the command so they keep running in the background.
Consumer fails to receive data

Check whether the Flume configuration file has a mistake.
Error when creating the HBase table

Restart HBase:

stop-hbase.sh
start-hbase.sh

After the restart, enter hbase shell and run list first; if it does not hang, go ahead and create the table, otherwise keep retrying the restart.
Building the backend project

Note: from this step on, take a VM snapshot before running any of the code that operates on HBase; running faulty code can leave the HBase environment broken!
Installing IntelliJ

Reference: Ubuntu20.04安装idea2020.2 IDE详细教程 (liutao43, CSDN). Adjust the commands to match the version you downloaded.
In short there are two steps: extract, then run.

sudo tar -zxvf ideaIU-2020.2.3.tar.gz -C /opt   # /opt can be any target directory; swap the archive name for your version; cd into the download directory first
/opt/ideaIU-2020.2.3/bin/idea.sh
After installation, open Plugins and search for scala to install the Scala plugin; the download is slow, so be patient.

For older IDEA versions, refer to this.
Then create a project, select Scala, and click Create.

Choose Scala 2.11.12 and wait for the download; it is extremely slow even though the file is only about 40 MB. If it is too slow for you, find an offline installation method yourself.

When setting up the Maven environment, create the repository directory yourself if it does not exist.
Code

Dependencies:

<repositories>
    <repository>
        <id>alimaven</id>
        <name>Maven Aliyun Mirror</name>
        <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
        <releases>
            <enabled>true</enabled>
        </releases>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.5.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.11</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>net.jpountz.lz4</groupId>
        <artifactId>lz4</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.6</version>
    </dependency>
</dependencies>
In older IDEA versions, after filling this in, click Enable Auto-Import to refresh the dependencies.
HbaseUtils

Note the ZooKeeper quorum setting: replace s0109 below with your own account name.
package com.spark.streaming.project.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;

public class HbaseUtils {
    private Configuration configuration = null;
    private Connection connection = null;
    private static HbaseUtils instance = null;

    private HbaseUtils() {
        try {
            configuration = new Configuration();
            // ZooKeeper quorum to connect to
            configuration.set("hbase.zookeeper.quorum", "s0109:2181");
            // obtain the HBase connection
            connection = ConnectionFactory.createConnection(configuration);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static synchronized HbaseUtils getInstance() {
        if (instance == null) {
            instance = new HbaseUtils();
        }
        return instance;
    }

    public HTable getTable(String tableName) {
        HTable hTable = null;
        try {
            hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return hTable;
    }
}
DateUtils

package com.spark.streaming.project.utils

import org.apache.commons.lang3.time.FastDateFormat

object DateUtils {
  // input date format (HH = 24-hour clock)
  val YYYYMMDDHMMSS_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  // output format
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")

  // parse the input string into epoch millis
  def getTime(time: String) = {
    YYYYMMDDHMMSS_FORMAT.parse(time).getTime
  }

  def parseToMinute(time: String) = {
    // reuse getTime, then re-emit in the compact target format
    TARGET_FORMAT.format(getTime(time))
  }
}
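DateUtils turns a log timestamp like "2021-12-29 10:30:00" into the compact form "20211229103000". For reference, the same conversion sketched in Python (the function name is ours, not from the project):

```python
from datetime import datetime

def parse_to_minute(time_str):
    # mirror DateUtils: parse 'yyyy-MM-dd HH:mm:ss', re-emit as 'yyyyMMddHHmmss'
    return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S").strftime("%Y%m%d%H%M%S")

print(parse_to_minute("2021-12-29 10:30:00"))  # 20211229103000
```

Note the 24-hour `%H`; a 12-hour format would silently fold afternoon timestamps onto morning ones.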
CourseClickCountDao

package com.spark.streaming.project.dao

import com.spark.streaming.project.domain.CourseClickCount
import com.spark.streaming.project.utils.HbaseUtils
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ListBuffer

object CourseClickCountDao {
  val tableName = "ns1:courses_clickcount" // table name
  val cf = "info"                          // column family
  val qualifer = "click_count"             // column

  def save(list: ListBuffer[CourseClickCount]): Unit = {
    // get the HBase table instance via HbaseUtils
    val table = HbaseUtils.getInstance().getTable(tableName)
    for (item <- list) {
      // HBase's atomic increment; click_count is a Long and converts automatically
      table.incrementColumnValue(Bytes.toBytes(item.day_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        item.click_count)
    }
  }
}
CourseSearchClickCountDao

package com.spark.streaming.project.dao

import com.spark.streaming.project.domain.CourseSearchClickCount
import com.spark.streaming.project.utils.HbaseUtils
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ListBuffer

object CourseSearchClickCountDao {
  val tableName = "ns1:courses_search_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
    val table = HbaseUtils.getInstance().getTable(tableName)
    for (item <- list) {
      // atomic increment; click_count is a Long and converts automatically
      table.incrementColumnValue(Bytes.toBytes(item.day_serach_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        item.click_count)
    }
  }
}
CountByStreaming

package com.spark.streaming.project.application

import com.spark.streaming.project.domain.ClickLog
import com.spark.streaming.project.domain.CourseClickCount
import com.spark.streaming.project.domain.CourseSearchClickCount
import com.spark.streaming.project.utils.DateUtils
import com.spark.streaming.project.dao.CourseClickCountDao
import com.spark.streaming.project.dao.CourseSearchClickCountDao
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

object CountByStreaming {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Error: 4 arguments required: <zkAddress> <group> <topics> <threadNum>")
      System.exit(1)
    }
    // receive the arguments passed to main from outside
    val Array(zkAddress, group, topics, threadNum) = args
    val sparkConf = new SparkConf()
      .setAppName("CountByStreaming")
      .setMaster("local[4]")
    // create the Spark streaming context; one batch every 60 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    // use Kafka as the data source
    val topicsMap = topics.split(",").map((_, threadNum.toInt)).toMap
    // create the Kafka DStream, consuming from the Kafka cluster every 60 seconds
    val kafkaInputDS = KafkaUtils.createStream(ssc, zkAddress, group, topicsMap)
    // raw log lines
    val logResourcesDS = kafkaInputDS.map(_._2)
    val cleanDataRDD = logResourcesDS.map(line => {
      val splits = line.split("\t")
      if (splits.length != 5) {
        // malformed records get sentinel values; the filter below drops them
        ClickLog("", "", 0, 0, "")
      } else {
        val ip = splits(0)                            // user's ip from the log
        val time = DateUtils.parseToMinute(splits(1)) // access time, normalized via DateUtils
        val status = splits(3).toInt                  // HTTP status code
        val referer = splits(4)
        val url = splits(2).split(" ")(1)             // requested URL
        var courseId = 0
        if (url.startsWith("/class")) {
          val courseIdHtml = url.split("/")(2)
          courseId = courseIdHtml.substring(0, courseIdHtml.lastIndexOf(".")).toInt
        }
        ClickLog(ip, time, courseId, status, referer) // wrap the cleaned record in ClickLog
      }
    }).filter(x => x.courseId != 0) // keep only the hands-on ("class") courses

    cleanDataRDD.map(line => {
      // RowKey for the HBase table "ns1:courses_clickcount":
      // 'date_courseId', i.e. the clicks on one course on one day
      (line.time.substring(0, 8) + "_" + line.courseId, 1) // map to a tuple
    }).reduceByKey(_ + _) // aggregate
      .foreachRDD(rdd => { // a DStream contains many RDDs
        rdd.foreachPartition(partition => { // an RDD has many partitions
          val list = new ListBuffer[CourseClickCount]
          partition.foreach(item => { // a partition holds many records
            list.append(CourseClickCount(item._1, item._2))
          })
          CourseClickCountDao.save(list) // persist to HBase
        })
      })

    cleanDataRDD.map(line => {
      val referer = line.referer
      val time = line.time.substring(0, 8)
      var url = ""
      if (referer == "-") { // filter out invalid referers
        (url, time)
      } else {
        // pull out the search engine's name
        url = referer.replaceAll("//", "/").split("/")(1)
        (url, time)
      }
    }).filter(x => x._1 != "").map(line => {
      // RowKey for the HBase table "ns1:courses_search_clickcount":
      // 'date_searchEngine', i.e. course visits via one search engine on one day
      (line._2 + "_" + line._1, 1) // map to a tuple
    }).reduceByKey(_ + _) // aggregate
      .foreachRDD(rdd => {
        rdd.foreachPartition(partition => {
          val list = new ListBuffer[CourseSearchClickCount]
          partition.foreach(item => {
            list.append(CourseSearchClickCount(item._1, item._2))
          })
          CourseSearchClickCountDao.save(list)
        })
      })

    ssc.start()
    ssc.awaitTermination()
  }
}
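The cleaning step above derives the course id from "/class/<id>.html" URLs and keys the HBase counters by a "date_courseId" RowKey. The same two pieces of logic sketched in Python (function names are our own, not from the project):

```python
def extract_course_id(url):
    # '/class/128.html' -> 128; non-class URLs get 0 and are filtered out
    if not url.startswith("/class"):
        return 0
    page = url.split("/")[2]            # '128.html'
    return int(page[:page.rfind(".")])  # 128

def row_key(day, course_id):
    # HBase RowKey: 'yyyyMMdd_courseId', e.g. clicks on one course on one day
    return "{}_{}".format(day, course_id)

print(extract_course_id("/class/128.html"))  # 128
print(extract_course_id("/learn/821"))       # 0
print(row_key("20211229", 128))              # 20211229_128
```

Because the date is the RowKey prefix, the frontend can later fetch one day's counts with a simple prefix scan.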
Click Run; in the dialog that pops up, choose CountByStreaming (the entry without a $). Then, following the runtime configuration in the pdf, fill the arguments into Program arguments.
Building the frontend project

Dependencies (the two 8s are the Maven compiler source/target levels):

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.2.2</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
    </dependency>
    <dependency>
        <groupId>javax.servlet</groupId>
        <artifactId>javax.servlet-api</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>net.sf.json-lib</groupId>
        <artifactId>json-lib</artifactId>
        <version>2.4</version>
        <classifier>jdk15</classifier>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.78</version>
    </dependency>
</dependencies>

Configuring the MySQL connection (connection failures)
First, make sure the spark database exists in your MySQL.
ERROR 1

You may see an error like this (it varies between versions).

Cause: MySQL is not listening on port 3306.

sudo vim /etc/mysql/mysql.conf.d/mysqld.cnf   # comment out the skip-grant-tables line
ERROR 2

The JDBC driver version is incompatible.

If you installed MySQL with the command from the pdf without pinning a version, download the latest JDBC driver from the official site; see MySQL_JDBC_jar包的下载与使用(Windows) (苍凉温暖, cnblogs.com).
Code

testSQL

Note: change the date in testHbase() to the day you ran the backend code, e.g. 20211001 to 20211229. Take a VM snapshot before running, and check that the ZooKeeper quorum address in the HbaseUtils file is correct.
import com.test.utils.HbaseUtils;
import com.test.utils.JdbcUtils;
import org.junit.Test;

import java.sql.*;
import java.util.Map;

public class testSQL {
    @Test
    public void testjdbc() throws ClassNotFoundException {
        Class.forName("com.mysql.cj.jdbc.Driver");
        String url = "jdbc:mysql://localhost:3306/spark";
        String username = "root";
        String password = "root";
        try {
            Connection conn = DriverManager.getConnection(url, username, password);
            Statement stmt = conn.createStatement();
            ResultSet res = stmt.executeQuery("select * from course");
            while (res.next())
                System.out.println(res.getString(1) + " " + res.getString(2));
            stmt.close();
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testJdbcUtils() throws ClassNotFoundException {
        System.out.println(JdbcUtils.getInstance().getCourseName("128"));
        System.out.println(JdbcUtils.getInstance().getCourseName("112"));
    }

    @Test
    public void testHbase() {
        Map<String, Long> clickCount = HbaseUtils.getInstance()
                .getClickCount("ns1:courses_clickcount", "20211001");
        for (String x : clickCount.keySet())
            System.out.println(x + " " + clickCount.get(x));
    }
}
JdbcUtils

package com.test.utils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

public class JdbcUtils {
    private Connection connection = null;
    private static JdbcUtils jdbcUtils = null;
    Statement stmt = null;

    private JdbcUtils() throws ClassNotFoundException {
        Class.forName("com.mysql.jdbc.Driver");
        String url = "jdbc:mysql://localhost:3306/spark?useSSL=false";
        String username = "root";
        String password = "root";
        try {
            connection = DriverManager.getConnection(url, username, password);
            stmt = connection.createStatement();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static synchronized JdbcUtils getInstance() throws ClassNotFoundException {
        if (jdbcUtils == null) {
            jdbcUtils = new JdbcUtils();
        }
        return jdbcUtils;
    }

    public String getCourseName(String id) {
        try {
            ResultSet res = stmt.executeQuery("select * from course where id ='" + id + "'");
            while (res.next())
                return res.getString(2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public Map<String, Long> getClickCount(String tableName, String date) {
        // left empty here; the working implementation is in the HbaseUtils class
        Map<String, Long> map = new HashMap<>();
        try {
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        return map;
    }
}
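Note that getCourseName builds its SQL by concatenating the id into the query string, which invites SQL injection. A parameterized version, sketched in Python with sqlite3 standing in for MySQL (the table layout mirrors the course table; the helper name is ours):

```python
import sqlite3

# in-memory stand-in for the MySQL 'spark' database
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE course (id INTEGER PRIMARY KEY, course TEXT)")
conn.executemany("INSERT INTO course VALUES (?, ?)",
                 [(112, "Spark"), (128, "Flink")])

def get_course_name(conn, course_id):
    # '?' placeholder instead of string concatenation: no SQL injection
    row = conn.execute("SELECT course FROM course WHERE id = ?",
                       (course_id,)).fetchone()
    return row[0] if row else None

print(get_course_name(conn, 128))  # Flink
```

The JDBC equivalent would use PreparedStatement with a ? placeholder instead of Statement plus string concatenation.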
HbaseUtils

Note: in the ZooKeeper quorum address below, change localhost to your own account name.
package com.test.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.HashMap;
import java.util.Map;

public class HbaseUtils {
    private Configuration configuration = null;
    private Connection connection = null;
    private static HbaseUtils hbaseUtil = null;

    private HbaseUtils() {
        try {
            configuration = new Configuration();
            // ZooKeeper quorum address
            configuration.set("hbase.zookeeper.quorum", "localhost:2181");
            connection = ConnectionFactory.createConnection(configuration);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static synchronized HbaseUtils getInstance() {
        if (hbaseUtil == null) {
            hbaseUtil = new HbaseUtils();
        }
        return hbaseUtil;
    }

    public HTable getTable(String tableName) {
        try {
            HTable table = null;
            table = (HTable) connection.getTable(TableName.valueOf(tableName));
            return table;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public Map<String, Long> getClickCount(String tableName, String date) {
        Map<String, Long> map = new HashMap<>();
        try {
            // get the table instance
            HTable table = getInstance().getTable(tableName);
            // column family
            String cf = "info";
            // column
            String qualifier = "click_count";
            // prefix filter: scan only the rows for the given date
            Filter filter = new PrefixFilter(Bytes.toBytes(date));
            // define the scanner
            Scan scan = new Scan();
            scan.setFilter(filter);
            ResultScanner results = table.getScanner(scan);
            for (Result result : results) {
                // extract the rowKey
                String rowKey = Bytes.toString(result.getRow());
                // extract the click count
                Long clickCount = Bytes.toLong(result.getValue(cf.getBytes(), qualifier.getBytes()));
                map.put(rowKey, clickCount);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        return map;
    }
}
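getClickCount relies on HBase's PrefixFilter so the scan returns only rows whose RowKey begins with the given date. The same prefix semantics over a plain dict, sketched in Python (not the HBase API):

```python
def get_click_count(table, date):
    # keep only rows whose key starts with the date prefix,
    # mirroring HBase's PrefixFilter over 'yyyyMMdd_courseId' keys
    return {k: v for k, v in table.items() if k.startswith(date)}

# toy stand-in for the ns1:courses_clickcount table
table = {"20211229_112": 5, "20211229_128": 9, "20211230_112": 2}
print(get_click_count(table, "20211229"))  # only the 2021-12-29 rows
```

This is why the backend puts the date first in the RowKey: a shared prefix turns "one day's counts" into a cheap range scan.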
MySQL data

Log into MySQL from the shell and run:
use spark;

DROP TABLE IF EXISTS `course`;
CREATE TABLE `course` (
  `id` int NOT NULL,
  `course` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

INSERT INTO `course` VALUES (112, 'Spark');
INSERT INTO `course` VALUES (127, 'Hbase');
INSERT INTO `course` VALUES (128, 'Flink');
INSERT INTO `course` VALUES (130, 'Hadoop');
INSERT INTO `course` VALUES (145, 'Linux');
INSERT INTO `course` VALUES (146, 'Python');
The tutorial ends here; you will have to work out the rest yourself.