- 本文关键词:
- 背景引入:
- 介绍IP库:
- 缩减IP信息:
- 代码实现:
- 单线程版本:
- 多线程版本:
- 如何ETL:
- 实现离线IP匹配:
- 注册ODPS资源:
- 实现IP配置的UDF:
- 注册函数:
- 调用IP解析方法:
- 阿里大数据计算引擎–【ODPS】又名【MaxCompute】
- 数据仓库,离线批次
- 解析IP地址
在建设大数据离线平台的过程中,会有一个ETL的过程,这个过程会将ods层数据进行批次的字段解析及复杂的逻辑处理。不同的公司有着不同的业务场景,所以也对应这不同的ETL逻辑,但是不管如何,有一个字段肯定是一致的,那么就是IP解析。
不管什么公司,都需要流量日志,流量日志中关于用户IP信息,后期是一定会进行解析的。
流量日志一般是用户在访问某平台或某产品所产生的日志,该日志存在诸多字段,IP就是其中的一个字段。前端页面埋码可能使用JS来做,也可能时候用SDK来做,但是不管用什么来做,如果在前端层面来实时解析IP地址所对应的国家、省份、城市、ISP信息等等,那么无疑会将隐患埋在前端,这样是不安全的,所以一般大数据离线平台会将IP解析步骤放在ETL过程中,用来降低隐患。并且提高前端页面的稳定性。
介绍IP库:IP库是某家公司或者某个产品所提供的,从网络上当然可以下载现成的IP库,但是里面可能有缺漏,不完整的IP库信息按道理对于数据仓库的地域维度统计是存在影响的,所以一般会从正当的公司或者途径去获取IP库。
一般的IP库如下:下面只是列举了几项字段,还有其他很多字段如国家代码等
761810432,761810687,埃及,马特鲁省,马特鲁,The Egyptian Company for Mobile Services (Mobinil)
761810688,761810943,埃及,吉萨省,Awsim,The Egyptian Company for Mobile Services (Mobinil)
761810944,761810991,埃及,The Egyptian Company for Mobile Services (Mobinil)
761810992,761811199,埃及,西部省,Basyun,The Egyptian Company for Mobile Services (Mobinil)
761811200,761811455,埃及,索哈杰省,贝利纳县,The Egyptian Company for Mobile Services (Mobinil)
761811456,761811711,埃及,布海拉省,道瓦尔,The Egyptian Company for Mobile Services (Mobinil)
761811712,761811967,埃及,该国贝尼苏韦夫省,贝尼苏韦夫,The Egyptian Company for Mobile Services (Mobinil)
761811968,761812479,埃及,吉萨省,吉萨,The Egyptian Company for Mobile Services (Mobinil)
761812480,761812735,埃及,The Egyptian Company for Mobile Services (Mobinil)
761812736,761812991,埃及,布海拉省,迪林加县,The Egyptian Company for Mobile Services (Mobinil)
761812992,761813247,埃及,开罗省,开罗,The Egyptian Company for Mobile Services (Mobinil)
761813248,761813503,埃及,该国贝尼苏韦夫省,贝尼苏韦夫,The Egyptian Company for Mobile Services (Mobinil)
761813504,761813759,埃及,吉萨省,吉萨,The Egyptian Company for Mobile Services (Mobinil)
761813760,761814015,埃及,东部省,Diyarb Najm,The Egyptian Company for Mobile Services (Mobinil)
但是正当的公司IP库一般会很大,这样就导致了在ETL的过程中会加倍增加资源的消耗。同时IP地址一般是一个范围,所以在匹配的时候无法做到join等式,这样无疑使IP解析的难度难上加难。
缩减IP信息:在一般的离线统计中,一般用不到过多的字段,可能对于有些公司,连用户的城市都不关注,只关注用户的国家和省份,那么这样一来,就可以将IP库进行缩减,减少字段的话对于存储大小和数据处理资源也有所缓解。
预处理:一般从正当途径获取的IP库肯定是符合正常逻辑的,比如ip地址的范围不冲突,一个IP存在于一个范围中,不能存在多个范围。
如果你的IP库发现了不合逻辑的IP地址划分,那么需要将处理好,获取正当的IP库才可能进行下面的 *** 作。
下面这段代码实现了将读取到的IP信息进行缩减,具体实现过程:
-
去掉IP库中的其他无用字段,只保留ip_start、ip_end、country_name、province_name,并按照ip_start升序
-
因为isp运营商的存在,所以可能导致之前的同一个国家同一个省份的IP地址是分割开的。比如:只是举例哈,不是真实地IP址,所以对于下面的数据,我们需要合并起来
ip_start ip_end country_name province_name isp_name 1234512347中国北京市移动 1234812350中国北京市联通 -
最后返回的结果,就是按照ip_start升序的缩减版的IP地址
ip_start ip_end country_name province_name isp_name 1234512350中国北京市该字段不要 -
逻辑大致如此,加下来上代码:
预处理后的IP文本示例:下面的代码主要就是处理这类型的文本
/Users/keino/Desktop/ippart1.csv 示例:
±-----------±-----------±-----------±-------------±--------------+
| rn | ip_start | ip_end | country_name | province_name |
±-----------±-----------±-----------±-------------±--------------+
| 1 | 16777216 | 16777471 | 澳大利亚 | |
| 2 | 16777472 | 16778239 | 中国 | 福建省 |
| 3 | 16778240 | 16779263 | 澳大利亚 | 维多利亚州 |
| 4 | 16779264 | 16781311 | 中国 | 广东省 |
| 5 | 16781312 | 16781567 | 日本 | 东京都 |
| 6 | 16781568 | 16782591 | 日本 | |
| 7 | 16782592 | 16782847 | 日本 | 东京都 |
| 8 | 16782848 | 16785407 | 日本 | |
| 9 | 16785408 | 16793599 | 中国 | 广东省 |
| 10 | 16793600 | 16797695 | 日本 | 广岛县 |
| 11 | 16797696 | 16797951 | 日本 | 鸟取县 |
| 12 | 16797952 | 16798207 | 日本 | 岛根县 |
| 13 | 16798208 | 16798719 | 日本 | 冈山县 |
| 14 | 16798720 | 16798975 | 日本 | 山口县 |
| 15 | 16798976 | 16799231 | 日本 | 岛根县 |
| 16 | 16799232 | 16799743 | 日本 | 冈山县 |
| 17 | 16799744 | 16799999 | 日本 | 岛根县 |
| 18 | 16800000 | 16800255 | 日本 | 山口县 |
package com.YunTest.ip.combineIpRegion; import java.io.*; import java.util.ArrayList; public class ConnConsecutiveIpRegion { public static void main(String[] args) throws IOException { // 读取本地文件 BufferedReader br = null; try { br = new BufferedReader(new FileReader("/Users/keino/Desktop/ippart1.csv")); } catch (FileNotFoundException e) { e.printStackTrace(); } // 合并逻辑 ArrayList多线程版本:myIpRegions = new ArrayList<>(); String record = ""; while ((record = br.readLine())!=null){ String[] info = record.split(","); String provinceTmp = ""; if(info.length<5){ provinceTmp = "empty"; }else{ provinceTmp = info[4].trim().length()>0?info[4]:"empty"; } MyIpRegion tmp = new MyIpRegion( Long.valueOf(info[1]) , Long.valueOf(info[2]) , info[3] , provinceTmp); // 单条处理逻辑 ArrayList loopList = new ArrayList<>(myIpRegions); int cnt = 0; //记录循环次数 用来修改myIpRegions对应位置的region boolean flag = false;//标志位,用来判断当前记录是否和其他记录有关联 for(MyIpRegion region:loopList){ if(tmp.ip_start==(region.ip_end+1) && tmp.province_name.equals(region.province_name) && tmp.country_name.equals(region.country_name)){ flag = true; myIpRegions.get(cnt).setIp_end(tmp.getIp_end()); } cnt++; } if(!flag){ //如果和其他的都不一样,那么就新生成一个IP范围对象 myIpRegions.add(tmp); } } // 数据处理完成,写到处理后结果表 BufferedWriter bw = null; try { bw = new BufferedWriter(new FileWriter("/Users/keino/documents/files/ippart1Res.csv")); for(MyIpRegion region:myIpRegions){ bw.write(region.toString()); bw.newline(); bw.flush(); } } catch (IOException e) { e.printStackTrace(); } // 释放 br.close(); bw.close(); } } class MyIpRegion{ Long ip_start; Long ip_end; String country_name; String province_name; public MyIpRegion(){} public MyIpRegion(Long ip_start, Long ip_end, String country_name, String province_name) { this.ip_start = ip_start; this.ip_end = ip_end; this.country_name = country_name; this.province_name = province_name; } public Long getIp_start() { return ip_start; } public void setIp_start(Long ip_start) { this.ip_start = ip_start; } public Long getIp_end() { return ip_end; } public void setIp_end(Long ip_end) { this.ip_end = ip_end; } public String getCountry_name() { return country_name; } public void setCountry_name(String country_name) { this.country_name = country_name; } public String getProvince_name() { return province_name; } public void setProvince_name(String province_name) { this.province_name = province_name; } @Override public String toString() { return ip_start+","+ip_end+","+country_name+","+province_name; } }
这里代码的各个方法我这里解释下:
- combineData:将指定文件夹下的全部合并到一个文件中
- readData:读取指定文件IP数据,按照单线程处理的数据大小,切分成多个文件
- Threadhandle:多线程类,处理逻辑和上面的单线程一致,就是按照数据量大小切分多个集合,将集合分配给不同的线程去执行
多线程整体实现思路:
一个IP文件可能很大,本地跑单线程速度太慢,不如将IP按照升序切分为多个文件,然后不同的线程去每个分配的文件中获取数据进行合并,合并后输出到各自线程的文件中,然后将这些文件继续合并,因为拆分了文件可能也导致IP断开,所以要再一次将”一次合并“后的文件合成大文件,进行”二次合并“,最后输出的就是全面的IP合并数据了。 readData-->Threadhandle-->combineData-->readData-->Threadhandle--->IP合并内容
package com.YunTest.ip.combineIpRegion; import java.io.*; import java.util.ArrayList; import java.util.linkedList; import java.util.concurrent.linkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadConnIpRegion { public static void main(String[] args) throws IOException, InterruptedException { //初始化数据 linkedList如何ETL:> arrayLists = readData_02(); Thread.sleep(60000); //开始多线程处理吧 ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 15, 1000, TimeUnit.SECONDS, new linkedBlockingQueue ()); for(int i=0;i> combineData() throws IOException { //本地文件分别存储到 10 个不同的ArrayList 当前数据共 1000000,每份100000 linkedList > myIpRegions = new linkedList >(); String Path = "/Users/keino/documents/files_02/Res_%s.csv"; int count = 0; BufferedWriter bw = null; bw = new BufferedWriter(new FileWriter(String.format("/Users/keino/documents/allFile/ip.csv"))); for(int i=0;i<39;i++){ BufferedReader br = new BufferedReader(new FileReader(String.format(Path,i))); String record = ""; while((record = br.readLine())!=null){ count++; String[] info = record.split(","); String provinceTmp = ""; if(info.length<4){ provinceTmp = "empty"; }else{ provinceTmp = info[3].trim().length()>0?info[3]:"empty"; } MyIpRegion tmp = new MyIpRegion( Long.valueOf(info[0]) , Long.valueOf(info[1]) , info[2] , provinceTmp); try { bw.write(tmp.toString()); bw.newline(); bw.flush(); } catch (IOException e) { e.printStackTrace(); } } } if(bw!=null) { try { bw.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println(count); return null; } public static linkedList > readData() throws IOException { BufferedReader br = new BufferedReader(new FileReader("/Users/keino/Desktop/ippart2.csv")); //本地文件分别存储到 10 个不同的ArrayList 当前数据共 1000000,每份100000 linkedList > myIpRegions = new linkedList >(); String record = ""; int gap = 100000; int count = 0; while((record = br.readLine())!=null){ String[] info = record.split(","); String provinceTmp = ""; if(info.length<5){ provinceTmp = "empty"; }else{ provinceTmp = info[4].trim().length()>0?info[4]:"empty"; } MyIpRegion tmp = new MyIpRegion( Long.valueOf(info[1]) , Long.valueOf(info[2]) , info[3] , provinceTmp); if(count%gap==0){ myIpRegions.add(new linkedList ()); } myIpRegions.get(count/gap).add(tmp); count++; } System.out.println(String.format("全部数据读取完毕---读取数据条数:%s",count)); System.out.println(String.format("当前读取后的链表大小= %s",myIpRegions.size())); return myIpRegions; } } class Threadhandle implements Runnable{ linkedList > arrayListArrayList; int pageIndex; public Threadhandle(linkedList > arrayListArrayList, int pageIndex) { this.arrayListArrayList = arrayListArrayList; this.pageIndex = pageIndex; } @Override public void run() { System.out.println("线程" + pageIndex + "开始"); // 和单线程处理逻辑一致 linkedList myIpRegions = arrayListArrayList.get(pageIndex); ArrayList ori = new ArrayList<>(); for(MyIpRegion tmp:myIpRegions){ ArrayList loopList = new ArrayList<>(ori); int cnt = 0; //记录循环次数 用来修改myIpRegions对应位置的region boolean flag = false;//标志位,用来判断当前记录是否和其他记录有关联 for(MyIpRegion region:loopList){ if(tmp.ip_start==(region.ip_end+1) && tmp.province_name.equals(region.province_name) && tmp.country_name.equals(region.country_name)){ flag = true; ori.get(cnt).setIp_end(tmp.getIp_end()); } cnt++; } if(!flag){ //如果和其他的都不一样,那么就新生成一个IP范围对象 ori.add(tmp); } } // 数据处理完成,写到处理后结果表 BufferedWriter bw = null; try { bw = new BufferedWriter(new FileWriter(String.format("/Users/keino/documents/files_02/Res_%s.csv",pageIndex))); for(MyIpRegion region:ori){ bw.write(region.toString()); bw.newline(); bw.flush(); } } catch (IOException e) { e.printStackTrace(); } if(bw!=null) { try { bw.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println("线程" + pageIndex + "运行完毕"); } }
上文已经把IP地址缩小到极致了,无法再进行压缩。剩下的就是思考如何ETL进行获取IP地址。
1、join方式肯定不行,因为不是等式join,容易引起数据膨胀资源消耗太大,无法实现批次IP获取
2、转化为join?当前ip库是地址段,如果转化为具体的一条一条ip对应的信息,不成熟的思考也就是:255 x 255x 255x255 大概是42亿。。。这比一个公司一天的流量日志都大,更别说后期还有ipv6的地址,直接pass
3、使用UDF来做,分块进行IP的解析,这个可行,因为不会使IP库和流量日志膨胀,是有可能实现需求的
实现离线IP匹配: 注册ODPS资源:odps只允许通过tunnel来上传下载数据,使用udf的话,必然会下载存储在odps的IP合并数据【ip信息库存储在其他地方,就离线批处理的量,并发的实例同时读取,其他的存储组件根本承受不住,能承受住的也无法存储数据】,当用户日志量过大,odps生成的ETL实例就越多,这样下来,每一个实例都执行udf,加载一遍IP合并数据表,这样很可能打满tunnel资源,导致其他的程序崩溃甚至无法处理正常业务。
还好,odps提供了资源选项,如果将数据注册为资源的话,那么当实例读取这个资源,该资源会通过ODPS的内在机制去下发数据,不会通过tunnel去下载,这样就保证了业务可用及当前ODPS的tunnel资源不受影响。
# 注册ODPS资源 这里使用table资源 # add file实现IP配置的UDF:[as ] [comment ' '][-f]; # add archive [as ] [comment ' '][-f]; # add table [partition ( )] [as ] [comment ' '][-f]; # add py|jar [comment ' '][-f]; add table ip_combine_info as ip_combine_info;
- 读取资源:iterator = ctx.readResourceTable(“ip_combine_info”).iterator();
- IP全局排序,使用二分查找即可获取IP信息
package com.udf.IP.GetIpViaResource; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDF; import com.aliyun.odps.udf.UDFException; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; public class GetIpInfoViaOdpsResource_20211221 extends UDF { public static ArrayList注册函数:ipInfos = new ArrayList<>(); public void setup(ExecutionContext ctx) throws UDFException { Iterator
udf写好了,那么需要打包jar上传到odps并在odps上注册ip解析函数:
- ⚠️上传后,注册函数 要注意这里不仅要申明上传的jar包,还要申明在udf中引用的资源,否则函数调用会报错ctx读取的资源不存在
# 上传jar包 这里填写本地的jar路径即可 add jar /Users/keino/ip_parse.jar; # 上传后,注册函数 要注意这里不仅要申明上传的jar包,还要申明在udf中引用的资源,否则函数调用会报错ctx读取的资源不存在 create function ip_parse as 'package com.udf.IP.GetIpViaResource.GetIpInfoViaOdpsResource_20211221' using 'ip_parse.jar, ip_combine_info';调用IP解析方法:
select *,ip_parse(ip) ip_info from ods_useer_log where ds = '20210701' limit 1;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)