使用ArcPy自动化批量裁剪遥感影像数据
2024.08.20
最近不得已接手了一个任务——裁切原始数据,其实任务很简单,如果数据量小的话,其实用个熟悉的ArcGIS简单弄下就好。但这次甲方直接扔了刚刚从卫星网站上下载好多个研究区一整年的数据,还是以档案包的形式给我,然后让我裁切好返回,涉及到的数据量大概有1.5T左右,不得不使用代码自动处理了。
工作流程
考虑到硬盘容量有限,因此代码会遍历原始数据目录后,再逐个对数据进行解压、裁切。代码的大致工作流程如下:
- 建立临时数据处理目录
- 从input目录读取一份原始数据档案包,将其解压至不再是档案包
- 读取其中的 “tif”, “tiff” 文件
- 以研究区为参考依次对数据裁切(可能一个档案包存有多景数据),保存至output目录
- 删除临时数据处理目录
- 执行下一个数据的裁切
裁切后的数据将和研究区的坐标系保持一致,在命名上,命名则是保持和档案包一致,如果一个档案包有多个数据,则在文件加上 “_{Figure}” 的后缀。如档案包的名字为 “20240101_ABC.tar.gz” ,则输出数据命名为 “20240101_ABC.tif”, “20240101_ABC_2.tif”…
如果遇到档案包内没有数据、数据不在裁切区内之类的问题,代码执行会给予warning并跳过,不会影响代码的执行。
事前准备
Python库
数据处理是使用Python执行,由于我对ArcGIS比较熟悉,所以这次直接使用了ArcPy库的 “Extract by Mask” 来处理数据。除此之外,由于涉及到压缩包和目录的处理,还使用到了 shutil, tarfile, py7zr, zipfile, rarfile 之类的库。
工作目录
需要设置一个工作目录,该工作目录下需要有预先准备好四个文件夹,名字可以自己定义。
- /Input/ : 用于存放原始数据
- /Output/ : 用于输出裁切后数据
- /Processing/ : 处理过程数据的临时目录
- /StudyArea/ : 用于存放研究区shpfile
输入数据
应当准备的数据有以下两样:原始遥感数据档案(支持 tar.gz, 7z, zip, rar 格式)、裁切的参考研究区( 应使用shp格式 )。
代码
用于裁切的主代码如下。
import arcpy
import os
import tarfile
import shutil
import zipfile
import rarfile
import py7zr
import time
# 设置工作目录
workspace = r"D:\WorkDirectory" # 工作目录
input_folder = os.path.join(workspace, "Input") # 原始数据目录
processing_folder = os.path.join(workspace, "Processing") # 临时目录
output_folder = os.path.join(workspace, "Output") # 输出目录
study_area_folder = os.path.join(workspace, "StudyArea") # 研究区目录
study_area_shp = os.path.join(study_area_folder, "StudyArea.shp") # 研究区的shp文件
# 创建必要的文件夹
if not os.path.exists(processing_folder):
os.makedirs(processing_folder)
# 定义解压函数
def extract_file(file_path, dest_folder):
try:
if file_path.endswith('.tar.gz'):
with tarfile.open(file_path, 'r:gz') as tar:
tar.extractall(dest_folder)
elif file_path.endswith('.7z'):
with py7zr.SevenZipFile(file_path, mode='r') as archive:
archive.extractall(dest_folder)
elif file_path.endswith('.zip'):
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(dest_folder)
elif file_path.endswith('.rar'):
with rarfile.RarFile(file_path) as rar:
rar.extractall(dest_folder)
else:
print(f"Unsupported file format: {file_path}")
return False
return True
except Exception as e:
print(f"Error extracting {file_path}: {str(e)}")
return False
# 定义按掩膜提取功能的函数
def clip_raster_by_mask(raster, mask, output_path):
try:
arcpy.env.workspace = processing_folder
arcpy.env.overwriteOutput = True
arcpy.management.Clip(raster, "#", output_path, mask, "#", "ClippingGeometry", "NO_MAINTAIN_EXTENT")
return True
except Exception as e:
print(f"Error clipping {raster} by {mask}: {str(e)}")
return False
# 强制删除文件的方法
def force_delete(file_path):
try:
os.remove(file_path)
except Exception as e:
print(f"Error deleting file {file_path}: {str(e)}")
# 检查数据包是否已经处理过
def is_package_processed(file_name):
output_name = file_name.rsplit('.', 1)[0]
processed_files = [f for f in os.listdir(output_folder) if f.startswith(output_name)]
return len(processed_files) > 0
# 主处理流程
def process_data():
for file_name in os.listdir(input_folder):
file_path = os.path.join(input_folder, file_name)
# 跳过已处理过的数据包
if is_package_processed(file_name):
print(f"Data package {file_name} is already processed, skipping.")
continue
# 处理.tiff文件
if file_name.lower().endswith(('.tiff', '.tif')):
print(f"Processing TIFF file: {file_name}")
output_name = file_name.rsplit('.', 1)[0]
output_raster = os.path.join(output_folder, output_name + ".tif")
if clip_raster_by_mask(file_path, study_area_shp, output_raster):
print(f"Successfully processed {output_raster}")
else:
print(f"Failed to process {file_path}, skipping to next file.")
continue
# 检查是否是压缩包
if not any(file_name.lower().endswith(ext) for ext in ['.tar.gz', '.7z', '.zip', '.rar']):
continue
print(f"Processing data package: {file_name}")
# 清空processing文件夹
shutil.rmtree(processing_folder)
os.makedirs(processing_folder)
# 解压文件
if not extract_file(file_path, processing_folder):
print(f"Failed to extract {file_name}, skipping to next package.")
continue
# 处理解压出来的影像
raster_count = 0
for root, dirs, files in os.walk(processing_folder):
for file in files:
if file.lower().endswith(('.tif', '.tiff')):
raster_count += 1
raster_path = os.path.join(root, file)
output_name = file_name.rsplit('.', 1)[0]
if raster_count > 1:
output_name += f"_{raster_count}"
output_raster = os.path.join(output_folder, output_name + ".tif")
if clip_raster_by_mask(raster_path, study_area_shp, output_raster):
print(f"Successfully processed {output_raster}")
else:
print(f"Failed to process {raster_path}, skipping to next raster.")
# 延时等待文件句柄释放
time.sleep(2)
# 尝试强制删除正在使用的文件
for root, dirs, files in os.walk(processing_folder):
for file in files:
force_delete(os.path.join(root, file))
# 删除临时文件夹
shutil.rmtree(processing_folder)
os.makedirs(processing_folder)
print(f"Completed processing of {file_name}")
# 执行主流程
process_data()
由于甲方还要求告知哪些日期的数据最后没被裁切出来,所以按照之前的特性出了个代码,先提取Input和Output目录下的数据内的日期,然后比较一下Output目录下哪些日期是缺失的。
import os
import re
# 定义目录路径
input_dir = r"D:\WorkDirectory\Input"
output_dir = r"D:\WorkDirectory\Output"
# 用于存储提取的日期
input_dates = set()
output_dates = set()
# 提取文件名中的日期
date_pattern = re.compile(r"(\d{8})")
# 获取Input目录中的日期
for filename in os.listdir(input_dir):
match = date_pattern.search(filename)
if match:
input_dates.add(match.group(1))
# 获取Output目录中的日期
for filename in os.listdir(output_dir):
match = date_pattern.search(filename)
if match:
output_dates.add(match.group(1))
# 找出Input有而Output没有的日期
missing_dates = input_dates - output_dates
# 输出结果
if missing_dates:
print("以下日期在Input目录中有,但在Output目录中没有:")
for date in sorted(missing_dates):
print(date)
else:
print("所有日期在Output目录中都已匹配。")
发表回复