阿里云odpscmd热点店铺搜索案例

阿里云odpscmd热点店铺搜索案例,第1张

阿里云odpscmd热点店铺搜索案例(仅代码)
  • 一.创建数据库表
    • 创建用户地址数据表:
    • 建立poi数据表:
    • 创建结果输出表:
    • 使用tunnel upload命令将下载好的residence.csv poi.csv文件导入,注意修改自己的文件储存路径。


    • 分别查询用户地址数据表residence poi数据表数据。


  • 二.创建Java项目
    • GeoHashUtil.java文档
    • DistanceUtil.Java计算地球上两点距离的方法
    • LocationMapper.java实现MapReduce的Map处理逻辑:
    • LocationReducer.java实现MapReduce的Reduce处理逻辑:
    • LocationDriver.java用于运行ODPS的MapReduce的项目
  • 三.分别创建3个测试文件:
    • poi_schema_
    • out_schema_
    • residence_schema_
  • 四.在odps客户端测试
    • 1.导入jar包
    • 2.在odps客户端执行命令:
    • 3.用read out 命令查看表中生成的结果:

一.创建数据库表 创建用户地址数据表:
CREATE TABLE IF NOT EXISTS residence 
			(id STRING COMMENT '用户id',
			lng DOUBLE COMMENT '用户所在位置经度',
			lat DOUBLE COMMENT '用户所在位置纬度',
			name STRING COMMENT '用户所在位置名称',
			addr STRING COMMENT '用户详细地址'
			);
建立poi数据表:
CREATE TABLE IF NOT EXISTS poi (
			Id STRING COMMENT '店铺id',
			lng DOUBLE COMMENT '店铺所在位置经度',
			lat DOUBLE COMMENT '店铺所在位置纬度',
			name STRING COMMENT '店铺名称',
			addr STRING COMMENT '店铺详细地址',
			type STRING COMMENT '店铺类型'
);
创建结果输出表:
CREATE TABLE IF NOT EXISTS out (
			rid STRING COMMENT '用户id',
			pid STRING COMMENT '店铺id',
			distance DOUBLE COMMENT '距离'           
);
使用tunnel upload命令将下载好的residence.csv poi.csv文件导入,注意修改自己的文件储存路径。


tunnel upload D:\IDEA\代码保存\项目三\residence.csv residence;
tunnel upload D:\IDEA\代码保存\项目三\poi.csv poi;
分别查询用户地址数据表residence poi数据表数据。


select * from residence limit 5;
select * from poi limit 5;
二.创建Java项目

创建一个Java项目,导入jar包,见项目三文档:

GeoHashUtil.java文档
package com.aliyun.odps.lbs;
import ch.hsr.geohash.GeoHash;

import java.util.Arrays;
import java.util.List;

//@desc 计算周围8个区域的GeoHash编码*/
public class GeoHashUtil {
    //@desc 获取某一点的GeoHash编码@param lat 纬度@param lng 经度@param PRECISION GeoHash编码长度@return*/
    public static String getGeoHash(final double lat, final double lng, final int PRECISION){
        return GeoHash.geoHashStringWithCharacterPrecision(lat, lng, PRECISION);
    }
//@desc 获取某一地点及周围 8个区域总共9个点的GeoHash编码@param lat 纬度* @param lng 经度* @param PRECISION GeoHash编码长度* @return*/
    public static List getGeoHashFor9(final double lat, final double lng, final int PRECISION){
        //   纬度做二分类的次数GeoHash编码长度乘以5除以2
        int latLength = PRECISION*5/2;
        
        // 经度做二分类的次数GeoHash编码长度的奇偶有关系
        // PRECISION*5%2 GeoHash编码长度乘以5除以2的余数等于0,即偶数,则经度做二分类的次数就等于纬度做二分类的次数
        // 如果不等于0,即奇数,则经度做二分类的次数等于纬度做二分类的次数+1
        int lngLength = PRECISION*5%2==0?latLength:latLength+1;
        
        // 经度差和纬度差
        // 最小纬度差
        double minLat = 180/Math.pow(2, latLength);
        // 最小经度差
        double minLng = 360/Math.pow(2, lngLength);
        // 上方三个区域的GeoHash编码
        // 上方左边区域的GeoHash编码
        String leftTop = getGeoHash(lat + minLat, lng -minLng, PRECISION);
        
        // 上方中间区域的GeoHash编码
        String midTop = getGeoHash(lat + minLat, lng, PRECISION);
        
        // 上方右边区域的GeoHash编码
        String rightTop = getGeoHash(lat+minLat, lng+minLng, PRECISION);
        
        // 中间三个区域的GeoHash编码
        String leftMid = getGeoHash(lat, lng -minLng, PRECISION);
        String midMid = getGeoHash(lat, lng, PRECISION);
        String rightMid = getGeoHash(lat, lng + minLng, PRECISION);
        
        // 下方三个区域的GeoHash编码
        String leftBot = getGeoHash(lat -minLat, lng -minLng, PRECISION);
        String midBot = getGeoHash(lat -minLat, lng, PRECISION);
        String rightBot = getGeoHash(lat -minLat, lng + minLng, PRECISION);
        return Arrays.asList(leftTop, midTop, rightTop, leftMid, midMid, rightMid, leftBot, midBot, rightBot);
    }
    }
DistanceUtil.Java计算地球上两点距离的方法
package com.aliyun.odps.lbs;
/*** @desc 计算地球上两点之间的距离*/
public class DistanceUtil {
    //地球半径:6378.393km*/
    private static final double EARTH_RADIUS = 6378.393;
    //计算地球上两点距离* @param lat1 第一个坐标纬度* @param lng1 第一个坐标经度* @param lat2 第二个坐标纬度* @param lng2 第二个坐标经度* @return*/
    public static double getDistance(double lat1, double lng1, double lat2, double lng2){lat1 = rad(lat1);
    lat2 = rad(lat2);lng1 = rad(lng1);lng2 = rad(lng2);
    double s = EARTH_RADIUS * Math.acos(Math.cos(lng1-lng2)*Math.cos(lat1)*Math.cos(lat2)+Math.sin(lat1)*Math.sin(lat2));
    return Math.round(s*1000);
    }
    //将角度转换成弧度* @param d* @return*/
    private static double rad(double d){return d * Math.PI / 180.0;
    }
}

LocationBean.java保存地址信息对象:

package com.aliyun.odps.lbs;
//位置Bean类*/
public class LocationBean {
    // 纬度
    private double lat;
    // 经度
    private double lng;
    // getter setter方法
    public double getLat() {
        return lat;
    }
    public void setLat(double lat) {
        this.lat = lat;
    }
    public double getLng() {
        return lng;
    }
    public void setLng(double lng) {
        this.lng = lng;
    }
    //*** 构造方法* @param lat* @param lng*/
    public LocationBean(double lat, double lng) {this.lat = lat;this.lng = lng;
    }
}

LocationMapper.java实现MapReduce的Map处理逻辑:
package com.aliyun.odps.lbs;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.Mapper;
import com.aliyun.odps.mapred.MapperBase;
import java.io.IOException;
import java.util.List;
//*** MapReduce的Map类LocationMapper继承MapperBase类*/
public class LocationMapper extends MapperBase {
    // GeoHash编码长度
    private static int PRECISION = 5;
    // 用户地址表表名
    private static String RESIDENCE= null;
    // 商铺热点数据表表名
    private static String POI = null;
    @Override
    public void map(long k, Record record, TaskContext context) throws IOException {
        String id = record.getString(0);
    // 获取记录中的经度和纬度值,为表中的第二列和第三列// 经度
        double lng = record.getDouble(1);
        // 纬度
        double lat = record.getDouble(2);
        Record key = null;
        Record value = null;
        value = context.createMapOutputValueRecord();
        value.set(0, id);
        value.set(1, lat);
        value.set(2, lng);
        //根据表的来源设置类型字段数据来自用户地址表
        if(RESIDENCE.equalsIgnoreCase(context.getInputTableInfo().getTableName())){
            List geoHashes = GeoHashUtil.getGeoHashFor9(lat, lng, PRECISION);
        for(String hash : geoHashes){
            // key
            key = context.createMapOutputKeyRecord();
            key.set(0, hash);
            key.set(1, 1);

            /*
            // value
            value = context.createMapOutputValueRecord();
            value.set(0, id);
            value.set(1, lat);
            value.set(2, lng);
             */
            context.write(key, value);
        }
// 数据来自商铺热点数据表
} else if(POI.equalsIgnoreCase(context.getInputTableInfo().getTableName())){
            String geoHash = GeoHashUtil.getGeoHash(lat, lng, PRECISION);
            // key
            key = context.createMapOutputKeyRecord();
            key.set(0, geoHash);
            key.set(1, 2);
            /*
            // value
            value = context.createMapOutputValueRecord();
            value.set(0, id);
            value.set(1, lat);
            value.set(2, lng);
*/
            context.write(key, value);
        }
    }
    @Override
    public void setup(TaskContext context) throws IOException {
        // 初始化变量
        RESIDENCE= context.getJobConf().get("RESIDENCE");
        POI = context.getJobConf().get("POI");
    }
}
LocationReducer.java实现MapReduce的Reduce处理逻辑:
package com.aliyun.odps.lbs;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.Reducer;
import com.aliyun.odps.mapred.ReducerBase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
//MapReduce的Reduce类LocationReducer继educer承MRBase类
public class LocationReducer extends ReducerBase {
    private Map, LocationBean> userMap = new HashMap<>();
    // 变量,用来记录上一次的GeoHash编码
    private String preHash = "";
    @Override
    public void reduce(Record key, Iterator values, TaskContext context) throws IOException {
        // 判断当前GeoHash编码是否等于上一次的GeoHash编码
        if(!key.getString(0).equals(preHash)){
            // 不相等清空map缓冲
            userMap.clear();
            // 把当前GeoHash编码赋值给
            preHash=preHash = key.getString(0);
        }
        // 判断是用户地址表据还是热点数据
        if(key.getBigint(1).intValue() == 1){
            // 读到的是用户数据
            while(values.hasNext()){
                Record record = values.next();
                userMap.put(record.getString(0), new LocationBean(record.getDouble(1),
                        record.getDouble(2)));
            }
        }
        if(key.getBigint(1).intValue() == 2){
            // 读到的是热点数据
            while(values.hasNext()){
                Record record = values.next();
            for(String k : userMap.keySet()){
                LocationBean p = userMap.get(k);
                // 距离
                double d = DistanceUtil.getDistance(p.getLat(), p.getLng(),
                        record.getDouble(1), record.getDouble(2));
                // 判断距离是否小于1000m
                if(d < 1000){
                    Record result = context.createOutputRecord();
                    // 第一个字段用户id
                    result.set(0, k);
                    // 第二个字段热点店铺id
                    result.set(1,record.get(0));
                    // 第三个字段两点距离
                    result.set(2, d);
                    context.write(result);
                    }
                } 
            }
        }
    }
}

LocationDriver.java用于运行ODPS的MapReduce的项目
package com.aliyun.odps.lbs;

import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
//主方法程序类
public class LocationDriver {
    public static void main(String[] args){
        JobConf job = new JobConf();
        // Mapper类
        job.setMapperClass(LocationMapper.class);
        // Reducer类
        job.setReducerClass(LocationReducer.class);
        // 用户地址表和商铺热点表
        job.set("RESIDENCE", args[0]);
        job.set("POI", args[1]);
        // Map的Key和Value的Schema
        job.setMapOutputKeySchema(SchemaUtils.fromString("hash:string,type:bigint"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("id:string,lat:double,lng:double"));
        // 设置分区字段Key里面的hash
        job.setPartitionColumns(new String[]{"hash"});
        // 设置输入和输出表
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(),job);
        InputUtils.addTable(TableInfo.builder().tableName(args[1]).build(),job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[2]).build(),job);
        try {
            JobClient.runJob(job);
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}
三.分别创建3个测试文件: poi_schema_
project=example_project
table=poi
columns=id:string,lng:double,lat:double,name:string,addr:string,type:string
out_schema_
project=example_project
table=out
columns=id:string,id:string,distance:double
residence_schema_
project=example_project
table=residence
columns=id:string,lng:double,lat:double,name:string,addr:string
四.在odps客户端测试 1.导入jar包

(记得修改文件地址)

create resource jar D:\IDEA\代码保存\实验三\lib\geohash-1.3.0.jar -f;
create resource jar D:\IDEA\代码保存\实验三\out\artifacts\lbs\lbs.jar -f;
2.在odps客户端执行命令:

(记得修改文件地址)

jar -resources lbs.jar,geohash-1.3.0.jar -classpath D:\IDEA\代码保存\实验三\out\artifacts\lbs\lbs.jar com.aliyun.odps.lbs.LocationDriver residence poi out;
3.用read out 命令查看表中生成的结果:
select a.rid,b.name,b.addr,c.name,c.addr,c.type,a.distance from out a join residence b on a.rid=b.id join poi c on c.id=a.pid order by a.rid ,a.distance asc limit 20;

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/589780.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-13
下一篇 2022-04-13

发表评论

登录后才能评论

评论列表(0条)

保存