目录
0 环境准备:
1 软件环境的搭建
1.1 jdk的安装
1.2 scala的安装
1.3 hadoop的安装
1.4 KAFKA的安装
1.5 elasticsearch的安装
1.6 redis的安装
1.7 flink的安装
1.8 安装mysql
2 数据介绍
2.1 平台功能
2.2 数据的流程描述
2.3 数据组成
2.3.1 事实数据
2.3.1.1 旅游订单数据
2.3.1.2 用户行为数据
2.3.1.3 维度数据
3 造数
3.1 资源准备
3.1.0 pom.xml
3.1.1 其他资源
3.2 开发
0 环境准备:
新建虚拟机->设置静态网络->修改映射配置
新建虚拟机 参考
面向大数据开发的集群之虚拟机搭建(一)_林柚晞的博客-CSDN博客
虚拟机网络+设置静态网+修改映射配置+免密登录 参考
项目0单节点的虚拟机做大数据开发(四万字全)_林柚晞的博客-CSDN博客
创建项目路径
mkdir -p /opt/apps
mkdir -p /opt/software
mkdir -p /opt/scripts
apps 是软件的安装路径
software 是存放上传到虚拟机的安装包的目录
scripts 是存放自动化安装软件脚本的目录
1 软件环境的搭建
先说明一下:把安装包放到指定位置需要使用远程传输工具,这里省略了把安装包上传到服务器的过程。
自动化安装脚本的总结
- 设置安装路径和安装包的位置,配置环境变量的位置
- 判断安装路径的前驱路径是否存在
- cat追加文件内容到文件里面(比如修改环境变量,覆盖配置文件)
- 正式安装步骤,解压缩等
1.1 jdk的安装
安装包:
链接:https://pan.baidu.com/s/1R5LaDIasYh4vpY_RUB_Qhw?pwd=tff2
提取码:tff2
cd /opt/scripts/
vi install_jdk.sh
#!/bin/bash
# author : XXX
# version : 1.0
# date   : 2022-04-24
# desc   : 自动安装jdk
# 约定 > 配置 > 编码

INSTALL_PREFIX=/opt/apps
JAVA_DIR=${INSTALL_PREFIX}/jdk1.8.0_45
JDK_TAR=/opt/software/jdk-8u45-linux-x64.tar.gz
ETC_PROFILE=/etc/profile

# 提示使用方法的函数
usage() {
    echo "请将jdk-8u45-linux-x64.tar.gz上传到/opt/software处然后再执行此脚本!!!"
}

# 判断安装包是否已经存放到指定路径了,如果不存在就提示使用方法
if [ ! -e ${JDK_TAR} ]; then
    usage
    exit 0
fi

# 已经安装过了
if [ -e ${JAVA_DIR} ]; then
    echo "${JAVA_DIR}路径已经存在,JDK已经安装过了,无需再次安装!!!"
    exit 0
fi

# 如果安装前缀没有,就创建之
if [ ! -e ${INSTALL_PREFIX} ]; then
    mkdir -p ${INSTALL_PREFIX}
    echo "初始化目录:${INSTALL_PREFIX}"
fi

if [ ! -e ${JAVA_DIR} ]; then
    mkdir -p ${JAVA_DIR}
    echo "初始化目录:${JAVA_DIR}"
fi

## 解压JDK的tar包
tar -zxvf ${JDK_TAR} -C ${INSTALL_PREFIX}

## 配置环境变量(\$PATH要转义,这样写入/etc/profile的是变量本身而不是当前值)
cat << EOF >> ${ETC_PROFILE}
export JAVA_HOME=${JAVA_DIR}
export CLASSPATH=.:${JAVA_DIR}/lib/dt.jar:${JAVA_DIR}/lib/tools.jar
export PATH=\$PATH:${JAVA_DIR}/bin
EOF

## 提示成功
echo "install jdk successful!!!"
chmod +x ./install_jdk.sh
./install_jdk.sh
source /etc/profile
java -version
1.2 scala的安装
安装包:
链接:https://pan.baidu.com/s/1oBbplZTIem1K0g4Wv_PD5Q?pwd=dzcb
提取码:dzcb
vi /opt/scripts/install_scala.sh
#!/bin/bash
# author : XXX
# version : 1.0
# date   : 2022-04-24
# desc   : 自动安装scala
# 约定 > 配置 > 编码

INSTALL_PREFIX=/opt/apps
SCALA_DIR=${INSTALL_PREFIX}/scala-2.11.8
SCALA_TAR=/opt/software/scala-2.11.8.tgz
ETC_PROFILE=/etc/profile

# 提示使用方法的函数
usage() {
    echo "请将scala-2.11.8.tgz上传到/opt/software处然后再执行此脚本!!!"
}

# 判断安装包是否已经存放到指定路径了,如果不存在就提示使用方法
if [ ! -e ${SCALA_TAR} ]; then
    usage
    exit 0
fi

# 已经安装过了
if [ -e ${SCALA_DIR} ]; then
    echo "${SCALA_DIR}路径已经存在,Scala已经安装过了,无需再次安装!!!"
    exit 0
fi

# 如果安装前缀没有,就创建之
if [ ! -e ${INSTALL_PREFIX} ]; then
    mkdir -p ${INSTALL_PREFIX}
    echo "初始化目录:${INSTALL_PREFIX}"
fi

if [ ! -e ${SCALA_DIR} ]; then
    mkdir -p ${SCALA_DIR}
    echo "初始化目录:${SCALA_DIR}"
fi

## 解压Scala的tar包
tar -zxvf ${SCALA_TAR} -C ${INSTALL_PREFIX}

## 配置环境变量(\$PATH要转义,写入/etc/profile时保留变量本身)
cat << EOF >> ${ETC_PROFILE}
export SCALA_HOME=${SCALA_DIR}
export PATH=\$PATH:${SCALA_DIR}/bin
EOF

## 提示成功
echo "install SCALA successful!!!"
chmod +x ./install_scala.sh
./install_scala.sh
source /etc/profile
scala -version
1.3 hadoop的安装
安装包:
链接:https://pan.baidu.com/s/1O-E4B1MUZjA90rsvEi1nwQ?pwd=q0ey
提取码:q0ey
vi /opt/scripts/install_hadoop.sh
##1. install_hadoop.sh
#!/bin/bash
# author : lixi
# version : 1.0
# date   : 2022-04-24
# desc   : 自动安装hadoop
# 约定 > 配置 > 编码

INSTALL_PREFIX=/opt/apps
JAVA_DIR=${INSTALL_PREFIX}/jdk1.8.0_45
HADOOP_DIR=${INSTALL_PREFIX}/hadoop-2.8.1
HADOOP_TAR=/opt/software/hadoop-2.8.1.tar.gz
ETC_PROFILE=/etc/profile

# 提示使用方法的函数
usage() {
    echo "请将hadoop-2.8.1.tar.gz上传到/opt/software处然后再执行此脚本!!!"
}

# 判断安装包是否已经存放到指定路径了,如果不存在就提示使用方法
if [ ! -e ${HADOOP_TAR} ]; then
    usage
    exit 0
fi

# 已经安装过了
if [ -e ${HADOOP_DIR} ]; then
    echo "${HADOOP_DIR}路径已经存在,Hadoop已经安装过了,无需再次安装!!!"
    exit 0
fi

# 如果安装前缀没有,就创建之
if [ ! -e ${INSTALL_PREFIX} ]; then
    mkdir -p ${INSTALL_PREFIX}
    echo "初始化目录:${INSTALL_PREFIX}"
fi

if [ ! -e ${HADOOP_DIR} ]; then
    mkdir -p ${HADOOP_DIR}
    echo "初始化目录:${HADOOP_DIR}"
fi

## 解压Hadoop的tar包
tar -zxvf ${HADOOP_TAR} -C ${INSTALL_PREFIX}

## 配置Hadoop
## hadoop-env.sh中的JAVA_HOME在脚本执行完之后手动修改(见下文)

## core-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/core-site.xml
<configuration>
  <property><name>fs.defaultFS</name><value>hdfs://192.168.10.101:9000</value></property>
  <property><name>hadoop.tmp.dir</name><value>${HADOOP_DIR}/hdpdata</value></property>
  <property><name>hadoop.proxyuser.root.hosts</name><value>*</value></property>
  <property><name>hadoop.proxyuser.root.groups</name><value>*</value></property>
</configuration>
EOF

## hdfs-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/hdfs-site.xml
<configuration>
  <property><name>dfs.replication</name><value>1</value></property>
  <property><name>dfs.http.address</name><value>qianfeng01:50070</value></property>
  <property><name>dfs.secondary.http.address</name><value>qianfeng01:50090</value></property>
  <property><name>dfs.namenode.name.dir</name><value>${HADOOP_DIR}/hdpdata/dfs/name</value></property>
  <property><name>dfs.datanode.data.dir</name><value>${HADOOP_DIR}/hdpdata/dfs/data</value></property>
  <property><name>dfs.checkpoint.dir</name><value>${HADOOP_DIR}/hdpdata/dfs/checkpoint/cname</value></property>
  <property><name>dfs.checkpoint.edits.dir</name><value>${HADOOP_DIR}/hdpdata/dfs/checkpoint/cname</value></property>
  <property><name>dfs.permissions</name><value>false</value></property>
</configuration>
EOF

## yarn-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/yarn-site.xml
<configuration>
  <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
  <property><name>yarn.resourcemanager.hostname</name><value>qianfeng01</value></property>
  <property><name>yarn.resourcemanager.address</name><value>qianfeng01:8032</value></property>
  <property><name>yarn.resourcemanager.scheduler.address</name><value>qianfeng01:8030</value></property>
</configuration>
EOF

## mapred-site.xml
mv ${HADOOP_DIR}/etc/hadoop/mapred-site.xml.template ${HADOOP_DIR}/etc/hadoop/mapred-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/mapred-site.xml
<configuration>
  <property><name>mapreduce.framework.name</name><value>yarn</value></property>
  <property><name>mapreduce.jobhistory.address</name><value>qianfeng01:10020</value></property>
  <property><name>mapreduce.jobhistory.webapp.address</name><value>qianfeng01:19888</value></property>
</configuration>
EOF

## slaves
cat << EOF > ${HADOOP_DIR}/etc/hadoop/slaves
qianfeng01
EOF

## 配置环境变量(\$PATH要转义,写入/etc/profile时保留变量本身)
cat << EOF >> ${ETC_PROFILE}
export HADOOP_HOME=${HADOOP_DIR}
export PATH=\$PATH:${HADOOP_DIR}/bin:${HADOOP_DIR}/sbin
EOF

## 格式化
${HADOOP_DIR}/bin/hdfs namenode -format

## 提示成功
echo "install hadoop successful!!!"
chmod +x ./install_hadoop.sh
./install_hadoop.sh
source /etc/profile
hadoop version
vi /opt/apps/hadoop-2.8.1/etc/hadoop/hadoop-env.sh
找到JAVA_HOME,改成固定路径:
export JAVA_HOME=/opt/apps/jdk1.8.0_45
start-all.sh
1.4 KAFKA的安装
安装包:
链接:https://pan.baidu.com/s/1abH6Nn4mD9MR7v5r_dJvBg?pwd=qfz8
提取码:qfz8
tar -zxvf kafka_2.11-1.1.1.tgz -C /opt/apps/
vi /etc/profile
export KAFKA_HOME=/opt/apps/kafka_2.11-1.1.1
export PATH=$PATH:$KAFKA_HOME/bin
vi /opt/apps/kafka_2.11-1.1.1/config/server.properties
下面是需要修改的地方
broker.id=1
advertised.listeners=PLAINTEXT://192.168.10.101:9092
log.dirs=/opt/apps/kafka_2.11-1.1.1/log/kafka-logs
zookeeper.connect=qianfeng01:2181/kafka
vi /opt/apps/kafka_2.11-1.1.1/config/producer.properties
bootstrap.servers=qianfeng01:9092
vi /opt/apps/kafka_2.11-1.1.1/config/consumer.properties
bootstrap.servers=qianfeng01:9092
vi /opt/apps/kafka_2.11-1.1.1/config/zookeeper.properties
dataDir=/opt/apps/kafka_2.11-1.1.1/zkData
[root@hadoop kafka_2.11-1.1.1]# zookeeper-server-start.sh -daemon config/zookeeper.properties
zookeeper-shell.sh qianfeng01:2181
[root@hadoop kafka_2.11-1.1.1]# kafka-server-start.sh -daemon config/server.properties
jps
kafka-topics.sh --create --zookeeper qianfeng01:2181/kafka --replication-factor 1 --partitions 2 --topic test
kafka-topics.sh --list --zookeeper qianfeng01:2181/kafka
1.5 elasticsearch的安装
安装包:
链接:https://pan.baidu.com/s/1h3C3kneHlLuZxXg8KKzK2g?pwd=y35e
提取码:y35e
useradd hadoop
passwd hadoop
使用root账号
[root@hadoop software]# tar -zxvf elasticsearch-6.5.3.tar.gz -C /opt/apps/
[root@hadoop elasticsearch-6.5.3]# vi /etc/profile
export ELASTICSEARCH_HOME=/opt/apps/elasticsearch-6.5.3
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$KAFKA_HOME/bin:$ELASTICSEARCH_HOME/bin
配置
vi /opt/apps/elasticsearch-6.5.3/config/elasticsearch.yml
说明:我这台机器的主机名是qianfeng01(其实也想改,但是映射已经用习惯了,就一直沿用),下面凡是涉及qianfeng01的地方请换成自己的主机名。
cluster.name: qianfeng01
node.name: qianfeng01
node.master: true
node.data: true
path.data: /opt/apps/elasticsearch-6.5.3/data
path.logs: /opt/apps/elasticsearch-6.5.3/logs
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["qianfeng01"]
给普通用户授权
[root@hadoop apps]# ll
total 16
drwxr-xr-x 8 root root 4096 Dec 7 2018 elasticsearch-6.5.3
drwxrwxr-x 11 500 500 4096 Apr 24 14:05 hadoop-2.8.1
drwxr-xr-x 8 10 143 4096 Apr 11 2015 jdk1.8.0_45
drwxr-xr-x 9 root root 4096 Apr 24 14:32 kafka_2.11-1.1.1
[root@hadoop apps]# chown -R hadoop:hadoop elasticsearch-6.5.3/
[root@hadoop apps]# ls
elasticsearch-6.5.3 hadoop-2.8.1 jdk1.8.0_45 kafka_2.11-1.1.1
[root@hadoop apps]# ll
total 16
drwxr-xr-x 8 hadoop hadoop 4096 Dec 7 2018 elasticsearch-6.5.3
drwxrwxr-x 11 500 500 4096 Apr 24 14:05 hadoop-2.8.1
drwxr-xr-x 8 10 143 4096 Apr 11 2015 jdk1.8.0_45
drwxr-xr-x 9 root root 4096 Apr 24 14:32 kafka_2.11-1.1.1
##5.2 给与普通用户root借用权限
[root@hadoop apps]# vi /etc/sudoers
## Allow root to run any commands anywhere
root ALL=(ALL) ALL
hadoop ALL=(ALL) ALL
切换普通用户
su hadoop
elasticsearch
我遇到的问题:
max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]_林柚晞的博客-CSDN博客
第二次安装还遇到了一个问题
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
修改方法
切换root用户
sysctl -w vm.max_map_count=262144
查看
sysctl -a|grep vm.max_map_count
如果显示
vm.max_map_count = 262144
就是成功了
把 vm.max_map_count=262144 追加到 /etc/sysctl.conf 里(否则重启后这个设置会失效),然后把账号切换成root,reboot一下系统
系统启动之后
su hadoop
elasticsearch -d
查看是否启动成功
jps
curl -XGET qianfeng01:9200
es报错修改完之后怎么验证:
切换到普通用户,直接敲 elasticsearch
如果没有报错、进程一直卡在前台不退出,就说明启动顺利了
如果要访问web端,就必须把虚拟机中的防火墙关闭
我们再搞一搞插件工具
安装包:
链接:https://pan.baidu.com/s/1-3eWVFmar03xc2RPz-rwRQ?pwd=dklo
提取码:dklo
这个安装包随便解压到一个文件夹下面,只要自己能找到就可以。
打开谷歌浏览器(好久没用谷歌浏览器了,因为不能在线下载插件,浏览器比较简陋)
关掉上面的弹窗
打开谷歌浏览器后,点击搜索框旁边的小插件图标,就可以看见es的快捷访问了
我这边当时还有个问题没搞好:集群没连上
出现上面“集群健康值:未连接”,是因为服务器里面的防火墙开着。
systemctl stop firewalld.service
查看状态
systemctl status firewalld.service
看一下web端
1.6 redis的安装
安装包:
链接:https://pan.baidu.com/s/1VoBQEeBBovSWQMTAm7T8mw?pwd=teeq
提取码:teeq
[root@hadoop software]# tar -zxvf redis-4.0.11.tar.gz -C /opt/apps/
[root@hadoop redis-4.0.11]# yum -y install gcc-c++
[root@hadoop redis-4.0.11]# make
[root@hadoop redis-4.0.11]# make PREFIX=/opt/apps/redis-4.0.11 install
[root@hadoop redis-4.0.11]# mkdir etc
[root@hadoop redis-4.0.11]# cp redis.conf etc/
[root@hadoop redis-4.0.11]# cd bin/
[root@hadoop bin]# cp redis-benchmark redis-server redis-cli /usr/bin/
搞配置
vi /opt/apps/redis-4.0.11/etc/redis.conf
注意都是解除注释,而不是全删啊。
daemonize yes
loglevel verbose
requirepass root
bind 192.168.10.101
如果配置了 requirepass 之后连不上redis服务器,可以先把密码那一行注释掉试一试(嫌麻烦也可以干脆不配置密码)。
[root@hadoop ~]# redis-server /opt/apps/redis-4.0.11/etc/redis.conf
[root@hadoop ~]# ps -ef | grep redis
root 28066 1 0 16:07 ? 00:00:00 redis-server 192.168.10.101:6379
[root@hadoop ~]# redis-cli -h 192.168.10.101
192.168.10.101:6379> SHUTDOWN
(error) NOAUTH Authentication required.
192.168.10.101:6379> AUTH root
OK
192.168.10.101:6379> SHUTDOWN
not connected> exit
[root@hadoop ~]# ps -ef | grep redis
root 28835 1936 0 16:09 pts/0 00:00:00 grep --color=auto redis
redis的开机脚本
[root@hadoop redis-4.0.11]# vi redis
#!/bin/bash
#chkconfig: 2345 80 90
PATH=/usr/bin:/usr/local/bin:/sbin:/bin
REDIS_PORT=6379
EXEC=/opt/apps/redis-4.0.11/bin/redis-server
REDIS_CLI=/opt/apps/redis-4.0.11/bin/redis-cli
PIDFILE=/var/run/redis.pid
CONF=/opt/apps/redis-4.0.11/etc/redis.conf

case "$1" in
start)
if [ -f ${PIDFILE} ];then
echo "${PIDFILE} exists, process is already running or crashed"
else
echo "Starting Redis Server ...."
${EXEC} ${CONF}
fi
if [ "$?" = "0" ];then
echo "Redis is running"
fi
;;
stop)
if [ ! -e ${PIDFILE} ];then
echo "${PIDFILE} does not exists, process is not running"
else
PID=$(cat ${PIDFILE})
echo "Stopping..."
# 密码和地址要和redis.conf里的requirepass、bind保持一致
${REDIS_CLI} -h 192.168.10.101 -p ${REDIS_PORT} -a root SHUTDOWN
while [ -e ${PIDFILE} ]
do
echo "Waiting for Redis shutdown..."
sleep 1
done
echo "Redis stoped"
fi
;;
restart|force-reload)
${0} stop
${0} start
;;
*)
echo "Usage: /etc/init.d/redis {start|stop|restart|force-reload}" >&2
exit 1
esac
[root@hadoop redis-4.0.11]# cp redis /etc/init.d/
[root@hadoop redis-4.0.11]# chmod +x /etc/init.d/redis
##3. 查看自启动的所有程序
[root@hadoop redis-4.0.11]# chkconfig --list
##4. 将redis添加到自启动列表
[root@hadoop redis-4.0.11]# chkconfig --add redis
[root@hadoop redis-4.0.11]# chkconfig --level 2345 redis on
##5. 启动关闭测试
[root@hadoop redis-4.0.11]# systemctl start redis
[root@hadoop redis-4.0.11]# service redis start
1.7 flink的安装
安装包:
链接:https://pan.baidu.com/s/1d6tMAwfLuC47TgAsClnY3w?pwd=k1h9
提取码:k1h9
安装standalone
##1. 解压
[root@hadoop software]# tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /opt/apps/
##2. 配置环境变量
[root@hadoop flink-1.9.1]# vi /etc/profile
export FLINK_HOME=/opt/apps/flink-1.9.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$KAFKA_HOME/bin:$ELASTICSEARCH_HOME/bin:$FLINK_HOME/bin
[root@hadoop flink-1.9.1]# source /etc/profile
配置
[root@hadoop flink-1.9.1]# vi conf/flink-conf.yaml
jobmanager.rpc.address: qianfeng01
jobmanager.rpc.port: 6123
rest.port: 8081
rest.address: qianfeng01
##4. 配置从机文件
[root@hadoop flink-1.9.1]# vi conf/slaves
qianfeng01
##5. 配置主机文件
[root@hadoop flink-1.9.1]# vi conf/masters
qianfeng01:8081
##6. 先启动hdfs
[root@hadoop flink-1.9.1]# start-dfs.sh
[root@hadoop flink-1.9.1]# start-cluster.sh
web端访问:
http://192.168.10.101:8081/
测试,可以不测试
##8. 测试Flink
##8.1 A窗口
[root@hadoop flink-1.9.1]# yum -y install nc
[root@hadoop flink-1.9.1]# nc -l 8000
111
222
33
444
555
33
111
222
##8.2 B窗口
[root@hadoop flink-1.9.1]# flink run examples/streaming/SocketWindowWordCount.jar --port 8000
[root@hadoop flink-1.9.1]# tail -f log/flink-root-taskexecutor-0-hadoop.out
111 : 1
555 : 1
444 : 1
33 : 1
222 : 1
33 : 1
222 : 1
111 : 1
##9. 提交jar
[root@hadoop flink-1.9.1]# flink run examples/batch/WordCount.jar --input /etc/profile --output /home/output/00
Starting execution of program
Program execution finished
Job with JobID ca414928ddcfd57969326827f5cb7205 has finished.
Job Runtime: 21670 ms
[root@hadoop flink-1.9.1]# tail -f /home/output/00
want 1
we 1
what 1
wide 1
will 1
workaround 1
x 1
you 3
your 1
z 1
1.8 安装mysql
安装包:
链接:https://pan.baidu.com/s/1qi2i9r7loMj3Y04LsMTc6w?pwd=4u1e
提取码:4u1e
##1. 安装引导
[root@hadoop software]# yum -y localinstall mysql-community-release-el6-5.noarch.rpm
##2. 安装mysql
[root@hadoop software]# yum -y install mysql-server
##3. 启动mysql的服务
[root@hadoop software]# service mysqld start
##4. 设置初始root账户密码
[root@hadoop software]# mysqladmin -uroot password '123456'
##5. 登陆mysql服务
[root@hadoop software]# mysql -uroot -p123456
##6. 远程授权
mysql> grant all privileges on *.* to root@"%" identified by "123456" with grant option;
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
##7. 现在就可以使用mysql的客户端远程连接mysql服务了
2 数据介绍
2.1 平台功能
功能模块 | 说明 |
---|---|
核心业务 | 基于核心业务数据(包括旅游订单、酒店住宿、车票业务)的相关实时计算、实时展示 |
用户行为日志 | 基于产品设计的各种埋点 |
风控报警 | 用户异常行为进行监控和报警 |
本项目以大数据实时平台为目标
技术方向 | 说明 |
---|---|
实时的ETL | 实时的清洗、转换为规范的格式化的数据 |
实时的数据统计 | 实现各种实时的统计指标 |
实时的数据存储 | 实时数据落地持久化、交互搜索、动态计算提供技术支持 |
规则处理 | 主要是服务实时风控、报警相关需求 |
交互式查询 | 交互式查询明细数据或实时的聚合数据(Clickhouse、Apache Druid) |
实时数据展示 | 主要是服务于数据使用方,提供更直观的数据展示形式 |
2.2 数据的流程描述
既然是实时的旅游平台,数据一定会经过消息中间件(消息队列/消息通道):原始数据源通过消息通道完成数据采集,随后根据实际情况把消息通道中的数据存储到各种不同的分布式存储介质中,最后对分布式介质中的数据做统计,也可以做成BI类型的可视化展示。
2.3 数据组成
2.3.1 事实数据
2.3.1.1 旅游订单数据
{
"userID":"1986",
"user_mobile":"15549477595",
"product_id":"12312312",
"product_traffic":"01",
"product_traffic_grade":"11",
"product_traffic_type" : "1",
"product_pub" : "sdfsfsdfsdff | sdfsdfsdfds",
"user_region" : "12312",
"travel_memeber_adult" : "2",
...
}
上面订单JSON中各字段的含义:
userID = 用户唯一编号
user_mobile = 用户手机号码
product_id = 旅游产品编号
product_traffic = 旅游的交通选择
product_traffic_grade = 坐席
product_traffic_type = 行程种类
product_pub = 旅游住宿选择
user_region = 所在区域
travel_member_adult = 本次旅游的成年人的人数
travel_member_younger = 本次旅游的儿童的人数
travel_member_baby = 本次旅游的婴儿的人数
product_price = 产品原价格
has_activity = 0表示无活动价格 | 0.8表示打八折
product_fee = 产品价格
order_id = 旅游订单编号
order_ct = 下单时间
下面用一段示意代码演示把这样一条订单JSON发送到Kafka。
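为了把“订单数据先进入消息通道”落到代码层面,这里给出一个示意性的生产者片段:假设 Kafka 地址是 qianfeng01:9092(按自己的虚拟机修改),主题用的是后文造数程序里的 t_travel_orders,依赖 kafka-clients 和 fastjson(pom 里已声明)。这只是一个最小演示,不代表项目的最终实现。

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderProducerDemo {
    public static void main(String[] args) {
        // 生产者基本配置,broker地址按自己的环境修改
        Properties props = new Properties();
        props.put("bootstrap.servers", "qianfeng01:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 按上面样例的结构拼一条订单数据(只列出部分字段)
        Map<String, Object> order = new HashMap<>();
        order.put("userID", "1986");
        order.put("user_mobile", "15549477595");
        order.put("product_id", "12312312");
        order.put("product_traffic", "01");
        order.put("user_region", "12312");
        order.put("travel_member_adult", "2");

        // 以用户ID作key、订单JSON作value发送到主题
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String value = JSON.toJSONString(order);
            producer.send(new ProducerRecord<>("t_travel_orders", "1986", value));
            producer.flush();
        }
    }
}

运行前要保证 zookeeper 和 kafka 已经按 1.4 节启动,并且主题已经创建(或允许自动创建)。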
2.3.1.2 用户行为数据
行为类型:action: 'launch(启动) | interactive(交互) | page_enter(页面曝光:其实就是产品页展示)'
事件类型:eventType: 'view(浏览) | slide(滑动) | click(点击:评论、点赞、分享)'
用户ID:userID(sdfsfasdf1231)
app端的手机设备号:userDevice
操作系统:os
手机制造商:manufacturer
运营商:carrier
网络类型:networkType
所在区域:userRegion
所在区域IP : userRegionIP
经度:longitude
纬度:latitude
扩展信息:exts
事件发生时间:ct
{
"os":"1",
"longitude":"115.12321",
"latitude":"26.88282",
"userRegion":"12312",
"userID":"12312",
"manufacturer":"09",
"userRegionIP":"10.206.0.4",
"action":"08",
"eventType":"01"
...
}
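用户行为日志的造数思路和订单类似,下面给出一个示意性的事件构造片段:action、eventType 按上文的枚举随机取一个,其余字段值都是演示用的假设(真实数据里 action、eventType 存的是编码值,这里为了可读性直接用了名称)。

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import com.alibaba.fastjson.JSON;

public class UserLogDemo {
    // 行为类型与事件类型的枚举,对应上文的字段说明
    private static final String[] ACTIONS = {"launch", "interactive", "page_enter"};
    private static final String[] EVENT_TYPES = {"view", "slide", "click"};
    private static final Random RND = new Random();

    public static void main(String[] args) {
        Map<String, Object> event = new HashMap<>();
        event.put("userID", "12312");
        event.put("userDevice", "device-0001");   // 演示用的设备号
        event.put("os", "1");
        event.put("userRegion", "12312");
        event.put("userRegionIP", "10.206.0.4");
        event.put("longitude", "115.12321");
        event.put("latitude", "26.88282");
        event.put("action", ACTIONS[RND.nextInt(ACTIONS.length)]);
        event.put("eventType", EVENT_TYPES[RND.nextInt(EVENT_TYPES.length)]);
        // 事件发生时间,格式 yyyyMMddHHmmss
        event.put("ct", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));

        // 打印出来的JSON就可以像订单数据一样发送到Kafka
        System.out.println(JSON.toJSONString(event));
    }
}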
2.3.1.3 维度数据
旅游产品维度表
CREATE TABLE travel.dim_product(
product_id text NULL COMMENT '旅游产品的编号',
product_level int(11) DEFAULT 0 COMMENT '旅游产品的级别',
product_type text NULL COMMENT '旅游产品的类型',
departure_code text NULL COMMENT '旅游产品的出发地编码',
des_city_code text NULL COMMENT '旅游产品的目的地编码',
tourim_ticket_type text NULL COMMENT '旅游产品的订单类型'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
酒店维度表
CREATE TABLE travel.dim_pub(
pub_id text NULL COMMENT '酒店的编号',
pub_name text NULL COMMENT '酒店的名称',
pub_stat text NULL COMMENT '酒店的星级',
pub_grade text NULL COMMENT '酒店的等级编码',
pub_grade_desc text NULL COMMENT '酒店的等级编码描述',
pub_area_code text NULL COMMENT '酒店所在区域编码',
pub_address text NULL COMMENT '酒店地址',
is_national text NULL COMMENT '是否属于境内'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
导入维度数据到mysql
sql文件比较大,需要先下载到本地,然后使用navicat远程连接服务器上的mysql执行导入
链接:https://pan.baidu.com/s/1dfCxtrmgdCNnFeCfM_cDdQ?pwd=btl3
提取码:btl3
先把服务器启动起来(确保mysql服务在运行)
打开windows上面的navicat
[root@qianfeng01 ~]# mysql -uroot -p123456
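导入完成后,除了用navicat肉眼检查,也可以用几行JDBC代码简单验证一下行数。下面是一个示意(假设库名是travel、账号密码是root/123456,和上文一致;mysql-connector-java 在 pom 里已声明):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DimCheckDemo {
    public static void main(String[] args) throws Exception {
        // 5.1.x 驱动,显式加载一下驱动类更稳妥
        Class.forName("com.mysql.jdbc.Driver");
        String url = "jdbc:mysql://192.168.10.101:3306/travel?useSSL=false&characterEncoding=utf8";
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM dim_product")) {
            if (rs.next()) {
                // 打印旅游产品维度表的记录数,确认导入成功
                System.out.println("dim_product rows = " + rs.getLong(1));
            }
        }
    }
}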
下面就开始api开发吧
3 造数
打开IDEA,创建一个Maven项目
3.1 资源准备
3.1.0 pom.xml
项目坐标是 com.qf.bigdata:qfdata:1.0,编码UTF-8,Java 1.8。pom里主要声明了这些依赖:slf4j-api、log4j、jcl-over-slf4j、logback(classic/core/access)、guava、fastjson、gson、json-path、mysql-connector-java、protobuf-java、commons-csv、commons-lang3、commons-io、hadoop-common / hadoop-client / hadoop-hdfs、zkclient、kafka_2.11 与 kafka-clients、kryo、avro,以及 parquet-avro / parquet-common / parquet-tools / parquet-hadoop-bundle。构建部分配置了 maven-shade-plugin(打出 TravelCurLogJob 和 TravelOrderJob 两个可执行jar,主类分别是 com.qf.bigdata.realtime.util.data.travel.logs.TravelOrderHelper 和 com.qf.bigdata.realtime.util.TravelOrderHelper,并排除 META-INF 签名文件)、scala-maven-plugin、build-helper-maven-plugin 和 maven-compiler-plugin,资源目录为 ${project.basedir}/src/main/resources。
3.1.1 其他资源
把下面链接里的资源上传到项目的resources文件夹里面,其中文件里的ip和端口要根据自己虚拟机的情况进行修改。
链接:https://pan.baidu.com/s/1sIlLszb-OgeYErCIYWMtnw?pwd=36wl
提取码:36wl
--来自百度网盘超级会员V2的分享
这是我的暂时的项目结构
3.2 开发
下面是我做的功能点:
在TravelOrderHelper里面对于酒店表、地区表、用户表进行创建字段的静态常量。
CSVUtil类负责读取resource中存在的csv文件的数据,把每条数据封装到Map,最后再封装到一个List中。
对于三张不同的表的操作:
地区表:主要是创建了一个RegionDo的javabean,然后把地区数据读取、遍历后封装到map中,最后再封装到一个List里,其他的表同理。读取csv的核心写法可以参考下面的示意代码。
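CSVUtil 的完整实现这里贴不全,它的核心就是用 commons-csv 把 resources 下的 csv 读成 List<Map<String, String>>。下面是一个最小示意(文件路径用的是上文的 areacode/dim_region.csv,假设 csv 首行是表头、commons-csv 版本支持 withFirstRecordAsHeader):

import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class CsvReadDemo {
    // 从classpath读取csv文件,每行转成一个Map(列名 -> 值)
    public static List<Map<String, String>> readCsv(String path, char delimiter) throws Exception {
        List<Map<String, String>> rows = new ArrayList<>();
        try (Reader reader = new InputStreamReader(
                CsvReadDemo.class.getClassLoader().getResourceAsStream(path), StandardCharsets.UTF_8);
             CSVParser parser = new CSVParser(reader,
                     CSVFormat.DEFAULT.withDelimiter(delimiter).withFirstRecordAsHeader())) {
            for (CSVRecord record : parser) {
                rows.add(record.toMap());
            }
        }
        return rows;
    }

    public static void main(String[] args) throws Exception {
        // 读取地区维度文件,打印前3条看一下
        List<Map<String, String>> regions = readCsv("areacode/dim_region.csv", ',');
        regions.stream().limit(3).forEach(System.out::println);
    }
}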
package com.qf.bigdata.realtime.constant; import java.io.Serializable; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; public class CommonConstant implements Serializable { public static final int DEF_NUMBER_ZERO = 0; public static final int DEF_NUMBER_ONE = 1; public static final int DEF_NUMBER_DUL = 2; //用户数量限制级别 public static final Integer USER_COUNT_LEVEL = 5; public static final int DEF_CODE_COUNT = 4; //代码位数 public static final int DEF_RANGER = 10; //范围 //时间格式 public static final DateTimeFormatter PATTERN_YYYYMMDD = DateTimeFormatter.ofPattern("yyyyMMdd"); public static final DateTimeFormatter PATTERN_YYYYMMDD_MID = DateTimeFormatter.ofPattern("yyyy-MM-dd"); public static final DateTimeFormatter PATTERN_HOUR = DateTimeFormatter.ofPattern("HH"); public static final String FORMATTER_YYYYMMDD = "yyyyMMdd"; public static final String FORMATTER_YYYYMMDD_MID = "yyyy-MM-dd"; public static final String FORMATTER_YYYYMMDDHHMMDD = "yyyyMMddHHmmss"; public static final String FORMATTER_YYYYMMDDHHMM = "yyyyMMddHHmm"; public static final String FORMATTER_YYYYMMDDHH = "yyyyMMddHH"; //发送序列化对象 public static final ChronoUnit chronoUnit = ChronoUnit.MINUTES; public static final ChronoUnit dayChronoUnit = ChronoUnit.DAYS; //===charset==================================================================== public static final String CHARSET_UTF8 = "utf-8"; //测试通道 //===kafka-topic==================================================================== public static final String TOPIC_TEST = "t-release"; //投放topic public static final String KAFKA_PRODUCER_JSON_PATH = "kafka/json/kafka-producer.properties"; public static final String KAFKA_CONSUMER_JSON_PATH = "kafka/json/kafka-consumer.properties"; public static final String KAFKA_CONFIG_PATH = "kafka/kafka-config.properties"; //===zk==================================================================== public static final String ZK_CONNECT = "zk.connect"; public static final String ZK_CONNECT_KAFKA = "zk.kafka.connect"; public static final String ZK_SESSION_TIMEOUT = "zk.session.timeout"; public static final String ZK_CONN_TIMEOUT = "zk.connection.timeout"; public static final String ZK_BEE_ROOT = "zk.dw.root"; //===常用符号==================================================================== public static final String Encoding_UTF8 = "UTF-8"; public static final String Encoding_GBK = "GBK"; public static final String MIDDLE_LINE = "-"; public static final String BOTTOM_LINE = "_"; public static final String COMMA = ","; public static final String SEMICOLON = ";"; public static final String PLINE = "|"; public static final String COLON = ":"; public static final String PATH_W = "\\"; public static final String PATH_L = "/"; public static final String POINT = "."; public static final String BLANK = " "; public static final String LEFT_ARROWS = "<-"; public static final String RIGHT_ARROWS = "->"; public static final String LEFT_BRACKET = "["; public static final String RIGHT_BRACKET = "]"; public static final String TAB = "\t"; //===================================================== public static final String KAFKA_DATA_KEY_TOPIC = "topic"; public static final String KEY_TIMESTAMP = "timestamp"; public static final String KEY_KEY = "key"; public static final String KEY_VALUE = "value"; public static final String KEY_OFFSET = "offset"; public static final String KEY_PARTITION = "partition"; public static final String KEY_CTTIME_BEGIN = "ctTimeBegin"; public static final String KEY_CTTIME_END = "ctTimeEnd"; public static final String 
KEY_CTTIME = "ctTime"; public static final String KEY_USER_CODE = "userCode"; public static final String KEY_ORDER_CODE = "orderCode"; public static final String KEY_VEHICLE_CODE = "vehicleCode"; public static final String KEY_VEHICLE_TYPE = "vehicleType"; public static final String KEY_STATUS = "status"; public static final String KEY_LNG = "longitude"; public static final String KEY_LAT = "latitude"; public static final String KEY_GEOHASH = "geoHash"; public static final String KEY_ADCODE = "adcode"; public static final String KEY_PROVINCE = "province"; public static final String KEY_DISTRICT = "district"; public static final String KEY_TOWNCODE = "towncode"; public static final String KEY_TOWNSHIP = "township"; public static final String KEY_FORMATTED_ADDRESS = "formatted_address"; public static final String KEY_ADDRESS = "address"; public static final String KEY_G_SIGNAL = "gSignal"; //GIS public final static Double LONGITUDE_CHINA_MAX = 135.05; //经度 public final static Double LONGITUDE_CHINA_MIN = 73.66; //经度 public final static Double LATITUDE_CHINA_MAX = 53.55; //纬度 public final static Double LATITUDE_CHINA_MIN = 3.86; //纬度 //多省 public final static Double LONGITUDE_REGION_MAX = 125.30; //经度 public final static Double LONGITUDE_REGION_MIN = 113.25; //经度 public final static Double LATITUDE_REGION_MAX = 34.365791; //纬度 public final static Double LATITUDE_REGION_MIN = 30.038888; //纬度 //北京行政中心的纬度为39.92,经度为116.46 //北京 北纬39”26'至41”03',东经115”25'至117”30' public final static Double LONGITUDE_BJ_MAX = 117.30; //经度 public final static Double LONGITUDE_BJ_MIN = 115.25; //经度 public final static Double LATITUDE_BJ_MAX = 41.03; //纬度 public final static Double LATITUDE_BJ_MIN = 39.26; //纬度 //学校附近 public final static Double LONGITUDE_QF_MIN = 116.300000; //经度 public final static Double LONGITUDE_QF_MAX = 116.399999; //经度 public final static Double LATITUDE_QF_MIN = 40.060000; //纬度 public final static Double LATITUDE_QF_MAX = 40.069999; //纬度 }
package com.qf.bigdata.realtime.dvo; //国家地区 public class RegionDo { public String regionCode ; public String regionCodeDesc ; public String regionCity; public String reginCityDesc; public String regionProvince; public String regionProvinceDesc; public String getRegionCode() { return regionCode; } public String getRegionCodeDesc() { return regionCodeDesc; } public String getRegionCity() { return regionCity; } public String getReginCityDesc() { return reginCityDesc; } public String getRegionProvince() { return regionProvince; } public String getRegionProvinceDesc() { return regionProvinceDesc; } public void setRegionCode(String regionCode) { this.regionCode = regionCode; } public void setRegionCodeDesc(String regionCodeDesc) { this.regionCodeDesc = regionCodeDesc; } public void setRegionCity(String regionCity) { this.regionCity = regionCity; } public void setReginCityDesc(String reginCityDesc) { this.reginCityDesc = reginCityDesc; } public void setRegionProvince(String regionProvince) { this.regionProvince = regionProvince; } public void setRegionProvinceDesc(String regionProvinceDesc) { this.regionProvinceDesc = regionProvinceDesc; } public void setRegionCityDesc(String orDefault) { } }
package com.qf.bigdata.realtime.util; import com.qf.bigdata.realtime.dvo.RegionDo; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; // *** 作csv文件的工具类 public class CSVUtil implements Serializable { private static final Logger log = LoggerFactory.getLogger(CSVUtil.class); public static final String AREA_CODE_CSV_FILE = "areacode/areacode.csv"; public static final String AREA_CODE_GIS_LOCATION_CHINA_CSV_FILE = "areacode/china_gis_location.csv"; public static final String REGION_FILE = "areacode/dim_region.csv"; public static final String PUB_FILE = "areacode/dim_pub.csv"; public static final String PRODUCT_FILE = "areacode/dim_product.csv"; public static final char QUOTE_TAB = '\t'; public static final char QUOTE_COMMON = ','; public static final char PLINE = '|'; public static final char PATH_EQ = '='; public static final String NEW_LINE_SEPARATOR = "\n"; //读取csv文件 public static List
特别说明:上面被注释掉的最后几行代码是用来测试能否正确读取csv文件的,取消注释直接运行就能看到读取结果。
package com.qf.bigdata.realtime.util; import org.apache.commons.lang3.math.NumberUtils; import java.io.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; public class QParameterTool implements Serializable, Cloneable{ private static final long serialVersionUID = 1L; protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY"; protected static final String DEFAULT_UNDEFINED = ""; // ------------------ Constructors ------------------------ /** * Returns {@link QParameterTool} for the given arguments. The arguments are keys followed by values. * Keys have to start with '-' or '--' * * Example arguments: * --key1 value1 --key2 value2 -key3 value3 * * @param args Input array arguments * @return A {@link QParameterTool} */ public static QParameterTool fromArgs(String[] args) { final Map
map = new HashMap<>(args.length / 2); int i = 0; while (i < args.length) { final String key; if (args[i].startsWith("--")) { key = args[i].substring(2); } else if (args[i].startsWith("-")) { key = args[i].substring(1); } else { throw new IllegalArgumentException( String.format("Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.", Arrays.toString(args), args[i])); } if (key.isEmpty()) { throw new IllegalArgumentException( "The input " + Arrays.toString(args) + " contains an empty argument"); } i += 1; // try to find the value if (i >= args.length) { map.put(key, NO_VALUE_KEY); } else if (NumberUtils.isNumber(args[i])) { map.put(key, args[i]); i += 1; } else if (args[i].startsWith("--") || args[i].startsWith("-")) { // the argument cannot be a negative number because we checked earlier // -> the next argument is a parameter name map.put(key, NO_VALUE_KEY); } else { map.put(key, args[i]); i += 1; } } return fromMap(map); } /** * Returns {@link QParameterTool} for the given {@link Properties} file. * * @param path Path to the properties file * @return A {@link QParameterTool} * @throws IOException If the file does not exist * @see Properties */ public static QParameterTool fromPropertiesFile(String path) throws IOException { File propertiesFile = new File(path); return fromPropertiesFile(propertiesFile); } /** * Returns {@link QParameterTool} for the given {@link Properties} file. * * @param file File object to the properties file * @return A {@link QParameterTool} * @throws IOException If the file does not exist * @see Properties */ public static QParameterTool fromPropertiesFile(File file) throws IOException { if (!file.exists()) { throw new FileNotFoundException("Properties file " + file.getAbsolutePath() + " does not exist"); } try (FileInputStream fis = new FileInputStream(file)) { return fromPropertiesFile(fis); } } /** * Returns {@link QParameterTool} for the given InputStream from {@link Properties} file. * * @param inputStream InputStream from the properties file * @return A {@link QParameterTool} * @throws IOException If the file does not exist * @see Properties */ public static QParameterTool fromPropertiesFile(InputStream inputStream) throws IOException { Properties props = new Properties(); props.load(inputStream); return fromMap((Map) props); } /** * Returns {@link QParameterTool} for the given map. * * @param map A map of arguments. Both Key and Value have to be Strings * @return A {@link QParameterTool} */ public static QParameterTool fromMap(Map map) { return new QParameterTool(map); } /** * Returns {@link QParameterTool} from the system properties. 
* Example on how to pass system properties: * -Dkey1=value1 -Dkey2=value2 * * @return A {@link QParameterTool} */ public static QParameterTool fromSystemProperties() { return fromMap((Map) System.getProperties()); } // ------------------ QParameterTool ------------------------ public Map data = null; // data which is only used on the client and does not need to be transmitted protected transient Map defaultData; protected transient Set unrequestedParameters; private QParameterTool(Map data) { this.data = Collections.unmodifiableMap(new HashMap<>(data)); this.defaultData = new ConcurrentHashMap<>(data.size()); this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size())); unrequestedParameters.addAll(data.keySet()); } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } QParameterTool that = (QParameterTool) o; return Objects.equals(data, that.data) && Objects.equals(defaultData, that.defaultData) && Objects.equals(unrequestedParameters, that.unrequestedParameters); } @Override public int hashCode() { return Objects.hash(data, defaultData, unrequestedParameters); } /** * Returns the set of parameter names which have not been requested with * {@link #has(String)} or one of the {@code get} methods. Access to the * map returned by {@link #toMap()} is not tracked. */ public Set getUnrequestedParameters() { return Collections.unmodifiableSet(unrequestedParameters); } // ------------------ Get data from the util ---------------- /** * Returns number of parameters in {@link QParameterTool}. */ public int getNumberOfParameters() { return data.size(); } /** * Returns the String value for the given key. * If the key does not exist it will return null. */ public String get(String key) { addToDefaults(key, null); unrequestedParameters.remove(key); return data.get(key); } /** * Returns the String value for the given key. * If the key does not exist it will throw a {@link RuntimeException}. */ public String getRequired(String key) { addToDefaults(key, null); String value = get(key); if (value == null) { throw new RuntimeException("No data for required key '" + key + "'"); } return value; } /** * Returns the String value for the given key. * If the key does not exist it will return the given default value. */ public String get(String key, String defaultValue) { addToDefaults(key, defaultValue); String value = get(key); if (value == null) { return defaultValue; } else { return value; } } /** * Check if value is set. */ public boolean has(String value) { addToDefaults(value, null); unrequestedParameters.remove(value); return data.containsKey(value); } // -------------- Integer /** * Returns the Integer value for the given key. * The method fails if the key does not exist or the value is not an Integer. */ public int getInt(String key) { addToDefaults(key, null); String value = getRequired(key); return Integer.parseInt(value); } /** * Returns the Integer value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not an Integer. */ public int getInt(String key, int defaultValue) { addToDefaults(key, Integer.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } return Integer.parseInt(value); } // -------------- LONG /** * Returns the Long value for the given key. * The method fails if the key does not exist. 
*/ public long getLong(String key) { addToDefaults(key, null); String value = getRequired(key); return Long.parseLong(value); } /** * Returns the Long value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not a Long. */ public long getLong(String key, long defaultValue) { addToDefaults(key, Long.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } return Long.parseLong(value); } // -------------- FLOAT /** * Returns the Float value for the given key. * The method fails if the key does not exist. */ public float getFloat(String key) { addToDefaults(key, null); String value = getRequired(key); return Float.valueOf(value); } /** * Returns the Float value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not a Float. */ public float getFloat(String key, float defaultValue) { addToDefaults(key, Float.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Float.valueOf(value); } } // -------------- DOUBLE /** * Returns the Double value for the given key. * The method fails if the key does not exist. */ public double getDouble(String key) { addToDefaults(key, null); String value = getRequired(key); return Double.valueOf(value); } /** * Returns the Double value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not a Double. */ public double getDouble(String key, double defaultValue) { addToDefaults(key, Double.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Double.valueOf(value); } } // -------------- BOOLEAN /** * Returns the Boolean value for the given key. * The method fails if the key does not exist. */ public boolean getBoolean(String key) { addToDefaults(key, null); String value = getRequired(key); return Boolean.valueOf(value); } /** * Returns the Boolean value for the given key. If the key does not exists it will return the default value given. * The method returns whether the string of the value is "true" ignoring cases. */ public boolean getBoolean(String key, boolean defaultValue) { addToDefaults(key, Boolean.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Boolean.valueOf(value); } } // -------------- SHORT /** * Returns the Short value for the given key. * The method fails if the key does not exist. */ public short getShort(String key) { addToDefaults(key, null); String value = getRequired(key); return Short.valueOf(value); } /** * Returns the Short value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not a Short. */ public short getShort(String key, short defaultValue) { addToDefaults(key, Short.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Short.valueOf(value); } } // -------------- BYTE /** * Returns the Byte value for the given key. * The method fails if the key does not exist. */ public byte getByte(String key) { addToDefaults(key, null); String value = getRequired(key); return Byte.valueOf(value); } /** * Returns the Byte value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not a Byte. 
*/ public byte getByte(String key, byte defaultValue) { addToDefaults(key, Byte.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Byte.valueOf(value); } } // --------------- Internals protected void addToDefaults(String key, String value) { final String currentValue = defaultData.get(key); if (currentValue == null) { if (value == null) { value = DEFAULT_UNDEFINED; } defaultData.put(key, value); } else { // there is already an entry for this key. Check if the value is the undefined if (currentValue.equals(DEFAULT_UNDEFINED) && value != null) { // update key with better default value defaultData.put(key, value); } } } // ------------------------- Export to different targets ------------------------- /** * @return A {@link Properties} */ public Properties getProperties() { Properties props = new Properties(); props.putAll(this.data); return props; } /** * Create a properties file with all the known parameters (call after the last get*() call). * Set the default value, if available. * * Use this method to create a properties file skeleton. * * @param pathToFile Location of the default properties file. */ public void createPropertiesFile(String pathToFile) throws IOException { createPropertiesFile(pathToFile, true); } /** * Create a properties file with all the known parameters (call after the last get*() call). * Set the default value, if overwrite is true. * * @param pathToFile Location of the default properties file. * @param overwrite Boolean flag indicating whether or not to overwrite the file * @throws IOException If overwrite is not allowed and the file exists */ public void createPropertiesFile(String pathToFile, boolean overwrite) throws IOException { final File file = new File(pathToFile); if (file.exists()) { if (overwrite) { file.delete(); } else { throw new RuntimeException("File " + pathToFile + " exists and overwriting is not allowed"); } } final Properties defaultProps = new Properties(); defaultProps.putAll(this.defaultData); try (final OutputStream out = new FileOutputStream(file)) { defaultProps.store(out, "Default file created by Flink's ParameterUtil.createPropertiesFile()"); } } protected Object clone() throws CloneNotSupportedException { return new QParameterTool(this.data); } // ------------------------- Interaction with other ParameterUtils ------------------------- /** * Merges two {@link QParameterTool}. * * @param other Other {@link QParameterTool} object * @return The Merged {@link QParameterTool} */ public QParameterTool mergeWith(QParameterTool other) { final Map
resultData = new HashMap<>(data.size() + other.data.size()); resultData.putAll(data); resultData.putAll(other.data); final QParameterTool ret = new QParameterTool(resultData); final HashSet requestedParametersLeft = new HashSet<>(data.keySet()); requestedParametersLeft.removeAll(unrequestedParameters); final HashSet requestedParametersRight = new HashSet<>(other.data.keySet()); requestedParametersRight.removeAll(other.unrequestedParameters); ret.unrequestedParameters.removeAll(requestedParametersLeft); ret.unrequestedParameters.removeAll(requestedParametersRight); return ret; } // ------------------------- ExecutionConfig.UserConfig interface ------------------------- public Map toMap() { return data; } // ------------------------- Serialization --------------------------------------------- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); defaultData = new ConcurrentHashMap<>(data.size()); unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size())); } }
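QParameterTool 的用法和 Flink 自带的 ParameterTool 基本一致,下面是一个简单的使用示意(参数值只是演示):

import com.qf.bigdata.realtime.util.QParameterTool;

public class QParameterToolDemo {
    public static void main(String[] args) {
        // 模拟命令行参数:--key value 的形式
        String[] demoArgs = {"--topic", "t_travel_orders", "--source", "product",
                "--count", "100", "--sleep", "1000"};
        QParameterTool tool = QParameterTool.fromArgs(demoArgs);

        // 按key取值,get/getInt都可以带默认值
        System.out.println("topic  = " + tool.get("topic"));
        System.out.println("source = " + tool.get("source", "product"));
        System.out.println("count  = " + tool.getInt("count", 10));
        System.out.println("sleep  = " + tool.getInt("sleep", 1000));
    }
}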
package com.qf.bigdata.realtime.util; import com.qf.bigdata.realtime.constant.CommonConstant; import com.qf.bigdata.realtime.dvo.RegionDo; import com.qf.bigdata.realtime.util.CSVUtil; import com.qf.bigdata.realtime.util.QParameterTool; import com.sun.org.apache.regexp.internal.RE; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.security.LdapGroupsMapping; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Int; import java.io.Serializable; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class TravelOrderHelper implements Serializable { //日志管理工具 private static final Logger log = LoggerFactory.getLogger(TravelOrderHelper.class); /* *订单数据属性 * 用户唯一编号 */ public static final String KEY_KAFKA_ID = "KAFKA_ID"; public static final String KEY_ORDER_ID = "order_id"; public static final String KEY_USER_ID = "userid"; public static final String KEY_USER_MOBILE = "user_mobile"; public static final String KEY_PRODUCT_ID = "product_id"; public static final String KEY_PRODUCT_TRAFFIC = "product_traffic"; public static final String KEY_PRODUCT_TRAFFIC_GRADE = "product_traffic_grade"; public static final String KEY_PRODUCT_TRAFFIC_TYPE = "product_traffic_type"; public static final String KEY_PRODUCT_PUB = "product_pub"; public static final String KEY_USER_REGION = "user_region"; public static final String KEY_TRAVEL_MEMBER_ADULT = "travel_member_adult"; public static final String KEY_TRAVEL_MEMBER_YOUNGER = "travel_member_younger"; public static final String KEY_TRAVEL_MEMBER_BABY = "travel_member_baby"; public static final String KEY_PRODUCT_PRICE = "product_price"; public static final String KEY_HAS_ACTIVITY = "has_activity"; public static final String KEY_PRODUCT_FEE = "product_fee"; public static final String KEY_ORDER_CT = "order_ct"; // 地区 /* * REGION_KEY_CODE : 区域码 * REGION_KEY_CODE_DESC = 区域码说明 * REGION_KEY_CITY : 区域城市 * REGION_KEY_CITY_DESC : 区域城市描述 * REGION_KEY_PROVINCE : 区域省份 * REGION_KEY_PROVINCE_DESC : 区域省份描述 */ public static final String REGION_KEY_CODE = "region_code"; public static final String REGION_KEY_CODE_DESC = "region_code_desc"; public static final String REGION_KEY_CITY = "region_city"; public static final String REGION_KEY_CITY_DESC = "region_city_desc"; public static final String REGION_KEY_PROVINCE = "region_province"; public static final String REGION_KEY_PROVINCE_DESC = "region_province_desc"; // 地区列表,里面存放了所有的地区 public static final Listregions = new ArrayList<>(); // 酒店 /* * PUB_KEY_ID : '酒店的编号', * PUB_KEY_NAME : '酒店的名称', * PUB_KEY_STAT : '酒店的星级', * PUB_KEY_GRADE : '酒店的等级编码', * PUB_KEY_GRADE_DESC : '酒店的等级编码描述', * PUB_KEY_AREA_CODE : '酒店所在区域编码', * PUB_KEY_ADDRESS : '酒店地址', * PUB_KEY_IS_NATIONAL : '是否属于境内', */ public static final String PUB_KEY_ID = "pub_id"; public static final String PUB_KEY_NAME = "pub_name"; public static final String PUB_KEY_STAT = "pub_stat"; public static final String PUB_KEY_GRADE = "pub_grade"; public static final String PUB_KEY_GRADE_DESC = "pub_grade_desc"; public static final String PUB_KEY_AREA_CODE = "pub_area_code"; public static final String PUB_KEY_ADDRESS = "pub_address"; public static final String PUB_KEY_IS_NATIONAL = "is_national"; // 酒店与旅游产品的映射集合 public static final Map proMappingPub = new HashMap<>(); //产品 public static final String PRODUCT_KEY_ID = "product_id"; public static final String SHOP_KEY_ID = 
"shop_id"; // 产品的信息列表 public static final List products = new ArrayList<>(); //参数 public static final String KEY_TOPIC = "topic"; public static final String KEY_SOURCE = "source"; public static final String KEY_BEGIN = "begin"; public static final String KEY_END = "end"; public static final String KEY_COUNT = "count"; public static final String KEY_SLEEP = "sleep"; public static final String SOURCE_PRODUCT = "product"; public static final String SOURCE_PAY = "pay"; public static final String TIME_ORDER = "time_order"; public static final Integer TIME_RANGE_MIN = 1; public static final Integer TIME_RANGE_MAX = 120; public static final Integer COUNT_MIN = 1; public static final Integer COUNT_MAX = 10000; public static final Integer SLEEP_MIN = 1; public static final Integer SLEEP_MAX = 3600 * 1000; /* * 造数据(订单数据)的入口 * --topic t_travel_orders * */ public static void main(String[] args) { //校验这个参数是否合法 String checkResult = checkParams(args); //校验结果 if (StringUtils.isEmpty(checkResult)){ //2.1执行if说明参数无问题,效果是连续不断地创作数据,然后将数据生产到指定的kafka主题 chooseFun(args); }else { System.err.println("some erros happends ["+checkResult+"]"); } } //校验参数是否合法 //如果这个方法返回的是空字符串,说明参数没有问题 private static String checkParams(String[] args) { //定义一个返回结果的字符串 String result = ""; //通过加载args参数列表获取参数工具类对象 QParameterTool tools = QParameterTool.fromArgs(args); //校验主题 String topic = tools.get(KEY_TOPIC); if(StringUtils.isEmpty(topic)){ result = "agrs is Empty"; return result; } //其他参数 String source = tools.get(KEY_SOURCE); if(!SOURCE_PRODUCT.equalsIgnoreCase(source) && !SOURCE_PAY.equalsIgnoreCase(source)){ result = "source is error .source must be ['product' or 'pay']"; return result; } Integer count = tools.getInt(KEY_COUNT); if (null == count){ result = "count is empty"; return result; }else{ if (count > COUNT_MAX || count < COUNT_MIN){ result = "count is unbound[" + COUNT_MIN + "~"+ COUNT_MAX +"]"; return result; } } Integer sleep = tools.getInt(KEY_SLEEP); if( null == sleep){ result = " sleep is empty"; return result; } else{ if (sleep > SLEEP_MAX || sleep < SLEEP_MIN){ result = "sleep is unbound[" + SLEEP_MIN + "~"+ SLEEP_MAX +"]"; return result; } } return result; } //选择造数 public static void chooseFun(String[] args){ //通过加载args参数列表获取参数工具类对象 QParameterTool tools = QParameterTool.fromArgs(args); String topic = tools.get(KEY_TOPIC); String source = tools.get(KEY_SOURCE); Integer count = tools.getInt(KEY_COUNT); Integer sleep = tools.getInt(KEY_SLEEP); if (SOURCE_PRODUCT.equalsIgnoreCase(source)){ //创建关于产品的数据 makeTravelProductData(topic,count,sleep); }else if(SOURCE_PAY.equalsIgnoreCase(source)){ } } //创造产品订单数据 public static void makeTravelProductData(String topic, Integer count, Integer sleep) { //发送序列化对象 String dateFormatter = CommonConstant.FORMATTER_YYYYMMDDHHMMDD; String dayFormatter = CommonConstant.FORMATTER_YYYYMMDD; ChronoUnit chronoUnit = ChronoUnit.MINUTES; ChronoUnit daysChronoUnit = ChronoUnit.DAYS; //辅助数据 //2.1地区信息 List regions = getRegions(); //2.2 酒店信息 Map pubs = getPubMappingPro(); //2.3产品信息 List productIDs = getProducts(); } //旅游产品信息 private static List getProducts() { if (CollectionUtils.isEmpty(products)){ try{ List > productDatas = CSVUtil.readCSVFile(CSVUtil.PRODUCT_FILE,CSVUtil.QUOTE_COMMON); if(CollectionUtils.isNotEmpty(productDatas)){ for (Map product:productDatas){ String productID = product.getOrDefault(PRODUCT_KEY_ID,""); products.add(productID); } } }catch (Exception e){ log.error("TravelOrderHelper 's products errors :" + e); } } return products; } //旅游产品与酒店映射关系 private static Map getPubMappingPro() { 
//表示没有初始化 if(MapUtils.isEmpty(proMappingPub)){ try{ List > pubDatas = CSVUtil.readCSVFile(CSVUtil.PUB_FILE,CSVUtil.QUOTE_COMMON); //校验 if(CollectionUtils.isNotEmpty(pubDatas)){ //遍历 for (Map pub : pubDatas){ String pubId = pub.getOrDefault(PUB_KEY_ID,""); String[] pps = pubId.split("\\|"); proMappingPub.put(pps[0],pubId); } } }catch (Exception e){ log.error("TravelOrderelper 's pub errors:" + e); } } return proMappingPub; } //获取到地区列表 private static List getRegions() { //如果集合为空,就是第一次获取地区,需要初始化地区信息 if (CollectionUtils.isEmpty(regions)){ try{ //就是一个路径,一个是分隔符 List > regionsDatas = CSVUtil.readCSVFile(CSVUtil.REGION_FILE,CSVUtil.QUOTE_COMMON); //将regionDatas中Map封装到regionDo对象 if(CollectionUtils.isNotEmpty(regionsDatas)){ for (Map region : regionsDatas){ RegionDo regionDo = new RegionDo(); regionDo.setRegionCity(region.getOrDefault(REGION_KEY_CITY, "")); regionDo.setRegionCityDesc(region.getOrDefault(REGION_KEY_CITY_DESC, "")); regionDo.setRegionCode(region.getOrDefault(REGION_KEY_CODE, "")); regionDo.setRegionCodeDesc(region.getOrDefault(REGION_KEY_CODE_DESC, "")); regionDo.setRegionProvince(region.getOrDefault(REGION_KEY_PROVINCE, "")); regionDo.setRegionProvinceDesc(region.getOrDefault(REGION_KEY_PROVINCE_DESC, "")); regions.add(regionDo); } } }catch(Exception e){ log.error("TravelOrderHelper 's region errors : " +e); } } return regions; } }
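上面 TravelOrderHelper 的 makeTravelProductData 目前只加载了地区、酒店、产品这些辅助数据,还没有真正产出订单。下面是一个示意性的补全思路(写成可以放回 TravelOrderHelper 里的方法,字段取值都是随便造的假设,发送Kafka的部分可以复用 2.3.1.1 小节后面的生产者片段):

// 需要额外引入 java.util.Random、java.util.UUID、java.time.LocalDateTime、java.time.format.DateTimeFormatter
public static void makeTravelProductData(String topic, Integer count, Integer sleep) {
    // 加载辅助数据(产品编号、地区、酒店映射),和原实现一致
    List<String> productIDs = getProducts();
    List<RegionDo> regions = getRegions();
    Map<String, String> pubs = getPubMappingPro();
    Random random = new Random();
    DateTimeFormatter fmt = DateTimeFormatter.ofPattern(CommonConstant.FORMATTER_YYYYMMDDHHMMDD);

    for (int i = 0; i < count; i++) {
        // 随机挑一个产品和地区,拼一条订单
        String productID = productIDs.get(random.nextInt(productIDs.size()));
        RegionDo region = regions.get(random.nextInt(regions.size()));

        Map<String, Object> order = new HashMap<>();
        order.put(KEY_ORDER_ID, UUID.randomUUID().toString().replace("-", ""));
        order.put(KEY_USER_ID, String.valueOf(random.nextInt(10000)));
        order.put(KEY_PRODUCT_ID, productID);
        order.put(KEY_PRODUCT_PUB, pubs.getOrDefault(productID, ""));
        order.put(KEY_USER_REGION, region.getRegionCode());
        order.put(KEY_TRAVEL_MEMBER_ADULT, String.valueOf(1 + random.nextInt(3)));
        order.put(KEY_PRODUCT_PRICE, String.valueOf(1000 + random.nextInt(5000)));
        order.put(KEY_ORDER_CT, LocalDateTime.now().format(fmt));

        String json = com.alibaba.fastjson.JSON.toJSONString(order);
        // TODO: 这里调用Kafka生产者把json发送到topic
        System.out.println(topic + " -> " + json);

        try {
            Thread.sleep(sleep);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
}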