Travel Platform Flink Real-Time Project

Table of Contents

0 Environment Preparation

1 Software Environment Setup

1.1 Installing the JDK

1.2 Installing Scala

1.3 Installing Hadoop

1.4 Installing Kafka

1.5 Installing Elasticsearch

1.6 Installing Redis

1.7 Installing Flink

1.8 Installing MySQL

2 Data Overview

2.1 Platform Features

2.2 Data Flow Description

2.3 Data Composition

2.3.1 Fact Data

2.3.1.1 Travel Order Data

2.3.1.2 User Behavior Data

2.3.1.3 Dimension Data

3 Data Generation

3.1 Resource Preparation

3.1.0 pom.xml

3.1.1 Other Resources

3.2 Development


0 Environment Preparation

Create a VM -> configure a static network -> update the hosts mapping

For creating the VM, see:

面向大数据开发的集群之虚拟机搭建(一)_林柚晞的博客-CSDN博客

For VM networking, static IP, hosts mapping, and passwordless SSH, see:

项目0单节点的虚拟机做大数据开发(四万字全)_林柚晞的博客-CSDN博客

Create the project directories:

mkdir -p /opt/apps 

mkdir -p /opt/software

mkdir -p /opt/scripts

apps is where the software gets installed

software is where the installation packages are uploaded to the VM

scripts holds the scripts that automate the installation

1 Software Environment Setup

A note up front: the installation packages are copied to their target locations with a remote file-transfer tool; uploading the packages to the server is omitted here.

Every automated installation script below follows the same pattern:

- set the install path, the package location, and the environment-variable file

- check whether the parent install directory exists

- append to or overwrite files with a cat heredoc (e.g. environment variables, configuration files)

- run the actual installation steps: extract the archive, etc.

1.1 Installing the JDK

Installation package:

Link: https://pan.baidu.com/s/1R5LaDIasYh4vpY_RUB_Qhw?pwd=tff2
Extraction code: tff2

cd /opt/scripts/

vi install_jdk.sh

#!/bin/bash

# author  : XXX
# version : 1.0
# date    : 2022-04-24
# desc    : automated JDK installation
# convention > configuration > coding

INSTALL_PREFIX=/opt/apps
JAVA_DIR=${INSTALL_PREFIX}/jdk1.8.0_45
JDK_TAR=/opt/software/jdk-8u45-linux-x64.tar.gz
ETC_PROFILE=/etc/profile

# print usage hint
usage() {
    echo "Please upload jdk-8u45-linux-x64.tar.gz to /opt/software before running this script!!!"
}

# check that the package is at the expected path; if not, print usage and bail out
if [ ! -e ${JDK_TAR} ]; then
    usage    
    exit 0
fi

# already installed
if [ -e  ${JAVA_DIR} ]; then
    echo "${JAVA_DIR} already exists, the JDK is already installed, nothing to do!!!"
    exit 0
fi

# create the install prefix if it is missing
if [ ! -e ${INSTALL_PREFIX} ]; then
    mkdir -p ${INSTALL_PREFIX}
    echo "Initializing directory: ${INSTALL_PREFIX}"
fi

if [ ! -e ${JAVA_DIR} ]; then
    mkdir -p ${JAVA_DIR}
    echo "Initializing directory: ${JAVA_DIR}"
fi

## extract the JDK tarball
tar -zxvf ${JDK_TAR} -C ${INSTALL_PREFIX}

## configure environment variables (escape \$PATH so the literal reference ends up in /etc/profile)
cat << EOF >> ${ETC_PROFILE}
export JAVA_HOME=${JAVA_DIR}
export CLASSPATH=.:${JAVA_DIR}/lib/dt.jar:${JAVA_DIR}/lib/tools.jar
export PATH=\$PATH:${JAVA_DIR}/bin
EOF

## report success
echo "install jdk successful!!!"

 chmod +x ./install_jdk.sh

./install_jdk.sh

source /etc/profile

java -version

1.2 Installing Scala

Installation package:

Link: https://pan.baidu.com/s/1oBbplZTIem1K0g4Wv_PD5Q?pwd=dzcb
Extraction code: dzcb

vi /opt/scripts/install_scala.sh

#!/bin/bash

# author  : XXX
# version : 1.0
# date    : 2022-04-24
# desc    : automated Scala installation
# convention > configuration > coding

INSTALL_PREFIX=/opt/apps
SCALA_DIR=${INSTALL_PREFIX}/scala-2.11.8
SCALA_TAR=/opt/software/scala-2.11.8.tgz
ETC_PROFILE=/etc/profile

# print usage hint
usage() {
    echo "Please upload scala-2.11.8.tgz to /opt/software before running this script!!!"
}

# check that the package is at the expected path; if not, print usage and bail out
if [ ! -e ${SCALA_TAR} ]; then
    usage    
    exit 0
fi

# already installed
if [ -e  ${SCALA_DIR} ]; then
    echo "${SCALA_DIR} already exists, Scala is already installed, nothing to do!!!"
    exit 0
fi

# create the install prefix if it is missing
if [ ! -e ${INSTALL_PREFIX} ]; then
    mkdir -p ${INSTALL_PREFIX}
    echo "Initializing directory: ${INSTALL_PREFIX}"
fi

if [ ! -e ${SCALA_DIR} ]; then
    mkdir -p ${SCALA_DIR}
    echo "Initializing directory: ${SCALA_DIR}"
fi

## extract the Scala tarball
tar -zxvf ${SCALA_TAR} -C ${INSTALL_PREFIX}

## configure environment variables (escape \$PATH so the literal reference ends up in /etc/profile)
cat << EOF >> ${ETC_PROFILE}
export SCALA_HOME=${SCALA_DIR}
export PATH=\$PATH:${SCALA_DIR}/bin
EOF

## report success
echo "install scala successful!!!"

 chmod +x ./install_scala.sh

./install_scala.sh

source /etc/profile

scala -version

1.3 Installing Hadoop

Installation package:

Link: https://pan.baidu.com/s/1O-E4B1MUZjA90rsvEi1nwQ?pwd=q0ey
Extraction code: q0ey

vi /opt/scripts/install_hadoop.sh

##1. install_hadoop.sh
#!/bin/bash

# author  : lixi
# version : 1.0
# date    : 2022-04-24
# desc    : automated Hadoop installation
# convention > configuration > coding

INSTALL_PREFIX=/opt/apps
JAVA_DIR=${INSTALL_PREFIX}/jdk1.8.0_45
HADOOP_DIR=${INSTALL_PREFIX}/hadoop-2.8.1
HADOOP_TAR=/opt/software/hadoop-2.8.1.tar.gz
ETC_PROFILE=/etc/profile

# print usage hint
usage() {
    echo "Please upload hadoop-2.8.1.tar.gz to /opt/software before running this script!!!"
}

# check that the package is at the expected path; if not, print usage and bail out
if [ ! -e ${HADOOP_TAR} ]; then
    usage    
    exit 0
fi

# already installed
if [ -e  ${HADOOP_DIR} ]; then
    echo "${HADOOP_DIR} already exists, Hadoop is already installed, nothing to do!!!"
    exit 0
fi

# create the install prefix if it is missing
if [ ! -e ${INSTALL_PREFIX} ]; then
    mkdir -p ${INSTALL_PREFIX}
    echo "Initializing directory: ${INSTALL_PREFIX}"
fi

if [ ! -e ${HADOOP_DIR} ]; then
    mkdir -p ${HADOOP_DIR}
    echo "Initializing directory: ${HADOOP_DIR}"
fi

## extract the Hadoop tarball
tar -zxvf ${HADOOP_TAR} -C ${INSTALL_PREFIX}

## configure Hadoop
## hadoop-env.sh (JAVA_HOME is set manually after the script runs, see below)

## core-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/core-site.xml
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://192.168.10.101:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>${HADOOP_DIR}/hdpdata</value>
    </property>
    <property>
        <name>hadoop.proxyuser.root.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.root.groups</name>
        <value>*</value>
    </property>
</configuration>
EOF

## hdfs-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/hdfs-site.xml
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.http.address</name>
        <value>qianfeng01:50070</value>
    </property>
    <property>
        <name>dfs.secondary.http.address</name>
        <value>qianfeng01:50090</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>${HADOOP_DIR}/hdpdata/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>${HADOOP_DIR}/hdpdata/dfs/data</value>
    </property>
    <property>
        <name>dfs.checkpoint.dir</name>
        <value>${HADOOP_DIR}/hdpdata/dfs/checkpoint/cname</value>
    </property>
    <property>
        <name>dfs.checkpoint.edits.dir</name>
        <value>${HADOOP_DIR}/hdpdata/dfs/checkpoint/cname</value>
    </property>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
</configuration>
EOF

## yarn-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/yarn-site.xml
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>qianfeng01</value>
    </property>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>qianfeng01:8032</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>qianfeng01:8030</value>
    </property>
</configuration>
EOF

## mapred-site.xml
mv ${HADOOP_DIR}/etc/hadoop/mapred-site.xml.template ${HADOOP_DIR}/etc/hadoop/mapred-site.xml
cat << EOF > ${HADOOP_DIR}/etc/hadoop/mapred-site.xml
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>qianfeng01:10020</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>qianfeng01:19888</value>
    </property>
</configuration>
EOF

## slaves
cat << EOF > ${HADOOP_DIR}/etc/hadoop/slaves
qianfeng01
EOF

## configure environment variables (escape \$PATH so the literal reference ends up in /etc/profile)
cat << EOF >> ${ETC_PROFILE}
export HADOOP_HOME=${HADOOP_DIR}
export PATH=\$PATH:${HADOOP_DIR}/bin:${HADOOP_DIR}/sbin
EOF

## format the NameNode
${HADOOP_DIR}/bin/hdfs namenode -format

## report success
echo "install hadoop successful!!!"

chmod +x ./install_hadoop.sh

./install_hadoop.sh

source /etc/profile

hadoop version

vi /opt/apps/hadoop-2.8.1/etc/hadoop/hadoop-env.sh

Find the JAVA_HOME line and set it explicitly:

export JAVA_HOME=/opt/apps/jdk1.8.0_45

start-all.sh
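
A quick sanity check after start-all.sh (a sketch; the expected daemon list assumes the single-node layout configured above):

jps
# expected on this node: NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager (plus Jps itself)

hdfs dfs -mkdir -p /tmp/check
hdfs dfs -ls /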

1.4 Installing Kafka

Installation package:

Link: https://pan.baidu.com/s/1abH6Nn4mD9MR7v5r_dJvBg?pwd=qfz8
Extraction code: qfz8

 tar -zxvf kafka_2.11-1.1.1.tgz -C /opt/apps/

vi /etc/profile

export KAFKA_HOME=/opt/apps/kafka_2.11-1.1.1

export PATH=$PATH:$KAFKA_HOME/bin

vi /opt/apps/kafka_2.11-1.1.1/config/server.properties

The settings that need to be changed:

broker.id=1

advertised.listeners=PLAINTEXT://192.168.10.101:9092

log.dirs=/opt/apps/kafka_2.11-1.1.1/log/kafka-logs

zookeeper.connect=qianfeng01:2181/kafka

 vi /opt/apps/kafka_2.11-1.1.1/config/producer.properties

 bootstrap.servers=qianfeng01:9092

 vi /opt/apps/kafka_2.11-1.1.1/config/consumer.properties 

 bootstrap.servers=qianfeng01:9092

  vi /opt/apps/kafka_2.11-1.1.1/config/zookeeper.properties

 dataDir=/opt/apps/kafka_2.11-1.1.1/zkData

 [root@hadoop kafka_2.11-1.1.1]# zookeeper-server-start.sh -daemon config/zookeeper.properties

zookeeper-shell.sh qianfeng01:2181

[root@hadoop kafka_2.11-1.1.1]# kafka-server-start.sh -daemon config/server.properties

jps

 kafka-topics.sh --create --zookeeper qianfeng01:2181/kafka --replication-factor 1 --partitions 2 --topic test 

 kafka-topics.sh --list --zookeeper qianfeng01:2181/kafka
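
To check that the broker works end to end, a few messages can be pushed through the test topic with the console tools that ship with Kafka (just a sanity check, not part of the project):

kafka-console-producer.sh --broker-list qianfeng01:9092 --topic test
>hello
>world

kafka-console-consumer.sh --bootstrap-server qianfeng01:9092 --topic test --from-beginning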

1.5 Installing Elasticsearch

Installation package:

Link: https://pan.baidu.com/s/1h3C3kneHlLuZxXg8KKzK2g?pwd=y35e
Extraction code: y35e

 useradd hadoop

 passwd hadoop

Using the root account:

 [root@hadoop software]# tar -zxvf elasticsearch-6.5.3.tar.gz -C /opt/apps/

[root@hadoop elasticsearch-6.5.3]# vi /etc/profile
export ELASTICSEARCH_HOME=/opt/apps/elasticsearch-6.5.3
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$KAFKA_HOME/bin:$ELASTICSEARCH_HOME/bin

Configuration

vi /opt/apps/elasticsearch-6.5.3/config/elasticsearch.yml

My hostname is qianfeng01 (I would like to change it, but all my hosts mappings already use it, so I keep it); wherever qianfeng01 appears, substitute your own hostname.

cluster.name: qianfeng01

node.name: qianfeng01
node.master: true
node.data: true

path.data: /opt/apps/elasticsearch-6.5.3/data

path.logs: /opt/apps/elasticsearch-6.5.3/logs

network.host: 0.0.0.0

http.port: 9200

discovery.zen.ping.unicast.hosts: ["qianfeng01"]

Grant ownership to the regular user

[root@hadoop apps]# ll
total 16
drwxr-xr-x  8 root root 4096 Dec  7  2018 elasticsearch-6.5.3
drwxrwxr-x 11  500  500 4096 Apr 24 14:05 hadoop-2.8.1
drwxr-xr-x  8   10  143 4096 Apr 11  2015 jdk1.8.0_45
drwxr-xr-x  9 root root 4096 Apr 24 14:32 kafka_2.11-1.1.1
[root@hadoop apps]# chown -R hadoop:hadoop elasticsearch-6.5.3/
[root@hadoop apps]# ls
elasticsearch-6.5.3  hadoop-2.8.1  jdk1.8.0_45  kafka_2.11-1.1.1
[root@hadoop apps]# ll
total 16
drwxr-xr-x  8 hadoop hadoop 4096 Dec  7  2018 elasticsearch-6.5.3
drwxrwxr-x 11    500    500 4096 Apr 24 14:05 hadoop-2.8.1
drwxr-xr-x  8     10    143 4096 Apr 11  2015 jdk1.8.0_45
drwxr-xr-x  9 root   root   4096 Apr 24 14:32 kafka_2.11-1.1.1

##5.2 Give the regular user sudo (root) privileges
[root@hadoop apps]# vi /etc/sudoers
## Allow root to run any commands anywhere
root    ALL=(ALL)       ALL
hadoop    ALL=(ALL)       ALL

Switch to the regular user

su hadoop

elasticsearch

Problems I ran into:

max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]_林柚晞的博客-CSDN博客

On a second install I also hit another issue:

max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

How to fix it:

Switch to the root user:

sysctl -w vm.max_map_count=262144

Check it:

sysctl -a|grep vm.max_map_count

If it prints

vm.max_map_count = 262144

the change took effect.

Then switch back to the root account and reboot the system.
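
Note that sysctl -w only changes the running kernel, so the value would be lost across the reboot mentioned above unless it is also persisted. Writing it to /etc/sysctl.conf (as root, before rebooting) keeps it permanent:

echo "vm.max_map_count=262144" >> /etc/sysctl.conf
sysctl -p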

After the system comes back up:

su hadoop

elasticsearch -d

Check that it started successfully:

jps

curl -XGET qianfeng01:9200

 

If Elasticsearch reports errors, here is how to debug:

switch to the regular user and simply run elasticsearch in the foreground;

if it just sits there without errors, the foreground process is running and everything is fine.

To reach the web UI, the firewall inside the VM must be turned off.

Next, set up the browser plugin for Elasticsearch.

Plugin package:

Link: https://pan.baidu.com/s/1-3eWVFmar03xc2RPz-rwRQ?pwd=dklo
Extraction code: dklo

Extract this package into any folder you can find again.

Open Google Chrome (mine is pretty bare-bones without extensions) and load the plugin from that folder.

Close the pop-up window.

When Chrome is open, the small extension icon next to the address bar gives quick access to ES.

At this point I still had one unresolved issue: the cluster showed as not connected.

The "cluster health: not connected" status appears because the firewall on the server is still running.

systemctl stop firewalld.service

Check the status:

systemctl status firewalld.service

Take a look at the web UI.
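
The cluster health can also be checked directly with curl; a green or yellow status means the single-node cluster is up:

curl -XGET 'http://qianfeng01:9200/_cluster/health?pretty'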

1.6 Installing Redis

Installation package:

Link: https://pan.baidu.com/s/1VoBQEeBBovSWQMTAm7T8mw?pwd=teeq
Extraction code: teeq

[root@hadoop software]# tar -zxvf redis-4.0.11.tar.gz -C /opt/apps/

[root@hadoop redis-4.0.11]# yum -y install gcc-c++

[root@hadoop redis-4.0.11]# make
[root@hadoop redis-4.0.11]# make PREFIX=/opt/apps/redis-4.0.11 install
[root@hadoop redis-4.0.11]# mkdir etc
[root@hadoop redis-4.0.11]# cp redis.conf etc/
[root@hadoop redis-4.0.11]# cd bin/
[root@hadoop bin]# cp redis-benchmark redis-server redis-cli /usr/bin/

Configuration

 vi /opt/apps/redis-4.0.11/etc/redis.conf

Note: uncomment these settings; do not delete the lines.

daemonize yes
loglevel verbose

requirepass root

bind 192.168.10.101

If setting requirepass stops you from connecting to the Redis server, try commenting the password out; it is optional and a bit of a hassle here.

[root@hadoop ~]# redis-server /opt/apps/redis-4.0.11/etc/redis.conf
[root@hadoop ~]# ps -ef | grep redis
root     28066     1  0 16:07 ?        00:00:00 redis-server 192.168.10.101:6379
[root@hadoop ~]# redis-cli -h 192.168.10.101
192.168.10.101:6379> SHUTDOWN
(error) NOAUTH Authentication required.
192.168.10.101:6379> AUTH root
OK
192.168.10.101:6379> SHUTDOWN
not connected> exit
[root@hadoop ~]# ps -ef | grep redis
root     28835  1936  0 16:09 pts/0    00:00:00 grep --color=auto redis

Redis init (auto-start) script

[root@hadoop redis-4.0.11]# vi redis

#!/bin/bash
#chkconfig: 2345 80 90
PATH=/usr/bin:/usr/local/bin:/sbin:/bin
REDIS_HOST=192.168.10.101
REDIS_PORT=6379
EXEC=/opt/apps/redis-4.0.11/bin/redis-server
REDIS_CLI=/opt/apps/redis-4.0.11/bin/redis-cli

PIDFILE=/var/run/redis.pid
CONF=/opt/apps/redis-4.0.11/etc/redis.conf

case "$1" in
    start)
        if [ -f ${PIDFILE} ];then
            echo "${PIDFILE} exists, process is already running or crashed"
        else
            echo "Starting Redis Server ...."
            ${EXEC} ${CONF}
        fi
        if [ "$?" = "0" ];then
            echo "Redis is running"
        fi
        ;;
    stop)
        if [ ! -e ${PIDFILE} ];then
            echo "${PIDFILE} does not exists, process is not running"
        else
            PID=$(cat ${PIDFILE})
            echo "Stopping..."
            # authenticate and shut down in one call (separate redis-cli calls open separate connections)
            ${REDIS_CLI} -h ${REDIS_HOST} -p ${REDIS_PORT} -a root SHUTDOWN
            while [ -e ${PIDFILE} ]
            do
                echo "Waiting for Redis shutdown..."
                sleep 1
            done
            echo "Redis stopped"
        fi
        ;;
    restart|force-reload)
            ${0} stop
            ${0} start
        ;;
        *)
            echo "Usage: /etc/init.d/redis {start|stop|restart|force-reload}" >&2
            exit 1
esac

 [root@hadoop redis-4.0.11]# cp redis /etc/init.d/

[root@hadoop redis-4.0.11]# chmod +x /etc/init.d/redis

##3. List all services registered for auto-start

[root@hadoop redis-4.0.11]# chkconfig --list

##4. Add redis to the auto-start list
[root@hadoop redis-4.0.11]# chkconfig --add redis
[root@hadoop redis-4.0.11]# chkconfig --level 2345 redis on

##5. Start/stop test
[root@hadoop redis-4.0.11]# systemctl start redis
[root@hadoop redis-4.0.11]# service redis start
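
A quick way to confirm that the server started by either method is usable (this assumes the requirepass root setting from above is still in place):

redis-cli -h 192.168.10.101 -a root ping
# expected reply: PONG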

1.7 Installing Flink

Installation package:

Link: https://pan.baidu.com/s/1d6tMAwfLuC47TgAsClnY3w?pwd=k1h9
Extraction code: k1h9

Standalone installation

##1. Extract
[root@hadoop software]# tar -zxvf flink-1.9.1-bin-scala_2.11.tgz -C /opt/apps/

##2. Configure environment variables
[root@hadoop flink-1.9.1]# vi /etc/profile


export FLINK_HOME=/opt/apps/flink-1.9.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$KAFKA_HOME/bin:$ELASTICSEARCH_HOME/bin:$FLINK_HOME/bin

[root@hadoop flink-1.9.1]# source /etc/profile

Edit the configuration

[root@hadoop flink-1.9.1]# vi conf/flink-conf.yaml

jobmanager.rpc.address: qianfeng01

jobmanager.rpc.port: 6123

rest.port: 8081

rest.address: qianfeng01

##4. Configure the slaves file
[root@hadoop flink-1.9.1]# vi conf/slaves

 qianfeng01

##5. Configure the masters file
[root@hadoop flink-1.9.1]# vi conf/masters 

qianfeng01:8081

##6. Start HDFS first, then the Flink cluster
[root@hadoop flink-1.9.1]# start-dfs.sh
[root@hadoop flink-1.9.1]# start-cluster.sh

Web UI:

http://192.168.10.101:8081/

Testing (optional):

##8. Test Flink
##8.1 Window A
[root@hadoop flink-1.9.1]# yum -y install nc
[root@hadoop flink-1.9.1]# nc -l 8000

111
222
33
444
555
33
111
222

##8.2 Window B
[root@hadoop flink-1.9.1]# flink run examples/streaming/SocketWindowWordCount.jar --port 8000

[root@hadoop flink-1.9.1]# tail -f log/flink-root-taskexecutor-0-hadoop.out
111 : 1
555 : 1
444 : 1
33 : 1
222 : 1
33 : 1
222 : 1
111 : 1

##9. Submit a jar job
[root@hadoop flink-1.9.1]# flink run examples/batch/WordCount.jar --input /etc/profile --output /home/output/00
Starting execution of program
Program execution finished
Job with JobID ca414928ddcfd57969326827f5cb7205 has finished.
Job Runtime: 21670 ms


[root@hadoop flink-1.9.1]# tail -f /home/output/00
want 1
we 1
what 1
wide 1
will 1
workaround 1
x 1
you 3
your 1
z 1

1.8 Installing MySQL

Installation package:

Link: https://pan.baidu.com/s/1qi2i9r7loMj3Y04LsMTc6w?pwd=4u1e
Extraction code: 4u1e

##1. Install the yum repository release package
[root@hadoop software]# yum -y localinstall mysql-community-release-el6-5.noarch.rpm
##2. Install the MySQL server
[root@hadoop software]# yum -y install mysql-server
##3. Start the MySQL service
[root@hadoop software]# service mysqld start
##4. Set the initial root password
[root@hadoop software]# mysqladmin -uroot password '123456'
##5. Log in to MySQL
[root@hadoop software]# mysql -uroot -p123456
##6. Grant remote access
mysql> grant all privileges on *.* to root@"%" identified by "123456" with grant option;
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
##7. A MySQL client can now connect to this server remotely
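
The dimension tables in section 2.3.1.3 live in a travel schema, so it is convenient to create the database right away (a small sketch; the name travel matches the DDL used later):

mysql> CREATE DATABASE IF NOT EXISTS travel DEFAULT CHARACTER SET utf8;
mysql> SHOW DATABASES;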

2 Data Overview

2.1 Platform Features

Feature modules:

Core business: real-time computation and real-time display based on the core business data (travel orders, hotel stays, ticketing).

User behavior logs: the various tracking events defined by the product design.

Risk control alerts: monitoring and alerting on abnormal user behavior.

The goal of this project is a real-time big-data platform.

Technical directions:

Real-time ETL: real-time cleansing and transformation of the data into a standardized format.

Real-time statistics: computing the various real-time metrics.

Real-time storage: persisting the real-time data and supporting interactive search and dynamic computation.

Rule processing: mainly serving the real-time risk-control and alerting requirements.

Interactive queries: interactive queries over detail data or real-time aggregates (ClickHouse, Apache Druid).

Real-time data display: serving data consumers with a more intuitive presentation of the data.
2.2 Data Flow Description

Because this is a real-time travel platform, the data always passes through a message middleware (message queue / message channel). The raw data sources are collected through that channel; the data is then written to different distributed storage systems depending on the use case, and finally the stored data is aggregated for statistics and can be presented in BI-style visualizations.

2.3 Data Composition

2.3.1 Fact Data

2.3.1.1 Travel Order Data

{
    "userID":"1986",
    "user_mobile":"15549477595",
    "product_id":"12312312",
    "product_traffic":"01",
    "product_traffic_grade":"11",
    "product_traffic_type" : "1",
    "product_pub" : "sdfsfsdfsdff | sdfsdfsdfds",
    "user_region" : "12312",
    "travel_memeber_adult" : "2",
    ...
}

userID = unique user ID
user_mobile = user mobile number
product_id = travel product ID
product_traffic = transport choice for the trip
product_traffic_grade = seat class
product_traffic_type = itinerary type
product_pub = lodging (hotel) choice
user_region = user region
travel_member_adult = number of adults on this trip
travel_member_younger = number of children on this trip
travel_member_baby = number of infants on this trip
product_price = original product price
has_activity = 0 means no promotional price | 0.8 means a 20% discount
product_fee = actual product fee (price after any discount)
order_id = travel order ID
order_ct = order creation time
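
To make the pricing fields concrete, here is an illustrative fragment (the numbers are made up): with product_price = 1000 and has_activity = 0.8 (a 20% discount), product_fee works out to 1000 * 0.8 = 800.

{
    "product_price":"1000",
    "has_activity":"0.8",
    "product_fee":"800"
}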

2.3.1.2 User Behavior Data

Action type: action: 'launch | interactive | page_enter (product page exposure)'
Event type: eventType: 'view | slide | click (comment, like, share)'
User ID: userID (e.g. sdfsfasdf1231)
App device ID: userDevice
Operating system: os
Phone manufacturer: manufacturer
Carrier: carrier
Network type: networkType
User region: userRegion
User region IP: userRegionIP
Longitude: longitude
Latitude: latitude
Extended info: exts
Event time: ct

 {
    "os":"1",
    "longitude":"115.12321",
    "latitude":"26.88282",
    "userRegion":"12312",
    "userID":"12312",
    "manufacturer":"09",
    "userRegionIP":"10.206.0.4",
    "action":"08",
    "eventType":"01"
    ...
}

2.3.1.3 Dimension Data

Travel product dimension table

CREATE TABLE travel.dim_product(
    product_id text NULL COMMENT 'travel product id',
    product_level int(11) DEFAULT 0 COMMENT 'travel product level',
    product_type text  NULL COMMENT 'travel product type',
    departure_code text  NULL COMMENT 'departure city code',
    des_city_code text  NULL COMMENT 'destination city code',
    tourim_ticket_type text  NULL COMMENT 'travel product order type'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Hotel dimension table

CREATE TABLE travel.dim_pub(
    pub_id text NULL COMMENT 'hotel id',
    pub_name text NULL COMMENT 'hotel name',
    pub_stat text  NULL COMMENT 'hotel star rating',
    pub_grade text  NULL COMMENT 'hotel grade code',
    pub_grade_desc text  NULL COMMENT 'hotel grade description',
    pub_area_code text  NULL COMMENT 'hotel area code',
    pub_address text  NULL COMMENT 'hotel address',
    is_national text  NULL COMMENT 'whether it is domestic'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Import the dimension data into MySQL

SQL file (it is fairly large; download it locally and use Navicat to connect remotely to the MySQL server):

Link: https://pan.baidu.com/s/1dfCxtrmgdCNnFeCfM_cDdQ?pwd=btl3
Extraction code: btl3

Start the server.

Open Navicat on Windows.

 

[root@qianfeng01 ~]# mysql -uroot -p123456
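
If you prefer the command line to Navicat, the dump can also be loaded with the SOURCE command (a sketch; travel_dim.sql is a placeholder for the actual file name, assumed to have been uploaded to /opt/software):

mysql> USE travel;
mysql> SOURCE /opt/software/travel_dim.sql;
mysql> SHOW TABLES;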

 

Now on to the development.

3 Data Generation

Open IDEA and create a Maven project.

3.1 Resource Preparation

3.1.0 pom.xml


    4.0.0

    com.qf.bigdata
    qfdata
    1.0

    
        UTF-8
        UTF-8
        1.8

        1.1.8
        1.7.22
        1.2.16

        1.2.29
        2.4.0
        2.8.5
        2.9.3
        21.0
        5.1.44
        3.5.1
        3.4
        2.4
        1.6

        1.8.1
        1.8.1

        
        2.8.1
        0.11
        1.1.1
        3.0.3

        compile
        
    


    

        
        
            org.slf4j
            slf4j-api
            ${slf4j.version}
            ${scope}
        

        
            log4j
            log4j
            ${log4j.version}
            ${scope}
        

        
            org.slf4j
            jcl-over-slf4j
            ${slf4j.version}
            ${scope}
        

        
            ch.qos.logback
            logback-classic
            ${logback.version}
            ${scope}
        

        
            ch.qos.logback
            logback-core
            ${logback.version}
            ${scope}
        

        
            ch.qos.logback
            logback-access
            ${logback.version}
            ${scope}
        


        
        
            com.google.guava
            guava
            ${guava.version}
            ${scope}
        

        
        
            com.alibaba
            fastjson
            ${fastjson.version}
            ${scope}
        


        
            com.google.code.gson
            gson
            ${gson.version}
        


        
            com.jayway.jsonpath
            json-path
            ${json-path.version}
            ${scope}
        

        
        
            mysql
            mysql-connector-java
            ${mysql.jdbc.version}
            ${scope}
        


        
        
            com.google.protobuf
            protobuf-java
            ${protobuf.version}
            ${scope}
        


        
        
            org.apache.commons
            commons-csv
            ${commons-csv.version}
            ${scope}
        

        
            org.apache.commons
            commons-lang3
            ${commons-lang3.version}
            ${scope}
        

        
            commons-io
            commons-io
            ${commons-io.version}
            ${scope}
        

        
        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            ${scope}
            
                
                    protobuf-java
                    com.google.protobuf
                
            
        

        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
            ${scope}
            
                
                    jsp-api
                    javax.servlet.jsp:
                
                
                    org.slf4j
                    log4j-over-slf4j
                
                
                    org.slf4j
                    slf4j-log4j12
                
            
        

        
            org.apache.hadoop
            hadoop-hdfs
            ${hadoop.version}
            ${scope}
        

        
        
            com.101tec
            zkclient
            ${zk.client.version}
            ${scope}
        

        
        
            org.apache.kafka
            kafka_2.11
            ${kafka.version}
            ${scope}
        

        
            org.apache.kafka
            kafka-clients
            ${kafka.version}
            ${scope}
        

        
        
            com.esotericsoftware
            kryo
            ${kryo.version}
            ${scope}
        

        

        


        
        
            org.apache.avro
            avro
            ${avro.version}
        

        
        
            org.apache.parquet
            parquet-avro
            ${parquet.version}
            ${scope}
        

        
            org.apache.parquet
            parquet-common
            ${parquet.version}
            ${scope}
        

        
            org.apache.parquet
            parquet-tools
            ${parquet.version}
            ${scope}
        

        
            org.apache.parquet
            parquet-hadoop-bundle
            ${parquet.version}
            ${scope}
        




    


    
        
            dev
            
                true
            
            
                
                    
                        ${project.basedir}/src/main/resources
                        true
                    
                
                
                    
                        org.apache.maven.plugins
                        maven-shade-plugin
                    
                
            
        
    


    
        target/classes
        target/test-classes

        
            
                
                    org.apache.maven.plugins
                    maven-shade-plugin
                    3.1.0
                    
                        
                        
                            TravelCurLogJob
                            package
                            
                                shade
                            
                            
                                true
                                TravelCurLogJob
                                
                                    
                                        *:*
                                        
                                            META-INF/*.SF
                                            META-INF/*.DSA
                                            META-INF/*.RSA
                                        
                                    
                                
                                
                                    
                                        reference.conf
                                    
                                    
                                        com.qf.bigdata.realtime.util.data.travel.logs.TravelOrderHelper
                                    
                                
                            
                        


                        
                        
                            TravelOrderJob
                            package
                            
                                shade
                            
                            
                                true
                                TravelOrderJob
                                
                                    
                                        *:*
                                        
                                            META-INF/*.SF
                                            META-INF/*.DSA
                                            META-INF/*.RSA
                                        
                                    
                                
                                
                                    
                                        reference.conf
                                    
                                    
                                        com.qf.bigdata.realtime.util.TravelOrderHelper
                                    
                                
                            
                        

                    
                
            
        

        

            
                net.alchim31.maven
                scala-maven-plugin
                3.2.2
                
                    
                        scala-compile-first
                        process-resources
                        
                            add-source
                            compile
                        
                    
                    
                        scala-test-compile
                        process-test-resources
                        
                            testCompile
                        
                    
                
            
            
                org.codehaus.mojo
                build-helper-maven-plugin
                3.0.0
                
                    
                        add-source
                        generate-sources
                        
                            add-source
                        
                        
                            
                                src/main/scala
                            
                        
                    
                    
                        add-test-source
                        generate-sources
                        
                            add-test-source
                        
                        
                            
                                src/test/scala
                            
                        
                    
                
            

            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.6.1
                
                    ${java.version}
                    ${java.version}
                    ${file.encoding}
                
            

        

    
3.1.1 Other Resources

Upload these resources into the project's resources folder; adjust the IPs and ports inside the files to match your own VM.

Link: https://pan.baidu.com/s/1sIlLszb-OgeYErCIYWMtnw?pwd=36wl
Extraction code: 36wl

3.2 Development

This is my current project structure.

The features implemented so far:

TravelOrderHelper defines static constants for the fields of the hotel, region, and user tables.

The CSVUtil class reads the CSV files under resources, wraps every record into a Map, and collects the maps into a List.

Operations on the three tables:

Region table: a RegionDo JavaBean is created; the region data is read and iterated, each row is wrapped into a map and then into a RegionDo, and the objects are collected into a List. The other tables work the same way.

package com.qf.bigdata.realtime.constant;

import java.io.Serializable;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

public class CommonConstant implements Serializable {

    public static final int DEF_NUMBER_ZERO = 0;
    public static final int DEF_NUMBER_ONE = 1;
    public static final int DEF_NUMBER_DUL = 2;

    // user count limit level
    public static final Integer USER_COUNT_LEVEL = 5;

    public static final int DEF_CODE_COUNT = 4; // number of code digits
    public static final int DEF_RANGER = 10; // range

    // date/time format patterns
    public static final DateTimeFormatter PATTERN_YYYYMMDD =  DateTimeFormatter.ofPattern("yyyyMMdd");

    public static final DateTimeFormatter PATTERN_YYYYMMDD_MID =  DateTimeFormatter.ofPattern("yyyy-MM-dd");

    public static final DateTimeFormatter PATTERN_HOUR =  DateTimeFormatter.ofPattern("HH");

    public static final String FORMATTER_YYYYMMDD = "yyyyMMdd";
    public static final String FORMATTER_YYYYMMDD_MID = "yyyy-MM-dd";
    public static final String FORMATTER_YYYYMMDDHHMMDD = "yyyyMMddHHmmss";
    public static final String FORMATTER_YYYYMMDDHHMM = "yyyyMMddHHmm";
    public static final String FORMATTER_YYYYMMDDHH = "yyyyMMddHH";

    // time units used when sending serialized objects
    public static final ChronoUnit chronoUnit = ChronoUnit.MINUTES;
    public static final ChronoUnit dayChronoUnit = ChronoUnit.DAYS;

    //===charset====================================================================
    public static final String CHARSET_UTF8 = "utf-8"; // test channel

    //===kafka-topic====================================================================
    public static final String TOPIC_TEST = "t-release"; // delivery topic

    public static final String KAFKA_PRODUCER_JSON_PATH = "kafka/json/kafka-producer.properties";
    public static final String KAFKA_CONSUMER_JSON_PATH = "kafka/json/kafka-consumer.properties";
    public static final String KAFKA_CONFIG_PATH = "kafka/kafka-config.properties";


    //===zk====================================================================
    public static final String ZK_CONNECT = "zk.connect";
    public static final String ZK_CONNECT_KAFKA = "zk.kafka.connect";
    public static final String ZK_SESSION_TIMEOUT = "zk.session.timeout";
    public static final String ZK_CONN_TIMEOUT = "zk.connection.timeout";
    public static final String ZK_BEE_ROOT = "zk.dw.root";


    //===common symbols====================================================================

    public static final String Encoding_UTF8 = "UTF-8";
    public static final String Encoding_GBK = "GBK";

    public static final String MIDDLE_LINE = "-";
    public static final String BOTTOM_LINE = "_";
    public static final String COMMA = ",";
    public static final String SEMICOLON = ";";
    public static final String PLINE = "|";
    public static final String COLON = ":";
    public static final String PATH_W = "\\";
    public static final String PATH_L = "/";
    public static final String POINT = ".";
    public static final String BLANK = " ";

    public static final String LEFT_ARROWS = "<-";
    public static final String RIGHT_ARROWS = "->";

    public static final String LEFT_BRACKET = "[";
    public static final String RIGHT_BRACKET = "]";

    public static final String TAB = "\t";

    //=====================================================
    public static final String KAFKA_DATA_KEY_TOPIC = "topic";
    public static final String KEY_TIMESTAMP = "timestamp";
    public static final String KEY_KEY = "key";
    public static final String KEY_VALUE = "value";
    public static final String KEY_OFFSET = "offset";
    public static final String KEY_PARTITION = "partition";

    public static final String KEY_CTTIME_BEGIN = "ctTimeBegin";
    public static final String KEY_CTTIME_END = "ctTimeEnd";
    public static final String KEY_CTTIME = "ctTime";
    public static final String KEY_USER_CODE = "userCode";

    public static final String KEY_ORDER_CODE = "orderCode";
    public static final String KEY_VEHICLE_CODE = "vehicleCode";
    public static final String KEY_VEHICLE_TYPE = "vehicleType";
    public static final String  KEY_STATUS = "status";
    public static final String  KEY_LNG = "longitude";
    public static final String  KEY_LAT = "latitude";
    public static final String  KEY_GEOHASH = "geoHash";
    public static final String  KEY_ADCODE = "adcode";
    public static final String  KEY_PROVINCE = "province";
    public static final String  KEY_DISTRICT = "district";
    public static final String  KEY_TOWNCODE = "towncode";
    public static final String  KEY_TOWNSHIP = "township";
    public static final String  KEY_FORMATTED_ADDRESS = "formatted_address";
    public static final String  KEY_ADDRESS = "address";
    public static final String  KEY_G_SIGNAL = "gSignal";

    //GIS
    public final static Double LONGITUDE_CHINA_MAX = 135.05; // longitude
    public final static Double LONGITUDE_CHINA_MIN = 73.66; // longitude

    public final static Double LATITUDE_CHINA_MAX = 53.55; // latitude
    public final static Double LATITUDE_CHINA_MIN = 3.86; // latitude

    // multi-province region
    public final static Double LONGITUDE_REGION_MAX = 125.30; // longitude
    public final static Double LONGITUDE_REGION_MIN = 113.25; // longitude

    public final static Double LATITUDE_REGION_MAX = 34.365791; // latitude
    public final static Double LATITUDE_REGION_MIN = 30.038888; // latitude


    // Beijing administrative center: latitude 39.92, longitude 116.46
    // Beijing: 39°26'~41°03' N, 115°25'~117°30' E
    public final static Double LONGITUDE_BJ_MAX = 117.30; // longitude
    public final static Double LONGITUDE_BJ_MIN = 115.25; // longitude

    public final static Double LATITUDE_BJ_MAX = 41.03; // latitude
    public final static Double LATITUDE_BJ_MIN = 39.26; // latitude

    // near the school
    public final static Double LONGITUDE_QF_MIN = 116.300000; // longitude
    public final static Double LONGITUDE_QF_MAX = 116.399999; // longitude

    public final static Double LATITUDE_QF_MIN = 40.060000; // latitude
    public final static Double LATITUDE_QF_MAX = 40.069999; // latitude
}

package com.qf.bigdata.realtime.dvo;
// country/region dimension bean
public class RegionDo {
    public String regionCode ;
    public String regionCodeDesc ;
    public String regionCity;
    public String reginCityDesc;
    public String regionProvince;
    public String regionProvinceDesc;

    public String getRegionCode() {
        return regionCode;
    }

    public String getRegionCodeDesc() {
        return regionCodeDesc;
    }

    public String getRegionCity() {
        return regionCity;
    }

    public String getReginCityDesc() {
        return reginCityDesc;
    }

    public String getRegionProvince() {
        return regionProvince;
    }

    public String getRegionProvinceDesc() {
        return regionProvinceDesc;
    }

    public void setRegionCode(String regionCode) {
        this.regionCode = regionCode;
    }

    public void setRegionCodeDesc(String regionCodeDesc) {
        this.regionCodeDesc = regionCodeDesc;
    }

    public void setRegionCity(String regionCity) {
        this.regionCity = regionCity;
    }

    public void setReginCityDesc(String reginCityDesc) {
        this.reginCityDesc = reginCityDesc;
    }

    public void setRegionProvince(String regionProvince) {
        this.regionProvince = regionProvince;
    }

    public void setRegionProvinceDesc(String regionProvinceDesc) {
        this.regionProvinceDesc = regionProvinceDesc;
    }

    public void setRegionCityDesc(String regionCityDesc) {
        // the backing field keeps its original (misspelled) name reginCityDesc
        this.reginCityDesc = regionCityDesc;
    }
}
package com.qf.bigdata.realtime.util;

import com.qf.bigdata.realtime.dvo.RegionDo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
// utility class for reading CSV files
public class CSVUtil implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(CSVUtil.class);
    public static final String AREA_CODE_CSV_FILE = "areacode/areacode.csv";
    public static final String AREA_CODE_GIS_LOCATION_CHINA_CSV_FILE = "areacode/china_gis_location.csv";
    public static final String REGION_FILE = "areacode/dim_region.csv";
    public static final String PUB_FILE = "areacode/dim_pub.csv";
    public static final String PRODUCT_FILE = "areacode/dim_product.csv";

    public static final char QUOTE_TAB = '\t';
    public static final char QUOTE_COMMON = ',';
    public static final char PLINE = '|';
    public static final char PATH_EQ = '=';
    public static final String NEW_LINE_SEPARATOR = "\n";


    // read a CSV file into a list of column-name -> value maps
    public static List<Map<String, String>> readCSVFile(String path, char delimiter){
        // result container
        List<Map<String, String>> result = new ArrayList<>();
        Reader reader = null;
        try{
            reader = new InputStreamReader(CSVUtil.class.getClassLoader().getResourceAsStream(path));
            // build a CSV parser on top of the stream
            CSVParser csvParser = new CSVParser(
                    reader,
                    CSVFormat.DEFAULT
                            .withFirstRecordAsHeader() // treat the first row as the header, not as data
                            .withDelimiter(delimiter) // set the delimiter
                            .withIgnoreHeaderCase() // ignore the header case
                            .withTrim() // trim leading and trailing whitespace
            );
            // get all column names
            Map<String, Integer> header = csvParser.getHeaderMap();
            Set<String> colKeys = header.keySet();
            // iterate over the parser to get every CSV record
            for(CSVRecord csvRecord : csvParser){
                // convert the record into a map of the row's values
                Map<String, String> values = csvRecord.toMap();
                // walk the column-name list
                for(String colKey : colKeys){
                    String colValue = csvRecord.get(colKey);
                    values.put(colKey, colValue);
                }
                // add the row map to the final result list
                result.add(values);
            }
        }catch (Exception e){
           log.error("read.csvfile.error path={}", path, e);
        }finally {
            if(null != reader){
                try{
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        return result;
    }

//    public static void main(String[] args) {
//        List<Map<String, String>> list = readCSVFile(REGION_FILE, ',');
//        System.out.println(list);
//    }



}

Note: the commented-out main method above is a quick test that the CSV file can actually be read; run it and check the output.

 

package com.qf.bigdata.realtime.util;

import org.apache.commons.lang3.math.NumberUtils;

import java.io.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class QParameterTool implements Serializable, Cloneable{

    private static final long serialVersionUID = 1L;

    protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
    protected static final String DEFAULT_UNDEFINED = "";

    // ------------------ Constructors ------------------------

    /**
     * Returns {@link QParameterTool} for the given arguments. The arguments are keys followed by values.
     * Keys have to start with '-' or '--'
     *
     * 

Example arguments: * --key1 value1 --key2 value2 -key3 value3 * * @param args Input array arguments * @return A {@link QParameterTool} */ public static QParameterTool fromArgs(String[] args) { final Map map = new HashMap<>(args.length / 2); int i = 0; while (i < args.length) { final String key; if (args[i].startsWith("--")) { key = args[i].substring(2); } else if (args[i].startsWith("-")) { key = args[i].substring(1); } else { throw new IllegalArgumentException( String.format("Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.", Arrays.toString(args), args[i])); } if (key.isEmpty()) { throw new IllegalArgumentException( "The input " + Arrays.toString(args) + " contains an empty argument"); } i += 1; // try to find the value if (i >= args.length) { map.put(key, NO_VALUE_KEY); } else if (NumberUtils.isNumber(args[i])) { map.put(key, args[i]); i += 1; } else if (args[i].startsWith("--") || args[i].startsWith("-")) { // the argument cannot be a negative number because we checked earlier // -> the next argument is a parameter name map.put(key, NO_VALUE_KEY); } else { map.put(key, args[i]); i += 1; } } return fromMap(map); } /** * Returns {@link QParameterTool} for the given {@link Properties} file. * * @param path Path to the properties file * @return A {@link QParameterTool} * @throws IOException If the file does not exist * @see Properties */ public static QParameterTool fromPropertiesFile(String path) throws IOException { File propertiesFile = new File(path); return fromPropertiesFile(propertiesFile); } /** * Returns {@link QParameterTool} for the given {@link Properties} file. * * @param file File object to the properties file * @return A {@link QParameterTool} * @throws IOException If the file does not exist * @see Properties */ public static QParameterTool fromPropertiesFile(File file) throws IOException { if (!file.exists()) { throw new FileNotFoundException("Properties file " + file.getAbsolutePath() + " does not exist"); } try (FileInputStream fis = new FileInputStream(file)) { return fromPropertiesFile(fis); } } /** * Returns {@link QParameterTool} for the given InputStream from {@link Properties} file. * * @param inputStream InputStream from the properties file * @return A {@link QParameterTool} * @throws IOException If the file does not exist * @see Properties */ public static QParameterTool fromPropertiesFile(InputStream inputStream) throws IOException { Properties props = new Properties(); props.load(inputStream); return fromMap((Map) props); } /** * Returns {@link QParameterTool} for the given map. * * @param map A map of arguments. Both Key and Value have to be Strings * @return A {@link QParameterTool} */ public static QParameterTool fromMap(Map map) { return new QParameterTool(map); } /** * Returns {@link QParameterTool} from the system properties. 
* Example on how to pass system properties: * -Dkey1=value1 -Dkey2=value2 * * @return A {@link QParameterTool} */ public static QParameterTool fromSystemProperties() { return fromMap((Map) System.getProperties()); } // ------------------ QParameterTool ------------------------ public Map data = null; // data which is only used on the client and does not need to be transmitted protected transient Map defaultData; protected transient Set unrequestedParameters; private QParameterTool(Map data) { this.data = Collections.unmodifiableMap(new HashMap<>(data)); this.defaultData = new ConcurrentHashMap<>(data.size()); this.unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size())); unrequestedParameters.addAll(data.keySet()); } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } QParameterTool that = (QParameterTool) o; return Objects.equals(data, that.data) && Objects.equals(defaultData, that.defaultData) && Objects.equals(unrequestedParameters, that.unrequestedParameters); } @Override public int hashCode() { return Objects.hash(data, defaultData, unrequestedParameters); } /** * Returns the set of parameter names which have not been requested with * {@link #has(String)} or one of the {@code get} methods. Access to the * map returned by {@link #toMap()} is not tracked. */ public Set getUnrequestedParameters() { return Collections.unmodifiableSet(unrequestedParameters); } // ------------------ Get data from the util ---------------- /** * Returns number of parameters in {@link QParameterTool}. */ public int getNumberOfParameters() { return data.size(); } /** * Returns the String value for the given key. * If the key does not exist it will return null. */ public String get(String key) { addToDefaults(key, null); unrequestedParameters.remove(key); return data.get(key); } /** * Returns the String value for the given key. * If the key does not exist it will throw a {@link RuntimeException}. */ public String getRequired(String key) { addToDefaults(key, null); String value = get(key); if (value == null) { throw new RuntimeException("No data for required key '" + key + "'"); } return value; } /** * Returns the String value for the given key. * If the key does not exist it will return the given default value. */ public String get(String key, String defaultValue) { addToDefaults(key, defaultValue); String value = get(key); if (value == null) { return defaultValue; } else { return value; } } /** * Check if value is set. */ public boolean has(String value) { addToDefaults(value, null); unrequestedParameters.remove(value); return data.containsKey(value); } // -------------- Integer /** * Returns the Integer value for the given key. * The method fails if the key does not exist or the value is not an Integer. */ public int getInt(String key) { addToDefaults(key, null); String value = getRequired(key); return Integer.parseInt(value); } /** * Returns the Integer value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not an Integer. */ public int getInt(String key, int defaultValue) { addToDefaults(key, Integer.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } return Integer.parseInt(value); } // -------------- LONG /** * Returns the Long value for the given key. * The method fails if the key does not exist. 
*/ public long getLong(String key) { addToDefaults(key, null); String value = getRequired(key); return Long.parseLong(value); } /** * Returns the Long value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not a Long. */ public long getLong(String key, long defaultValue) { addToDefaults(key, Long.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } return Long.parseLong(value); } // -------------- FLOAT /** * Returns the Float value for the given key. * The method fails if the key does not exist. */ public float getFloat(String key) { addToDefaults(key, null); String value = getRequired(key); return Float.valueOf(value); } /** * Returns the Float value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not a Float. */ public float getFloat(String key, float defaultValue) { addToDefaults(key, Float.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Float.valueOf(value); } } // -------------- DOUBLE /** * Returns the Double value for the given key. * The method fails if the key does not exist. */ public double getDouble(String key) { addToDefaults(key, null); String value = getRequired(key); return Double.valueOf(value); } /** * Returns the Double value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not a Double. */ public double getDouble(String key, double defaultValue) { addToDefaults(key, Double.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Double.valueOf(value); } } // -------------- BOOLEAN /** * Returns the Boolean value for the given key. * The method fails if the key does not exist. */ public boolean getBoolean(String key) { addToDefaults(key, null); String value = getRequired(key); return Boolean.valueOf(value); } /** * Returns the Boolean value for the given key. If the key does not exists it will return the default value given. * The method returns whether the string of the value is "true" ignoring cases. */ public boolean getBoolean(String key, boolean defaultValue) { addToDefaults(key, Boolean.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Boolean.valueOf(value); } } // -------------- SHORT /** * Returns the Short value for the given key. * The method fails if the key does not exist. */ public short getShort(String key) { addToDefaults(key, null); String value = getRequired(key); return Short.valueOf(value); } /** * Returns the Short value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not a Short. */ public short getShort(String key, short defaultValue) { addToDefaults(key, Short.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Short.valueOf(value); } } // -------------- BYTE /** * Returns the Byte value for the given key. * The method fails if the key does not exist. */ public byte getByte(String key) { addToDefaults(key, null); String value = getRequired(key); return Byte.valueOf(value); } /** * Returns the Byte value for the given key. If the key does not exists it will return the default value given. * The method fails if the value is not a Byte. 
*/ public byte getByte(String key, byte defaultValue) { addToDefaults(key, Byte.toString(defaultValue)); String value = get(key); if (value == null) { return defaultValue; } else { return Byte.valueOf(value); } } // --------------- Internals protected void addToDefaults(String key, String value) { final String currentValue = defaultData.get(key); if (currentValue == null) { if (value == null) { value = DEFAULT_UNDEFINED; } defaultData.put(key, value); } else { // there is already an entry for this key. Check if the value is the undefined if (currentValue.equals(DEFAULT_UNDEFINED) && value != null) { // update key with better default value defaultData.put(key, value); } } } // ------------------------- Export to different targets ------------------------- /** * @return A {@link Properties} */ public Properties getProperties() { Properties props = new Properties(); props.putAll(this.data); return props; } /** * Create a properties file with all the known parameters (call after the last get*() call). * Set the default value, if available. * *

Use this method to create a properties file skeleton. * * @param pathToFile Location of the default properties file. */ public void createPropertiesFile(String pathToFile) throws IOException { createPropertiesFile(pathToFile, true); } /** * Create a properties file with all the known parameters (call after the last get*() call). * Set the default value, if overwrite is true. * * @param pathToFile Location of the default properties file. * @param overwrite Boolean flag indicating whether or not to overwrite the file * @throws IOException If overwrite is not allowed and the file exists */ public void createPropertiesFile(String pathToFile, boolean overwrite) throws IOException { final File file = new File(pathToFile); if (file.exists()) { if (overwrite) { file.delete(); } else { throw new RuntimeException("File " + pathToFile + " exists and overwriting is not allowed"); } } final Properties defaultProps = new Properties(); defaultProps.putAll(this.defaultData); try (final OutputStream out = new FileOutputStream(file)) { defaultProps.store(out, "Default file created by Flink's ParameterUtil.createPropertiesFile()"); } } protected Object clone() throws CloneNotSupportedException { return new QParameterTool(this.data); } // ------------------------- Interaction with other ParameterUtils ------------------------- /** * Merges two {@link QParameterTool}. * * @param other Other {@link QParameterTool} object * @return The Merged {@link QParameterTool} */ public QParameterTool mergeWith(QParameterTool other) { final Map resultData = new HashMap<>(data.size() + other.data.size()); resultData.putAll(data); resultData.putAll(other.data); final QParameterTool ret = new QParameterTool(resultData); final HashSet requestedParametersLeft = new HashSet<>(data.keySet()); requestedParametersLeft.removeAll(unrequestedParameters); final HashSet requestedParametersRight = new HashSet<>(other.data.keySet()); requestedParametersRight.removeAll(other.unrequestedParameters); ret.unrequestedParameters.removeAll(requestedParametersLeft); ret.unrequestedParameters.removeAll(requestedParametersRight); return ret; } // ------------------------- ExecutionConfig.UserConfig interface ------------------------- public Map toMap() { return data; } // ------------------------- Serialization --------------------------------------------- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); defaultData = new ConcurrentHashMap<>(data.size()); unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size())); } }

package com.qf.bigdata.realtime.util;

import com.qf.bigdata.realtime.constant.CommonConstant;
import com.qf.bigdata.realtime.dvo.RegionDo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TravelOrderHelper implements Serializable {
    // logger
    private static final Logger log = LoggerFactory.getLogger(TravelOrderHelper.class);
    /*
     * order data attribute keys (userID = unique user id, etc.)
     */
    public static final String KEY_KAFKA_ID = "KAFKA_ID";
    public static final String KEY_ORDER_ID = "order_id";
    public static final String KEY_USER_ID = "userid";
    public static final String KEY_USER_MOBILE = "user_mobile";
    public static final String KEY_PRODUCT_ID = "product_id";
    public static final String KEY_PRODUCT_TRAFFIC = "product_traffic";
    public static final String KEY_PRODUCT_TRAFFIC_GRADE = "product_traffic_grade";
    public static final String KEY_PRODUCT_TRAFFIC_TYPE = "product_traffic_type";
    public static final String KEY_PRODUCT_PUB = "product_pub";
    public static final String KEY_USER_REGION = "user_region";
    public static final String KEY_TRAVEL_MEMBER_ADULT = "travel_member_adult";
    public static final String KEY_TRAVEL_MEMBER_YOUNGER = "travel_member_younger";
    public static final String KEY_TRAVEL_MEMBER_BABY = "travel_member_baby";
    public static final String KEY_PRODUCT_PRICE = "product_price";
    public static final String KEY_HAS_ACTIVITY = "has_activity";
    public static final String KEY_PRODUCT_FEE = "product_fee";
    public static final String KEY_ORDER_CT = "order_ct";

    // region
    /*
     * REGION_KEY_CODE : region code
     * REGION_KEY_CODE_DESC : region code description
     * REGION_KEY_CITY : region city
     * REGION_KEY_CITY_DESC : region city description
     * REGION_KEY_PROVINCE : region province
     * REGION_KEY_PROVINCE_DESC : region province description
     */
    public static final String REGION_KEY_CODE = "region_code";
    public static final String REGION_KEY_CODE_DESC = "region_code_desc";
    public static final String REGION_KEY_CITY = "region_city";
    public static final String REGION_KEY_CITY_DESC = "region_city_desc";
    public static final String REGION_KEY_PROVINCE = "region_province";
    public static final String REGION_KEY_PROVINCE_DESC = "region_province_desc";

    // list holding all regions
    public static final List<RegionDo> regions = new ArrayList<>();

    // hotel
    /*
     * PUB_KEY_ID : hotel id
     * PUB_KEY_NAME : hotel name
     * PUB_KEY_STAT : hotel star rating
     * PUB_KEY_GRADE : hotel grade code
     * PUB_KEY_GRADE_DESC : hotel grade description
     * PUB_KEY_AREA_CODE : hotel area code
     * PUB_KEY_ADDRESS : hotel address
     * PUB_KEY_IS_NATIONAL : whether the hotel is domestic
     */
    public static final String PUB_KEY_ID = "pub_id";
    public static final String PUB_KEY_NAME = "pub_name";
    public static final String PUB_KEY_STAT = "pub_stat";
    public static final String PUB_KEY_GRADE = "pub_grade";
    public static final String PUB_KEY_GRADE_DESC = "pub_grade_desc";
    public static final String PUB_KEY_AREA_CODE = "pub_area_code";
    public static final String PUB_KEY_ADDRESS = "pub_address";
    public static final String PUB_KEY_IS_NATIONAL = "is_national";

    // mapping between travel products and hotels
    public static final Map<String, String> proMappingPub = new HashMap<>();

    // product
    public static final String PRODUCT_KEY_ID = "product_id";
    public static final String SHOP_KEY_ID = "shop_id";

    // product id list
    public static final List<String> products = new ArrayList<>();

    // CLI parameter keys
    public static final String KEY_TOPIC = "topic";
    public static final String KEY_SOURCE = "source";
    public static final String KEY_BEGIN = "begin";
    public static final String KEY_END = "end";
    public static final String KEY_COUNT = "count";
    public static final String KEY_SLEEP = "sleep";

    public static final String SOURCE_PRODUCT = "product";
    public static final String SOURCE_PAY = "pay";

    public static final String TIME_ORDER = "time_order";

    public static final Integer TIME_RANGE_MIN = 1;
    public static final Integer TIME_RANGE_MAX = 120;

    public static final Integer COUNT_MIN = 1;
    public static final Integer COUNT_MAX = 10000;

    public static final Integer SLEEP_MIN = 1;
    public static final Integer SLEEP_MAX = 3600 * 1000;


    /*
     * entry point for generating (order) data
     * e.g. --topic t_travel_orders
     * */
     public static void main(String[] args) {
     // validate the arguments
         String checkResult = checkParams(args);
         // check the validation result
         if (StringUtils.isEmpty(checkResult)){
             // the args are valid: keep generating data and produce it to the given Kafka topic
             chooseFun(args);
         }else {
             System.err.println("some errors happened [" + checkResult + "]");
         }
     }
    // validate the arguments
    // an empty return string means the arguments are fine
    private static String checkParams(String[] args) {
         // the result string to return
         String result = "";
         // build the parameter tool from the args list
        QParameterTool tools = QParameterTool.fromArgs(args);
        // validate the topic
        String  topic = tools.get(KEY_TOPIC);
        if(StringUtils.isEmpty(topic)){
          result = "topic is empty";
          return result;
        }
        // other parameters
        String source = tools.get(KEY_SOURCE);
        if(!SOURCE_PRODUCT.equalsIgnoreCase(source) && !SOURCE_PAY.equalsIgnoreCase(source)){
            result = "source is error .source must be ['product' or 'pay']";
            return  result;
        }
         Integer count = tools.getInt(KEY_COUNT);
        if (null == count){
            result = "count is empty";
            return result;
        }else{
            if (count > COUNT_MAX || count < COUNT_MIN){
                result = "count is unbound[" + COUNT_MIN + "~"+ COUNT_MAX +"]";
                return result;
            }
        }
        Integer sleep = tools.getInt(KEY_SLEEP);
        if( null == sleep){
            result = " sleep is empty";
            return  result;
        } else{
            if (sleep > SLEEP_MAX || sleep < SLEEP_MIN){
                result = "sleep is unbound[" + SLEEP_MIN + "~"+ SLEEP_MAX +"]";
                return result;
            }
        }
         return result;
     }
     // choose which kind of data to generate

    public static void chooseFun(String[] args){
         // build the parameter tool from the args list
         QParameterTool tools = QParameterTool.fromArgs(args);
         String topic = tools.get(KEY_TOPIC);
         String source = tools.get(KEY_SOURCE);
         Integer count = tools.getInt(KEY_COUNT);
         Integer sleep = tools.getInt(KEY_SLEEP);
         if (SOURCE_PRODUCT.equalsIgnoreCase(source)){
             // generate product (order) data
             makeTravelProductData(topic,count,sleep);
         }else if(SOURCE_PAY.equalsIgnoreCase(source)){

         }
    }
    // generate travel product order data
    public static void makeTravelProductData(String topic, Integer count, Integer sleep) {
      // formats and time units used when generating records
        String dateFormatter = CommonConstant.FORMATTER_YYYYMMDDHHMMDD;
        String dayFormatter = CommonConstant.FORMATTER_YYYYMMDD;
        ChronoUnit chronoUnit = ChronoUnit.MINUTES;
        ChronoUnit daysChronoUnit = ChronoUnit.DAYS;
        // auxiliary data
        // 2.1 region info
        List<RegionDo> regions = getRegions();
        // 2.2 hotel info
        Map<String, String> pubs = getPubMappingPro();
        // 2.3 product info
        List<String> productIDs = getProducts();

     }
    // travel product info
    private static List<String> getProducts() {
         if (CollectionUtils.isEmpty(products)){
             try{
                 List<Map<String, String>> productDatas = CSVUtil.readCSVFile(CSVUtil.PRODUCT_FILE,CSVUtil.QUOTE_COMMON);
                 if(CollectionUtils.isNotEmpty(productDatas)){
                     for (Map<String, String> product : productDatas){
                         String productID = product.getOrDefault(PRODUCT_KEY_ID,"");
                         products.add(productID);
                     }
                 }
             }catch (Exception e){
                 log.error("TravelOrderHelper 's products errors :" + e);
             }
         }
         return  products;
    }
    // mapping between travel products and hotels
    private static Map<String, String> getPubMappingPro() {
         // not initialized yet
        if(MapUtils.isEmpty(proMappingPub)){
            try{
                List<Map<String, String>> pubDatas = CSVUtil.readCSVFile(CSVUtil.PUB_FILE,CSVUtil.QUOTE_COMMON);
                // validate
                if(CollectionUtils.isNotEmpty(pubDatas)){
                    // iterate
                    for (Map<String, String> pub : pubDatas){
                        String pubId = pub.getOrDefault(PUB_KEY_ID,"");
                        String[] pps = pubId.split("\\|");
                        proMappingPub.put(pps[0],pubId);
                    }
                }
            }catch (Exception e){
                log.error("TravelOrderHelper 's pub errors:" + e);
            }
        }
        return proMappingPub;
    }


    // get the region list
    private static List<RegionDo> getRegions() {
         // if the list is empty this is the first access, so initialize the region info
         if (CollectionUtils.isEmpty(regions)){
             try{
             // arguments: the file path and the delimiter
                List<Map<String, String>> regionsDatas = CSVUtil.readCSVFile(CSVUtil.REGION_FILE,CSVUtil.QUOTE_COMMON);
                // wrap every map in regionsDatas into a RegionDo object
             if(CollectionUtils.isNotEmpty(regionsDatas)){
                 for (Map<String, String> region : regionsDatas){
                     RegionDo regionDo = new RegionDo();
                     regionDo.setRegionCity(region.getOrDefault(REGION_KEY_CITY, ""));
                     regionDo.setRegionCityDesc(region.getOrDefault(REGION_KEY_CITY_DESC, ""));
                     regionDo.setRegionCode(region.getOrDefault(REGION_KEY_CODE, ""));
                     regionDo.setRegionCodeDesc(region.getOrDefault(REGION_KEY_CODE_DESC, ""));
                     regionDo.setRegionProvince(region.getOrDefault(REGION_KEY_PROVINCE, ""));
                     regionDo.setRegionProvinceDesc(region.getOrDefault(REGION_KEY_PROVINCE_DESC, ""));
                     regions.add(regionDo);

                 }
             }

         }catch(Exception e){
             log.error("TravelOrderHelper 's region errors : " + e);
        }
         }
         return regions;
    }
}
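
makeTravelProductData stops at collecting the auxiliary data, so the actual send loop is still missing. Below is a minimal sketch of what that part could look like, assuming a plain KafkaProducer (kafka-clients) and fastjson, both of which are already among the pom dependencies; the broker address, field values, and class name here are illustrative, not the project's final code:

import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.alibaba.fastjson.JSONObject;

public class TravelOrderProducerSketch {
    public static void send(String topic, Integer count, Integer sleep) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "qianfeng01:9092"); // broker configured in server.properties above
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        Random random = new Random();
        for (int i = 0; i < count; i++) {
            // build one order record; the values here are illustrative only
            JSONObject order = new JSONObject();
            order.put("order_id", "order_" + System.currentTimeMillis() + "_" + i);
            order.put("userID", String.valueOf(random.nextInt(10000)));
            order.put("product_price", "1000");
            order.put("has_activity", "0.8");
            order.put("product_fee", "800");
            // key by order_id so the same order always lands in the same partition
            producer.send(new ProducerRecord<>(topic, order.getString("order_id"), order.toJSONString()));
            Thread.sleep(sleep); // throttle, matching the intent of the --sleep argument
        }
        producer.flush();
        producer.close();
    }
}

Run the real helper (or this sketch) with arguments like --topic t_travel_orders --source product --count 100 --sleep 1000, which is exactly what checkParams validates.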
