大数据采集与分析

大数据采集与分析,第1张

大数据采集与分析

先将软件都解压

看项目需求

启动集群

进入hive

创建库
create database data;

使用库
use data;

执行命令:查看那个是活跃状态

hdfs haadmin -getServiceState nn1
hdfs haadmin -getServiceState nn2

使用SQLyog链接活跃的Hadoop

导入表

使用脚本创建表(mk_hdfsDir.sh)

#!/bin/bash

beg_date=`date -d "${1}" +%s`
end_date=`date -d "${2}" +%s`

if((beg_date >${end_date}));then
 echo "beg_date < end_date"
 exit 0;
fi

currentDate=""
for((i=${beg_date};i<=${end_date};i=i+86400))
do
 currentDate=`date -d @${i} +%Y%m%d`
 echo "-----create /${currentDate}-----"
#这个位置修改库名,表明和字段
 hive -e "use test;create table order_status_log${currentDate}(id string,order_id string,order_status string,operate_time string) row format delimited fields terminated by ','; "
 #hadoop fs -mkdir /${currentDate}
done

创建一个文本(json.date),用来存储时间

使用脚本自动采集(auto_mysql_hive.sh)

#!/bin/bash

#1.获取到json.data中的日期
datax_json_date=`cat /root/datax/job/json.date`

#2.将日期往后推一天
datax_json_date_timestamp=$[`date -d "${datax_json_date}" +%s`+86400]
afterday=`date -d @${datax_json_date_timestamp} +%Y%m%d`

log_dir=/root/log
log_prefix=order_status
#date=`cat /root/datax/job/json.date`
function write_log(){
 #定义输出日志格式,需要在前面加上一个日期
 log_format=`date "+%Y-%m-%d %H:%M:%S"`
 #开始输出日志内容
 echo "${log_format} $1 >> ${2}" >>${log_dir}/${log_prefix}.`date "+%Y-%m-%d"`.log
 #如果接受的第一个参数是ERROR,直接退出
 [ $1 == "ERROR" ] && exit 1
}

echo ${afterday} ${datax_json_date_timestamp} ${datax_json_date}

#3.将json中的日期全局替换成下一天    
#这个位置需要修改成表明
sed -i "s/order_status_log${datax_json_date}/order_status_log${afterday}/g" /root/datax/job/mysql_hive.json

#修改mysql_hive.json日期返回结果
write_log INFO "mysql_hive.json文件日期由${datax_json_date}成功替换成${afterday}!!!"

#4.修改json.data文本日期,需要和darax的json日期时刻保持一致
#echo ${afterday} >  /root/datax/job/json.date
sed -i "s/${datax_json_date}/${afterday}/g" /root/datax/job/json.date

#5.执行
python /root/datax/bin/datax.py /root/datax/job/mysql_hive.json

if [ $? -eq 0 ];then
 result=`hive -e "use test;select count(id) from order_status_log${afterday}"`
 write_log INFO "数据从mysql中采集成功!!!采集数据有${result}条"
else
 write_log ERROR "数据采集失败!!!请立即检查"
fi

在docker中创建MySQL容器

docker run -itd --name=mysql-test -p 8888:3306 -e MYSQL_ROOT_PASSWORD=123456

docker ps -a

docker exec -it mysql-test /bin/bash

service mysql start

mysql -uroot -p

GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' WITH GRANT OPTION;

FLUSH PRIVILEGES;

exit;

创建分区表(修改表名字段)

create table order_status_log(id string,order_id string,order_status string,operate_time string)
partitioned by (day string) row format delimited fields terminated by ',';

使用脚本自动采集并将每天数据存到数据库中

#!/bin/bash

#输入开始日期
beg_date=`date -d "${1}" +%s`

#输入结束日期
end_date=`date -d "${2}" +%s`

#判断开始日期不能大于结束日期
if((beg_date >${end_date}));then
 echo "beg_date < end_date"
 exit 0;
fi

currentDate=""

#从开始日期+1天,循环到结束日期
for((i=${beg_date};i<=${end_date};i=i+86400))
do
  currentDate=`date -d @${i} +%Y%m%d`
  echo "-----create /${currentDate}-----"

#使用hive 命令
#进行动态分区
#将order_status_log${currentDate}插入到order_status_log分区表中,以partition(day)区分数据
  hive -e "
  use test;
  set hive.exec.dynamic.partition.mode=nostrict;
  set hive.exec.dynamic.partition=true;
  set hive.exec.max.dynamic.partitions=1000;
  insert into table order_status_log partition(day) select *,date(operate_time) from order_status_log${currentDate};"
done

#将导入分区表数据纪录,存到/root/sh/hive_result.txt文件中
hive -S -e "use test;select day,count(id) from order_status_log group by day;" > /root/sh/hive_result.txt

#设置数据库内容:用户名,密码,IP地址
user="root"
password="123456"
host="192.168.174.9"
prot="8888"

#登录到数据库
mysql_conn="mysql -h"$host" -u"$user" -p"$password""

#查看/root/sh/hive_result.txt文件内容,将内容分别赋给hive_date,hive_result
cat /root/sh/hive_result.txt | while read hive_date hive_result
do
#将hive_date,hive_result两个变量的值传到mysql数据库的test.hive_count表中
  $mysql_conn -e "INSERT INTO test.hive_count  VALUES('$hive_date','$hive_result')"
done







欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5653040.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存