- 一.创建数据库表
- 创建用户地址数据表:
- 建立poi数据表:
- 创建结果输出表:
- 使用tunnel upload命令将下载好的residence.csv poi.csv文件导入,注意修改自己的文件储存路径。
- 分别查询用户地址数据表residence poi数据表数据。
- 二.创建Java项目
- GeoHashUtil.java文档
- DistanceUtil.Java计算地球上两点距离的方法
- LocationMapper.java实现MapReduce的Map处理逻辑:
- LocationReducer.java实现MapReduce的Reduce处理逻辑:
- LocationDriver.java用于运行ODPS的MapReduce的项目
- 三.分别创建3个测试文件:
- poi_schema_
- out_schema_
- residence_schema_
- 四.在odps客户端测试
- 1.导入jar包
- 2.在odps客户端执行命令:
- 3.用read out 命令查看表中生成的结果:
CREATE TABLE IF NOT EXISTS residence
(id STRING COMMENT '用户id',
lng DOUBLE COMMENT '用户所在位置经度',
lat DOUBLE COMMENT '用户所在位置纬度',
name STRING COMMENT '用户所在位置名称',
addr STRING COMMENT '用户详细地址'
);
建立poi数据表:
CREATE TABLE IF NOT EXISTS poi (
Id STRING COMMENT '店铺id',
lng DOUBLE COMMENT '店铺所在位置经度',
lat DOUBLE COMMENT '店铺所在位置纬度',
name STRING COMMENT '店铺名称',
addr STRING COMMENT '店铺详细地址',
type STRING COMMENT '店铺类型'
);
创建结果输出表:
CREATE TABLE IF NOT EXISTS out (
rid STRING COMMENT '用户id',
pid STRING COMMENT '店铺id',
distance DOUBLE COMMENT '距离'
);
使用tunnel upload命令将下载好的residence.csv poi.csv文件导入,注意修改自己的文件储存路径。
tunnel upload D:\IDEA\代码保存\项目三\residence.csv residence;
tunnel upload D:\IDEA\代码保存\项目三\poi.csv poi;
分别查询用户地址数据表residence poi数据表数据。
select * from residence limit 5;
select * from poi limit 5;
二.创建Java项目
创建一个Java项目,导入jar包,见项目三文档:
GeoHashUtil.java文档package com.aliyun.odps.lbs;
import ch.hsr.geohash.GeoHash;
import java.util.Arrays;
import java.util.List;
//@desc 计算周围8个区域的GeoHash编码*/
public class GeoHashUtil {
//@desc 获取某一点的GeoHash编码@param lat 纬度@param lng 经度@param PRECISION GeoHash编码长度@return*/
public static String getGeoHash(final double lat, final double lng, final int PRECISION){
return GeoHash.geoHashStringWithCharacterPrecision(lat, lng, PRECISION);
}
//@desc 获取某一地点及周围 8个区域总共9个点的GeoHash编码@param lat 纬度* @param lng 经度* @param PRECISION GeoHash编码长度* @return*/
public static List getGeoHashFor9(final double lat, final double lng, final int PRECISION){
// 纬度做二分类的次数GeoHash编码长度乘以5除以2
int latLength = PRECISION*5/2;
// 经度做二分类的次数GeoHash编码长度的奇偶有关系
// PRECISION*5%2 GeoHash编码长度乘以5除以2的余数等于0,即偶数,则经度做二分类的次数就等于纬度做二分类的次数
// 如果不等于0,即奇数,则经度做二分类的次数等于纬度做二分类的次数+1
int lngLength = PRECISION*5%2==0?latLength:latLength+1;
// 经度差和纬度差
// 最小纬度差
double minLat = 180/Math.pow(2, latLength);
// 最小经度差
double minLng = 360/Math.pow(2, lngLength);
// 上方三个区域的GeoHash编码
// 上方左边区域的GeoHash编码
String leftTop = getGeoHash(lat + minLat, lng -minLng, PRECISION);
// 上方中间区域的GeoHash编码
String midTop = getGeoHash(lat + minLat, lng, PRECISION);
// 上方右边区域的GeoHash编码
String rightTop = getGeoHash(lat+minLat, lng+minLng, PRECISION);
// 中间三个区域的GeoHash编码
String leftMid = getGeoHash(lat, lng -minLng, PRECISION);
String midMid = getGeoHash(lat, lng, PRECISION);
String rightMid = getGeoHash(lat, lng + minLng, PRECISION);
// 下方三个区域的GeoHash编码
String leftBot = getGeoHash(lat -minLat, lng -minLng, PRECISION);
String midBot = getGeoHash(lat -minLat, lng, PRECISION);
String rightBot = getGeoHash(lat -minLat, lng + minLng, PRECISION);
return Arrays.asList(leftTop, midTop, rightTop, leftMid, midMid, rightMid, leftBot, midBot, rightBot);
}
}
DistanceUtil.Java计算地球上两点距离的方法
package com.aliyun.odps.lbs;
/*** @desc 计算地球上两点之间的距离*/
public class DistanceUtil {
//地球半径:6378.393km*/
private static final double EARTH_RADIUS = 6378.393;
//计算地球上两点距离* @param lat1 第一个坐标纬度* @param lng1 第一个坐标经度* @param lat2 第二个坐标纬度* @param lng2 第二个坐标经度* @return*/
public static double getDistance(double lat1, double lng1, double lat2, double lng2){lat1 = rad(lat1);
lat2 = rad(lat2);lng1 = rad(lng1);lng2 = rad(lng2);
double s = EARTH_RADIUS * Math.acos(Math.cos(lng1-lng2)*Math.cos(lat1)*Math.cos(lat2)+Math.sin(lat1)*Math.sin(lat2));
return Math.round(s*1000);
}
//将角度转换成弧度* @param d* @return*/
private static double rad(double d){return d * Math.PI / 180.0;
}
}
LocationBean.java保存地址信息对象:
package com.aliyun.odps.lbs;
//位置Bean类*/
public class LocationBean {
// 纬度
private double lat;
// 经度
private double lng;
// getter setter方法
public double getLat() {
return lat;
}
public void setLat(double lat) {
this.lat = lat;
}
public double getLng() {
return lng;
}
public void setLng(double lng) {
this.lng = lng;
}
//*** 构造方法* @param lat* @param lng*/
public LocationBean(double lat, double lng) {this.lat = lat;this.lng = lng;
}
}
LocationMapper.java实现MapReduce的Map处理逻辑:
package com.aliyun.odps.lbs;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.Mapper;
import com.aliyun.odps.mapred.MapperBase;
import java.io.IOException;
import java.util.List;
//*** MapReduce的Map类LocationMapper继承MapperBase类*/
public class LocationMapper extends MapperBase {
// GeoHash编码长度
private static int PRECISION = 5;
// 用户地址表表名
private static String RESIDENCE= null;
// 商铺热点数据表表名
private static String POI = null;
@Override
public void map(long k, Record record, TaskContext context) throws IOException {
String id = record.getString(0);
// 获取记录中的经度和纬度值,为表中的第二列和第三列// 经度
double lng = record.getDouble(1);
// 纬度
double lat = record.getDouble(2);
Record key = null;
Record value = null;
value = context.createMapOutputValueRecord();
value.set(0, id);
value.set(1, lat);
value.set(2, lng);
//根据表的来源设置类型字段数据来自用户地址表
if(RESIDENCE.equalsIgnoreCase(context.getInputTableInfo().getTableName())){
List geoHashes = GeoHashUtil.getGeoHashFor9(lat, lng, PRECISION);
for(String hash : geoHashes){
// key
key = context.createMapOutputKeyRecord();
key.set(0, hash);
key.set(1, 1);
/*
// value
value = context.createMapOutputValueRecord();
value.set(0, id);
value.set(1, lat);
value.set(2, lng);
*/
context.write(key, value);
}
// 数据来自商铺热点数据表
} else if(POI.equalsIgnoreCase(context.getInputTableInfo().getTableName())){
String geoHash = GeoHashUtil.getGeoHash(lat, lng, PRECISION);
// key
key = context.createMapOutputKeyRecord();
key.set(0, geoHash);
key.set(1, 2);
/*
// value
value = context.createMapOutputValueRecord();
value.set(0, id);
value.set(1, lat);
value.set(2, lng);
*/
context.write(key, value);
}
}
@Override
public void setup(TaskContext context) throws IOException {
// 初始化变量
RESIDENCE= context.getJobConf().get("RESIDENCE");
POI = context.getJobConf().get("POI");
}
}
LocationReducer.java实现MapReduce的Reduce处理逻辑:
package com.aliyun.odps.lbs;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.Reducer;
import com.aliyun.odps.mapred.ReducerBase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
//MapReduce的Reduce类LocationReducer继educer承MRBase类
public class LocationReducer extends ReducerBase {
private Map, LocationBean> userMap = new HashMap<>();
// 变量,用来记录上一次的GeoHash编码
private String preHash = "";
@Override
public void reduce(Record key, Iterator values, TaskContext context) throws IOException {
// 判断当前GeoHash编码是否等于上一次的GeoHash编码
if(!key.getString(0).equals(preHash)){
// 不相等清空map缓冲
userMap.clear();
// 把当前GeoHash编码赋值给
preHash=preHash = key.getString(0);
}
// 判断是用户地址表据还是热点数据
if(key.getBigint(1).intValue() == 1){
// 读到的是用户数据
while(values.hasNext()){
Record record = values.next();
userMap.put(record.getString(0), new LocationBean(record.getDouble(1),
record.getDouble(2)));
}
}
if(key.getBigint(1).intValue() == 2){
// 读到的是热点数据
while(values.hasNext()){
Record record = values.next();
for(String k : userMap.keySet()){
LocationBean p = userMap.get(k);
// 距离
double d = DistanceUtil.getDistance(p.getLat(), p.getLng(),
record.getDouble(1), record.getDouble(2));
// 判断距离是否小于1000m
if(d < 1000){
Record result = context.createOutputRecord();
// 第一个字段用户id
result.set(0, k);
// 第二个字段热点店铺id
result.set(1,record.get(0));
// 第三个字段两点距离
result.set(2, d);
context.write(result);
}
}
}
}
}
}
LocationDriver.java用于运行ODPS的MapReduce的项目
package com.aliyun.odps.lbs;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
//主方法程序类
public class LocationDriver {
public static void main(String[] args){
JobConf job = new JobConf();
// Mapper类
job.setMapperClass(LocationMapper.class);
// Reducer类
job.setReducerClass(LocationReducer.class);
// 用户地址表和商铺热点表
job.set("RESIDENCE", args[0]);
job.set("POI", args[1]);
// Map的Key和Value的Schema
job.setMapOutputKeySchema(SchemaUtils.fromString("hash:string,type:bigint"));
job.setMapOutputValueSchema(SchemaUtils.fromString("id:string,lat:double,lng:double"));
// 设置分区字段Key里面的hash
job.setPartitionColumns(new String[]{"hash"});
// 设置输入和输出表
InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(),job);
InputUtils.addTable(TableInfo.builder().tableName(args[1]).build(),job);
OutputUtils.addTable(TableInfo.builder().tableName(args[2]).build(),job);
try {
JobClient.runJob(job);
} catch (Exception e){
e.printStackTrace();
}
}
}
三.分别创建3个测试文件:
poi_schema_
project=example_project
table=poi
columns=id:string,lng:double,lat:double,name:string,addr:string,type:string
out_schema_
project=example_project
table=out
columns=id:string,id:string,distance:double
residence_schema_
project=example_project
table=residence
columns=id:string,lng:double,lat:double,name:string,addr:string
四.在odps客户端测试
1.导入jar包
(记得修改文件地址)
create resource jar D:\IDEA\代码保存\实验三\lib\geohash-1.3.0.jar -f;
create resource jar D:\IDEA\代码保存\实验三\out\artifacts\lbs\lbs.jar -f;
2.在odps客户端执行命令:
(记得修改文件地址)
jar -resources lbs.jar,geohash-1.3.0.jar -classpath D:\IDEA\代码保存\实验三\out\artifacts\lbs\lbs.jar com.aliyun.odps.lbs.LocationDriver residence poi out;
3.用read out 命令查看表中生成的结果:
select a.rid,b.name,b.addr,c.name,c.addr,c.type,a.distance from out a join residence b on a.rid=b.id join poi c on c.id=a.pid order by a.rid ,a.distance asc limit 20;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)