二次改正:修正时间问题,发现2021年11月之后的文件解压后文件数目变成45个,遂将check逻辑进行修改
文章目录
- 前言
- 一、安装环境及所需库
- 二、针对参考博客的改进
- 1.不重要的改进
- 2.对所需要求进行改进
- 3.对所需要求进行改进后的代码
- 三、代码运行情况
前言
最近下载了很多哨兵影像,由于事先以为需要一个一个解压太费时间(不是懒)遂想写个代码进行批量解压,后来同学告知实际上目前软件,如winrar、7zip等是可以批量解压的,emmm,那还是记录下学习的过程吧。
代码可以实现的功能:
- 对选择解压目录下的哨兵压缩数据进行解压
- 检核数据是否正确(一般来说,解压后哨兵数据存储大小会大于7GB,文件数目为38个)
参考博客:https://blog.csdn.net/qq_40695642/article/details/100170316,但是博主用的Python2.X,本脚本用的是Python3.9,因此需要对一些内容需要改进
一、安装环境及所需库
安装环境以及环境配置可以看我上一篇文章https://blog.csdn.net/qq_44932630/article/details/124202144,实际这次没有额外库安装,主要是调用python内置库包。
二、针对参考博客的改进 1.不重要的改进原博客中使用如下代码防止中文乱码,实际上我们用不太到,因为哨兵数据本来就是英文+数字组成的,因此这一块不是很重要。
import sys
reload(sys)
sys.setdefaultencoding('gbk') #如遇到无法识别中文而报错使用
但是加上后就会报错,上网查询原因大致为版本不匹配,因此将其改为:
import importlib
importlib.reload(sys)#如遇到无法识别中文而报错使用
2.对所需要求进行改进
在改进前大致梳理了一下自己的需求
- 能够检核解压后文件对不对,查阅手动解压的文件后发现,哨兵数据解压后文件夹大小大都大于7GB,而且文件数量为固定38个。如果有异况还请同学们在评论区指正。
检核函数(Check_size)如下:
def Check_size(path):
size = 0.0 # 记录大小
count = 0 # 记录数量
for root, dirs, files in os.walk(path):
size += sum([os.path.getsize(os.path.join(root, file)) for file in files])
for each in files:
count += 1
size = round(size/1024/1024/1024,2) #小数点后两位
if size >7 and count == 38:
print("此文件夹大小为%sGB,文件个数为%d个,合格"%(size,count))
else:
print("此文件解压有误,大小为%sGB"%size)
- 可以对解压路径进行查询,如果已经部分数据已经解压过了就可以直接跳过了。
查询代码内嵌在解压函数中,代码如下:
def unzip_file(zip_file_name,destination_path):
try:
start = time.time()
Doned = os.listdir(destination_path) #已经解压过的文件
SplitPre= re.split(r'[/,.,\,\s ]\s*', zip_file_name)[-2] #将准备解压的文件名提取出
SplitPre_C = SplitPre + '.SAFE' #对已存在的文件夹进行检查
Check_path = os.path.join(destination_path,SplitPre_C)
#print("SplitPre",SplitPre)
Compare = [] #存放解压路径下已有的文件夹名
if len(Doned) == 0: #如果解压路径为空,很有可能会报错,因此在解压路径下随便建立一个文件夹防止报错
os.mkdir(destination_path + './nofile')
else:
for D in Doned:
Compare_name = re.split(r'[.,\s ]\s*', D)[0] #获得解压路径下已存的文件夹名
Compare.append(Compare_name)
#print("Compare_name",Compare)
if SplitPre in str(Compare):
print ("------------------------------------")
print("%s文件已存在"%SplitPre_C)
#print(os.path.join(destination_path,SplitPre))
Check_size(Check_path)
else:
# 开始解压
archive = zipfile.ZipFile(zip_file_name,mode='r')
for file in archive.namelist():
archive.extract(file, destination_path)
end = time.time()
print ("------------------------------------")
print("%s.zip文件解压完成,用时%.3fs"%(SplitPre,(end-start)))
Check_size(Check_path)
except BaseException as e: #抛出异常的处理
print(str(e))
思路与自己第一篇博客下载所需要的轨道数据相似,首先把解压文件路径下已经存在的文件夹进行统计,python会输出一个List列表(解压后文件夹命名为XXXXX.SAFE,用这个进行匹配显然是不行的,因此要对后缀前的信息进行提取),然后就是读取准备解压的文件,将其与前面输出的列表进行比对,如果其已经在列表中,那么意味着该文件已经被解压过了,检核无误后即可跳过(但是目前还没有考虑如果有错误怎么办,只能后续手动删有能力后续可以在程序结束前再添加一段代码,定位到解压错误的文件夹,将其删除并重新解压,类似哨兵轨道数据的重下载),如果没在列表中则执行解压与检核工作。
3.对所需要求进行改进后的代码代码如下(示例):
import zipfile
import os
import sys
import importlib
from tkinter import filedialog
import datetime
import time
import tkinter as tk
import re
"""
本代码实现功能:
1.批量解压哨兵数据zip
2.检查解压数据是否合规(存储占用7GB以上,包含38个子文件)
"""
#下载哨兵数据都是zip格式的
importlib.reload(sys)#如遇到无法识别中文而报错使用,实际哨兵数据都是英文加数字,一般用不到
TimeStart = datetime.datetime.now() #记录程序开始时间
# 将zip文件解压处理,并放到指定的文件夹里面去
def unzip_file(zip_file_name,destination_path):
try:
start = time.time()
Doned = os.listdir(destination_path) #已经解压过的文件
SplitPre= re.split(r'[/,.,\,\s ]\s*', zip_file_name)[-2] #将准备解压的文件名提取出
SplitPre_C = SplitPre + '.SAFE' #对已存在的文件夹进行检查
Check_path = os.path.join(destination_path,SplitPre_C)
#print("SplitPre",SplitPre)
Compare = [] #存放解压路径下已有的文件夹名
if len(Doned) == 0: #如果解压路径为空,很有可能会报错,因此在解压路径下随便建立一个文件夹防止报错
os.mkdir(destination_path + './nofile')
else:
for D in Doned:
Compare_name = re.split(r'[.,\s ]\s*', D)[0] #获得解压路径下已存的文件夹名
Compare.append(Compare_name)
#print("Compare_name",Compare)
if SplitPre in str(Compare):
print ("------------------------------------")
print("%s文件已存在"%SplitPre_C)
#print(os.path.join(destination_path,SplitPre))
Check_size(Check_path)
else:
# 开始解压
archive = zipfile.ZipFile(zip_file_name,mode='r')
for file in archive.namelist():
archive.extract(file, destination_path)
end = time.time()
print ("------------------------------------")
print("%s.zip文件解压完成,用时%.3fs"%(SplitPre,(end-start)))
Check_size(Check_path)
except BaseException as e: #抛出异常的处理
print(str(e))
def zipfile_name(file_dir):
# 读取文件夹下面的文件名.zip
try:
Zip=[]
for root, dirs, files in os.walk(file_dir):
for file in files:
if os.path.splitext(file)[1] == '.zip': # 读取带zip 文件
Zip.append(os.path.join(root, file))
else:
print ("------------------------------------")
print("此文件格式非zip格式")
break
return Zip
except BaseException as e: #抛出异常的处理
print(str(e))
def Check_size(path):
size = 0.0 # 记录大小
count = 0 # 记录数量
for root, dirs, files in os.walk(path):
size += sum([os.path.getsize(os.path.join(root, file)) for file in files])
for each in files:
count += 1
size = round(size/1024/1024/1024,2) #小数点后两位
if size >6.5 and count >= 38:
print("此文件夹大小为%sGB,文件个数为%d个,合格"%(size,count))
else:
print("此文件解压有误,大小为%sGB,文件个数为%d个"%(size,count))
#入口函数
def main():
try:
root = tk.Tk()
root.geometry("100x50")
# unzip_path = r'E:\data/new'
# data_path = r'E:\data/old'
data_path = filedialog.askdirectory(title='选择压缩文件路径') #选择压缩包的路径
print("压缩文件数据路径为:",data_path)
unzip_path = filedialog.askdirectory(title='选择解压文件路径') #选择解压路径
print("解压路径为:",unzip_path)
root.destroy() # 和mainloop()配合可以关闭tk空白窗口
root.mainloop()
# 对压缩数据路径下的压缩文件执行 *** 作
fn=zipfile_name(data_path)
for file in fn:
unzip_file(file, unzip_path)
nums = len(os.listdir(unzip_path))
print("%s个文件解压完毕"%nums)
TimeEnd = datetime.datetime.now()
print('Running time: %s ' %(TimeEnd - TimeStart))
except BaseException as e: #抛出异常的处理
print(str(e))
if __name__ == "__main__":
main()
exit()
三、代码运行情况
虽然自己感觉用处不是很大,但是能帮助有需要的同学就行,代码运行时会反馈运行情况,如图,可以知道解压后对应文件夹的大小以及文件夹内部的文件数目,还有运行的时间,万一哪里出错了可以定位到错误的数据,重新进行解压。
最后结果没问题,但是最后两个大小小于7GB显示错误,因此用电脑自带的解压软件进行解压,发现确实是6.8G,可能阈值设置太高了,应该改小点,至于运行时间,emmm不知道哪里出bug了,虽然没报错但是不正确,懒得改了~
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)