因为在划分子轨迹中,主要使用的字段是mmsi号、位置、速度、时间,以及划分的特征点、子轨迹段,所以只需要这几个属性即可,重写toString方法,重写序列化和反序列化方法
// bean类 class SubTrajectorBean implements Writable{ private String MMSI; private Double Lat_d; private Double Lon_d; private Long unixTime; private Integer label = -1; private String subTrajector = null; public String toString(){ return MMSI + "," + Lat_d + "," + Lon_d + "," + unixTime + "," + label + "," + subTrajector; }
重写序列化和反序列化方法
// 序列化方法 @override public void write(DataOutput dataOutput) throw IOException{ dataOutput.writeUTF(MMSI); dataOutput.writeDouble(Lat_d); dataOutput.writeDouble(Lon_d); dataOutput.writeLong(unixTime); dataOutput.writeInt(label); dataOutput.writeUTF(subTrajector); } // 反序列化方法 @override public void readFields(DataInput dataInput) throw IOException{ this.MMSI = dataInput.readUTF(); this.Lat_d = dataInput.readDouble(); this.Lon_d = dataInput.readDouble(); this.unixTime = dataInput.readLong(); this.label = dataInput.readInt(); this.subTrajector = dataInput.readUTF(); }Map阶段
map阶段主要是过滤速度阈值,将速度小于3kn的数据点看作抛锚点过滤
// Mapper public class SubTrajectorMapper extend MapperReduce阶段{ // 输出key、value private Text outK = new Text(); private SubTrajectorBean outV = new SubTrajectorBean(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ // 转为字符串 String[] comments = value.toString(); // 判断速度是否大于3Kn,大于输出,否则过滤 if(comments[5] > 3){ String MMSI = comments[1]; Double Lat_d = Double.parseDouble(comment[7]); Double Lon_d = Double.parseDouble(comment[8]); Double unixTime = Long.parseLong(comments[9]); // 封装bean对象 outV.setMMMSI(MMSI); outV.setLat_d(Lat_d); outV.setLon_d(Lon_d); outV.setUnixTime(unixTime); // 封装Text对象 outK.set(MMSI); // 写出context context.write(outK, outV); } } }
reduce阶段需要将过滤的数据按照mmsi排序,标记特征点,按照特征点划分子轨迹段
// Reducer public class SubTrajectorReducer extends ReducerUtils工具类{ // 重写reduce方法 public void reduce(Text key, Iterable values, Context context){ // 将同一MMSI数据放到一个新列表中 List trajectorList = new ArrayList<>(1000); for(SubTrajectorBean value: values){ trajectorList.add(Utils.getNewBean(value)); } // 进行处理 // 二维数组 List[] result = new List()[]; // 二维数组索引和子轨迹索引和特征点 Integer index = 0, sub = 1, trait = 0; for(int i = 0; i < trajectorList.length()-3; i++){ // 如果等于一,则是第一个,将其label设置为1 if(trait == 0) { trajectorList.get(i).setLabel(1); trajectorList.get(i).setLabel(1); } // 判断是否时间超限 // 如果超限,将该点的label改为1 Double = time = Math.abs(trajectorList.get(i).getUnixTime() - trajectorList.get(i+1).getUnixTime()); if(Double > 360){ trajectorList.get(i).setLabel(1); // 判断子轨迹的个数是否大于10 if (trait > 10){ index += 1; sub += 1; trait = 0; }else { result[index].clear(); } } // 如果不超限,判断TF特征点 else{ SubTrajectorBean stb1 = trajectorList.get(i); SubTrajectorBean stb2 = trajectorList.get(i+1); SubTrajectorBean stb3 = trajectorList.get(i+2); SubTrajectorBean stb4 = trajectorList.get(i+3); Double T12 = Utils.getT(stb1, stb2, stb3); Double T23 = Utils.getT(stb2, stb3, stb4); if((T12 * T23) < 0){ // 为i + 2赋值label trajectorList.get(i+2).setLabel(1); // 将i+1,i+2加入result // 拼接子轨迹编号 String st = value.getMMSI() + sub; trajectorList.get(i+1).setSubTrajector(st); trajectorList.get(i+2).setSubTrajector(st); result[index].append(trajectorList.get(i+1)); result[index].append(trajectorList.get(i+2)); // 判断子轨迹的个数是否大于10个数 if (trait > 10){ index += 1; sub += 1; trait = 0; }else { result[index].clear(); } } // 拼接子轨迹编号 String st = value.getMMSI() + sub; trajectorList.get(i).setSubTrajector(st); // 添加到二维列表中 result[index].append(trajectorList.get(i)); // 写出数据 for(List values: result){ for(SubTrajectorBean value: values){ context.write(NullWritable, value); } } } } }
在处理的过程中,为了解耦,所以将个别方法单独拿出来设置成了工具类
// 工具类Utils public class Utils{ // 得到一个新的bean对象 public static SubTrajectorBean getNewBean(SubTrajectorBean stb){ SubTrajectorBean bean = new SubTrajectorBean(); bean.setMMSI(stb.getMMSI()); bean.setLat_d(stb.getLat_d()); bean.setLon_d(stb.getLon_d()); bean.setUnixTime(stb.getUnixTime()); bean.setLabel(stb.getLabel()); bean.setSubTrajector(stb.getSubTrajector()); } // 曲线边缘法 public static Double getT(SubTrajectorBean stb1,SubTrajectorBean stb2, SubTrajectorBean stb3){ // 计算Tmn Double T = (stb2.getLat_d - stb1.getLat_d)(stb3.getLon_d - stb1.getLon_d) + (stb3.getLat_d - stb1.getLat_d)(stb2.getLon_d - stb1.getLon_d); return T; } }Driver类
Driver类就是典型的八股文形式,关联map和redece,设置key、value,设置路径,提交作业
// Driver类 public class SubTrajectorDriver{ public static void main(String[] args){ // 1.获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2.关联Driver类 job.setJarByClass(SubTrajectorDriver.class); // 3.关联Mapper和Reducer类 job.setMapperClass(SubTrajectorMapper.class); job.setReducerClass(SubTrajectorReducer.class); // 4.设置Map的输出key/value job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(SubTrajectorBean.class); // 5.设置最终的输出key/value job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(SubTrajectorBean.class); // 6.设置输入输出路径 FileInputFormat.setInputPath(job, new Path("inputPath")); FileOutputFormat.setOutputPath(job, new Path("outputPath")); // 7.提交job boolean result = job.waitForCompletion(true); System.exit(result? 0: 1); } }
上述代码就是大概的子轨迹提取过程的MapReduce实现,因为疫情原因,本人封闭不让去实验室,所以机器的限制并不能真是运行该代码,代码编写也是在xp系统的文本中靠感觉编写,但是具体思路完全符合研究逻辑,代码虽不能保证完全正确统一,但是也在编写中也十分注意格式和语法,如有代码错误之处,还请指出。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)