MapReduce并行处理csv文件,将船舶数据划分子轨迹

MapReduce并行处理csv文件,将船舶数据划分子轨迹,第1张

MapReduce并行处理csv文件,将船舶数据划分子轨迹 bean对象

因为在划分子轨迹中,主要使用的字段是mmsi号、位置、速度、时间,以及划分的特征点、子轨迹段,所以只需要这几个属性即可,重写toString方法,重写序列化和反序列化方法

// bean类
class SubTrajectorBean implements Writable{
	private String MMSI;
	private Double Lat_d;
	private Double Lon_d;
	private Long unixTime;
	private Integer label = -1;
	private String subTrajector = null;
	
	public String toString(){
		return MMSI + "," + Lat_d + "," + Lon_d + "," + unixTime + "," + label + "," + subTrajector;	
	}

重写序列化和反序列化方法

// 序列化方法
	@override
	public void write(DataOutput dataOutput) throw IOException{
		dataOutput.writeUTF(MMSI);
		dataOutput.writeDouble(Lat_d);
		dataOutput.writeDouble(Lon_d);
		dataOutput.writeLong(unixTime);
		dataOutput.writeInt(label);
		dataOutput.writeUTF(subTrajector);
	}
	
	// 反序列化方法
	@override
	public void readFields(DataInput dataInput) throw IOException{
		this.MMSI = dataInput.readUTF();
		this.Lat_d = dataInput.readDouble();
		this.Lon_d = dataInput.readDouble();
		this.unixTime = dataInput.readLong();
		this.label = dataInput.readInt();
		this.subTrajector = dataInput.readUTF();
	}
Map阶段

map阶段主要是过滤速度阈值,将速度小于3kn的数据点看作抛锚点过滤

// Mapper
public class SubTrajectorMapper extend Mapper{
	
	// 输出key、value
	private Text outK = new Text();
	private SubTrajectorBean outV = new SubTrajectorBean();
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
		
		// 转为字符串
		String[] comments = value.toString();
		// 判断速度是否大于3Kn,大于输出,否则过滤
		if(comments[5] > 3){
			String MMSI = comments[1];
			Double Lat_d = Double.parseDouble(comment[7]);
			Double Lon_d = Double.parseDouble(comment[8]);
			Double unixTime = Long.parseLong(comments[9]);
			
			// 封装bean对象
			outV.setMMMSI(MMSI);
			outV.setLat_d(Lat_d);
			outV.setLon_d(Lon_d);
			outV.setUnixTime(unixTime);
			
			// 封装Text对象
			outK.set(MMSI);
			
			// 写出context
			context.write(outK, outV);
		}
	}
}
Reduce阶段

reduce阶段需要将过滤的数据按照mmsi排序,标记特征点,按照特征点划分子轨迹段

// Reducer
public class SubTrajectorReducer extends Reducer{
	// 重写reduce方法
	public void reduce(Text key, Iterable values, Context context){
		
		// 将同一MMSI数据放到一个新列表中
		List trajectorList = new ArrayList<>(1000);
		for(SubTrajectorBean value: values){
			trajectorList.add(Utils.getNewBean(value));
		}
		
		// 进行处理
		// 二维数组
		List[] result = new List()[];
		// 二维数组索引和子轨迹索引和特征点
		Integer index = 0, sub = 1, trait = 0;
		for(int i = 0; i < trajectorList.length()-3; i++){
			// 如果等于一,则是第一个,将其label设置为1
			if(trait == 0) {
				trajectorList.get(i).setLabel(1);				
				trajectorList.get(i).setLabel(1);
			
			}
			// 判断是否时间超限
			// 如果超限,将该点的label改为1
			Double = time = Math.abs(trajectorList.get(i).getUnixTime() - trajectorList.get(i+1).getUnixTime());
			if(Double > 360){
				trajectorList.get(i).setLabel(1);
				// 判断子轨迹的个数是否大于10
				if (trait > 10){
					index += 1;
					sub += 1;
					trait = 0;
				}else {
					result[index].clear();
				}
			}
			// 如果不超限,判断TF特征点
			else{
				SubTrajectorBean stb1 = trajectorList.get(i);
				SubTrajectorBean stb2 = trajectorList.get(i+1);
				SubTrajectorBean stb3 = trajectorList.get(i+2);
				SubTrajectorBean stb4 = trajectorList.get(i+3);
				Double T12 = Utils.getT(stb1, stb2, stb3);
				Double T23 = Utils.getT(stb2, stb3, stb4);
				if((T12 * T23) < 0){
					// 为i + 2赋值label
					trajectorList.get(i+2).setLabel(1);
					// 将i+1,i+2加入result
					// 拼接子轨迹编号
					String st = value.getMMSI() + sub;
					trajectorList.get(i+1).setSubTrajector(st);
					trajectorList.get(i+2).setSubTrajector(st);
					result[index].append(trajectorList.get(i+1));
					result[index].append(trajectorList.get(i+2));
					// 判断子轨迹的个数是否大于10个数
					if (trait > 10){
						index += 1;
						sub += 1;
						trait = 0;
					}else {
						result[index].clear();
					}
			}
			
			// 拼接子轨迹编号
			String st = value.getMMSI() + sub;
			trajectorList.get(i).setSubTrajector(st);
			// 添加到二维列表中
			result[index].append(trajectorList.get(i));
			
			// 写出数据
			for(List values: result){
				for(SubTrajectorBean value: values){
					context.write(NullWritable, value);
				}
			}	
		}
	}
}
Utils工具类

在处理的过程中,为了解耦,所以将个别方法单独拿出来设置成了工具类

// 工具类Utils
public class Utils{
	
	// 得到一个新的bean对象
	public static SubTrajectorBean getNewBean(SubTrajectorBean stb){
		SubTrajectorBean bean = new SubTrajectorBean();
		bean.setMMSI(stb.getMMSI());
		bean.setLat_d(stb.getLat_d());
		bean.setLon_d(stb.getLon_d());
		bean.setUnixTime(stb.getUnixTime());
		bean.setLabel(stb.getLabel());
		bean.setSubTrajector(stb.getSubTrajector());
	}
	
	// 曲线边缘法
	public static Double getT(SubTrajectorBean stb1,SubTrajectorBean stb2, SubTrajectorBean stb3){
		
		// 计算Tmn
		Double T = (stb2.getLat_d - stb1.getLat_d)(stb3.getLon_d - stb1.getLon_d) + (stb3.getLat_d - stb1.getLat_d)(stb2.getLon_d - stb1.getLon_d);
		return T;
	}
}
Driver类

Driver类就是典型的八股文形式,关联map和redece,设置key、value,设置路径,提交作业

// Driver类
public class SubTrajectorDriver{
	public static void main(String[] args){
		// 1.获取job对象
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		// 2.关联Driver类
		job.setJarByClass(SubTrajectorDriver.class);
		
		// 3.关联Mapper和Reducer类
		job.setMapperClass(SubTrajectorMapper.class);
		job.setReducerClass(SubTrajectorReducer.class);
		
		// 4.设置Map的输出key/value
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(SubTrajectorBean.class);
		
		// 5.设置最终的输出key/value
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(SubTrajectorBean.class);
		
		// 6.设置输入输出路径
		FileInputFormat.setInputPath(job, new Path("inputPath"));
		FileOutputFormat.setOutputPath(job, new Path("outputPath"));
		
		// 7.提交job
		boolean result = job.waitForCompletion(true);
		System.exit(result? 0: 1);
	}
}

上述代码就是大概的子轨迹提取过程的MapReduce实现,因为疫情原因,本人封闭不让去实验室,所以机器的限制并不能真是运行该代码,代码编写也是在xp系统的文本中靠感觉编写,但是具体思路完全符合研究逻辑,代码虽不能保证完全正确统一,但是也在编写中也十分注意格式和语法,如有代码错误之处,还请指出。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5638707.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存