2025-08-21 18:19:04 +08:00
|
|
|
|
import os
|
|
|
|
|
|
import tarfile
|
|
|
|
|
|
import shutil
|
|
|
|
|
|
import stat
|
2025-08-22 21:19:53 +08:00
|
|
|
|
import re
|
2025-08-27 09:02:49 +08:00
|
|
|
|
import gzip
|
2025-08-22 21:19:53 +08:00
|
|
|
|
from datetime import datetime
|
2025-08-27 09:02:49 +08:00
|
|
|
|
from pathlib import Path
|
2025-08-24 02:17:32 +08:00
|
|
|
|
from PyQt5.QtWidgets import QMessageBox
|
|
|
|
|
|
|
|
|
|
|
|
def show_error_message(parent, file_path):
|
|
|
|
|
|
"""显示文件格式错误提示弹窗"""
|
|
|
|
|
|
QMessageBox.critical(
|
|
|
|
|
|
parent,
|
|
|
|
|
|
"文件格式错误",
|
|
|
|
|
|
f"不支持的文件格式:\n{file_path}\n\n请上传.tar.gz格式的压缩包。"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def show_critical_message(parent, title, message):
|
|
|
|
|
|
"""显示通用错误提示弹窗"""
|
|
|
|
|
|
QMessageBox.critical(
|
|
|
|
|
|
parent,
|
|
|
|
|
|
title,
|
|
|
|
|
|
message
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def show_info_message(parent, title, message):
|
|
|
|
|
|
"""显示信息提示弹窗"""
|
|
|
|
|
|
QMessageBox.information(
|
|
|
|
|
|
parent,
|
|
|
|
|
|
title,
|
|
|
|
|
|
message
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def show_question_message(parent, title, message):
|
|
|
|
|
|
"""显示询问提示弹窗,返回用户选择(Yes/No)"""
|
|
|
|
|
|
return QMessageBox.question(
|
|
|
|
|
|
parent,
|
|
|
|
|
|
title,
|
|
|
|
|
|
message,
|
|
|
|
|
|
QMessageBox.Yes | QMessageBox.No,
|
|
|
|
|
|
QMessageBox.No
|
|
|
|
|
|
)
|
2025-08-21 18:19:04 +08:00
|
|
|
|
|
|
|
|
|
|
def get_project_root():
|
|
|
|
|
|
current_file = os.path.abspath(__file__)
|
|
|
|
|
|
project_root = os.path.dirname(os.path.dirname(current_file))
|
|
|
|
|
|
return project_root
|
|
|
|
|
|
|
|
|
|
|
|
def unzip_log(tar_path, extract_path='.'):
|
|
|
|
|
|
"""
|
|
|
|
|
|
解压 tar.gz 文件到指定目录
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
tar_path (str): tar.gz 文件的路径
|
|
|
|
|
|
extract_path (str): 解压目标目录,默认为当前目录
|
|
|
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 检查文件是否存在
|
|
|
|
|
|
if not os.path.exists(tar_path):
|
|
|
|
|
|
raise FileNotFoundError(f"文件不存在: {tar_path}")
|
|
|
|
|
|
|
|
|
|
|
|
# 创建解压目录(如果不存在)
|
|
|
|
|
|
os.makedirs(extract_path, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
|
|
# 打开 tar.gz 文件并解压
|
|
|
|
|
|
with tarfile.open(tar_path, "r:gz") as tar:
|
|
|
|
|
|
# 列出所有文件(可选)
|
|
|
|
|
|
print(f"解压文件列表:")
|
|
|
|
|
|
for member in tar.getmembers():
|
|
|
|
|
|
print(f"- {member.name}")
|
|
|
|
|
|
|
|
|
|
|
|
# 解压所有文件到目标目录
|
|
|
|
|
|
tar.extractall(path=extract_path)
|
|
|
|
|
|
print(f"\n成功解压到: {os.path.abspath(extract_path)}")
|
|
|
|
|
|
|
|
|
|
|
|
except tarfile.TarError as e:
|
|
|
|
|
|
print(f"tar 文件处理错误: {e}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"解压失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def remove_readonly(func, path, excinfo):
|
|
|
|
|
|
"""用于处理删除只读文件的错误回调函数"""
|
|
|
|
|
|
# 尝试修改文件权限
|
|
|
|
|
|
os.chmod(path, stat.S_IWRITE)
|
|
|
|
|
|
# 再次尝试删除
|
|
|
|
|
|
func(path)
|
|
|
|
|
|
|
|
|
|
|
|
def clean_log_data(path):
|
|
|
|
|
|
"""删除目录,处理权限问题"""
|
|
|
|
|
|
if not os.path.exists(path):
|
|
|
|
|
|
print(f"目录不存在: {path}")
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 方法1: 使用onerror回调处理权限问题
|
|
|
|
|
|
shutil.rmtree(path, onerror=remove_readonly)
|
|
|
|
|
|
print(f"成功删除目录: {path}")
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"删除目录时出错: {e}")
|
|
|
|
|
|
# 方法2: 先递归修改权限再删除(备选方案)
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 递归修改目录权限
|
|
|
|
|
|
for root, dirs, files in os.walk(path):
|
|
|
|
|
|
for dir in dirs:
|
|
|
|
|
|
dir_path = os.path.join(root, dir)
|
|
|
|
|
|
os.chmod(dir_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
|
|
|
|
|
|
for file in files:
|
|
|
|
|
|
file_path = os.path.join(root, file)
|
|
|
|
|
|
os.chmod(file_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
|
|
|
|
|
|
|
|
|
|
|
|
# 修改权限后再次尝试删除
|
|
|
|
|
|
shutil.rmtree(path)
|
|
|
|
|
|
print(f"通过修改权限成功删除目录: {path}")
|
|
|
|
|
|
except Exception as e2:
|
|
|
|
|
|
print(f"修改权限后仍无法删除目录: {e2}")
|
|
|
|
|
|
|
|
|
|
|
|
def read_specific_line(file_path, line_number):
|
|
|
|
|
|
"""
|
|
|
|
|
|
读取文件中指定行的内容(行号从1开始)
|
|
|
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
|
|
|
|
for current_line, content in enumerate(file, 1): # 从1开始计数
|
|
|
|
|
|
if current_line == line_number:
|
|
|
|
|
|
return content.strip() # strip() 去除换行符和空格
|
|
|
|
|
|
# 若行号超出文件总行数,返回None
|
|
|
|
|
|
return None
|
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
|
print(f"错误:文件 '{file_path}' 不存在")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def append_to_file(file_path, content):
|
|
|
|
|
|
"""
|
|
|
|
|
|
以追加模式将字符串写入文件
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
file_path (str): 文件路径
|
|
|
|
|
|
content (str): 要写入的字符串内容
|
|
|
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 打开文件,模式为 'a'(追加),编码指定为 utf-8 以支持中文
|
|
|
|
|
|
with open(file_path, 'a', encoding='utf-8') as file:
|
|
|
|
|
|
# 写入内容(可根据需要添加换行符 '\n')
|
|
|
|
|
|
file.write(content + '\n') # 加 '\n' 使每次写入占一行
|
|
|
|
|
|
print(f"内容已成功追加到文件:{file_path}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"写入文件失败:{str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
def read_file_to_string(file_path):
|
|
|
|
|
|
"""
|
|
|
|
|
|
打开文件并将全部内容读取到一个字符串中
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
file_path (str): 要读取的文件路径
|
|
|
|
|
|
返回:
|
|
|
|
|
|
str: 文件内容字符串;若读取失败则返回 None
|
|
|
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 使用 with 语句打开文件(自动处理关闭)
|
|
|
|
|
|
# 'r' 表示只读模式,encoding='utf-8' 确保中文正常读取
|
|
|
|
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
|
|
|
|
# read() 方法读取全部内容并返回字符串
|
|
|
|
|
|
content = file.read()
|
|
|
|
|
|
return content
|
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
|
print(f"错误:文件 '{file_path}' 不存在")
|
|
|
|
|
|
except PermissionError:
|
|
|
|
|
|
print(f"错误:没有权限读取文件 '{file_path}'")
|
|
|
|
|
|
except UnicodeDecodeError:
|
|
|
|
|
|
print(f"错误:文件 '{file_path}' 不是 UTF-8 编码,无法读取")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"读取文件失败:{str(e)}")
|
2025-08-22 21:19:53 +08:00
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def merge_logrotate_files(source_path, num_files, output_path):
|
|
|
|
|
|
"""
|
|
|
|
|
|
合并由logrotate分割的文件
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
source_path (str): 原始文件路径(不包含.1, .2等后缀)
|
|
|
|
|
|
num_files (int): 要合并的文件数量(包括原始文件)
|
|
|
|
|
|
output_path (str): 合并后文件的输出路径
|
|
|
|
|
|
"""
|
|
|
|
|
|
if num_files < 1:
|
|
|
|
|
|
raise ValueError("文件数量必须至少为1")
|
|
|
|
|
|
|
|
|
|
|
|
# 构建要合并的文件列表
|
|
|
|
|
|
# 日志轮转文件通常按 .1(最新备份), .2(次新), ... 原始文件(最新)的顺序排列
|
|
|
|
|
|
files_to_merge = []
|
|
|
|
|
|
|
|
|
|
|
|
# 添加备份文件(从.1到.num_files-1)
|
|
|
|
|
|
for i in range(1, num_files):
|
|
|
|
|
|
backup_file = f"{source_path}.{i}"
|
|
|
|
|
|
if os.path.exists(backup_file):
|
|
|
|
|
|
files_to_merge.append(backup_file)
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"警告: 备份文件 {backup_file} 不存在,已跳过")
|
|
|
|
|
|
|
|
|
|
|
|
# 添加原始文件(最新的日志)
|
|
|
|
|
|
if os.path.exists(source_path):
|
|
|
|
|
|
files_to_merge.append(source_path)
|
|
|
|
|
|
else:
|
|
|
|
|
|
raise FileNotFoundError(f"原始文件 {source_path} 不存在")
|
|
|
|
|
|
|
|
|
|
|
|
# 如果找到的文件少于要求的数量,给出警告
|
|
|
|
|
|
if len(files_to_merge) < num_files:
|
|
|
|
|
|
print(f"警告: 只找到 {len(files_to_merge)} 个文件,而不是要求的 {num_files} 个")
|
|
|
|
|
|
|
|
|
|
|
|
# 合并文件
|
|
|
|
|
|
with open(output_path, 'w') as outfile:
|
|
|
|
|
|
for file_path in files_to_merge:
|
|
|
|
|
|
try:
|
|
|
|
|
|
with open(file_path, 'r') as infile:
|
|
|
|
|
|
# 读取并写入文件内容
|
|
|
|
|
|
outfile.write(infile.read())
|
|
|
|
|
|
# 在文件之间添加一个换行,避免内容粘连
|
|
|
|
|
|
outfile.write('\n')
|
|
|
|
|
|
print(f"已合并: {file_path}")
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"合并文件 {file_path} 时出错: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
print(f"所有文件已合并至: {output_path}")
|
|
|
|
|
|
|
|
|
|
|
|
def get_nth_integer_after_line(file_path, target_string, n=1):
|
|
|
|
|
|
"""
|
|
|
|
|
|
打开文件,找到包含目标字符串的行,读取下一行并提取第n个整数
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
file_path: 文件路径
|
|
|
|
|
|
target_string: 要查找的目标字符串
|
|
|
|
|
|
n: 要返回的第几个整数(从1开始计数)
|
|
|
|
|
|
|
|
|
|
|
|
返回:
|
|
|
|
|
|
找到的第n个整数,如果未找到则返回None
|
|
|
|
|
|
"""
|
|
|
|
|
|
if n < 1:
|
|
|
|
|
|
print("错误:n必须是大于等于1的整数")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
|
|
|
|
# 逐行读取文件
|
|
|
|
|
|
for line in file:
|
|
|
|
|
|
# 检查当前行是否包含目标字符串
|
|
|
|
|
|
if target_string in line:
|
|
|
|
|
|
# 读取下一行
|
|
|
|
|
|
next_line = next(file, None)
|
|
|
|
|
|
if next_line is None:
|
|
|
|
|
|
print("目标字符串所在行为文件最后一行,没有下一行")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
# 处理下一行,提取所有整数
|
|
|
|
|
|
integers = []
|
|
|
|
|
|
# 分割成单词,尝试转换为整数
|
|
|
|
|
|
words = next_line.strip().split()
|
|
|
|
|
|
for word in words:
|
|
|
|
|
|
# 清理单词,保留数字和负号
|
|
|
|
|
|
cleaned_word = ''.join(filter(lambda c: c.isdigit() or c == '-', word))
|
|
|
|
|
|
if cleaned_word: # 确保清理后不为空
|
|
|
|
|
|
try:
|
|
|
|
|
|
num = int(cleaned_word)
|
|
|
|
|
|
integers.append(num)
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
# 检查是否有足够的整数
|
|
|
|
|
|
if len(integers) >= n:
|
|
|
|
|
|
return integers[n-1] # 因为列表是0索引,所以n-1
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f"下一行中只找到 {len(integers)} 个整数,无法返回第 {n} 个")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
# 如果遍历完文件都没找到目标字符串
|
|
|
|
|
|
print(f"文件中未找到包含 '{target_string}' 的行")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
|
print(f"错误:文件 '{file_path}' 不存在")
|
|
|
|
|
|
return None
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"处理文件时发生错误:{str(e)}")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def extract_first_second_level_timestamp(text):
|
|
|
|
|
|
"""
|
|
|
|
|
|
提取字符串中第一个不带时区的秒级时间戳(格式:YYYY-MM-DDTHH:MM:SS)
|
|
|
|
|
|
并返回标准ISO格式字符串
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
text (str): 包含时间戳的原始字符串
|
|
|
|
|
|
|
|
|
|
|
|
返回:
|
|
|
|
|
|
str: 提取到的ISO格式时间戳,未找到则返回None
|
|
|
|
|
|
"""
|
|
|
|
|
|
# 正则表达式匹配不带时区的秒级时间戳(YYYY-MM-DDTHH:MM:SS)
|
|
|
|
|
|
# 严格匹配日期和时间的数字范围(如月份1-12,日期1-31等)
|
|
|
|
|
|
timestamp_pattern = (
|
|
|
|
|
|
r'\b\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])' # 日期部分 YYYY-MM-DD
|
|
|
|
|
|
r'T([01]\d|2[0-3]):([0-5]\d):([0-5]\d)\b' # 时间部分 THH:MM:SS
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# 查找第一个匹配的时间戳
|
|
|
|
|
|
match = re.search(timestamp_pattern, text)
|
|
|
|
|
|
|
|
|
|
|
|
if match:
|
|
|
|
|
|
timestamp_str = match.group()
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 解析为datetime对象(不带时区)
|
|
|
|
|
|
dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S")
|
|
|
|
|
|
# 返回ISO格式(秒级)
|
|
|
|
|
|
return dt.isoformat()
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
# 理论上正则已过滤无效格式,此处作为兜底
|
|
|
|
|
|
return timestamp_str
|
|
|
|
|
|
|
|
|
|
|
|
# 未找到匹配的时间戳
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def is_full_word_present(text, word):
|
|
|
|
|
|
"""
|
|
|
|
|
|
判断text中是否包含完整的word(全词匹配,大小写敏感)
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
text (str): 待检查的字符串
|
|
|
|
|
|
word (str): 要匹配的完整单词
|
|
|
|
|
|
|
|
|
|
|
|
返回:
|
|
|
|
|
|
bool: 存在全词匹配返回True,否则返回False
|
|
|
|
|
|
"""
|
|
|
|
|
|
# 使用正则表达式元字符定义单词边界,确保全词匹配
|
|
|
|
|
|
# re.escape()用于转义word中的特殊字符
|
|
|
|
|
|
pattern = r'\b' + re.escape(word) + r'\b'
|
|
|
|
|
|
|
|
|
|
|
|
# 搜索匹配(大小写敏感)
|
|
|
|
|
|
match = re.search(pattern, text)
|
|
|
|
|
|
|
|
|
|
|
|
return bool(match)
|
|
|
|
|
|
|
|
|
|
|
|
def parse_idllog_line(log_line):
|
|
|
|
|
|
"""
|
|
|
|
|
|
解析特定格式的日志行,提取关键信息
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
log_line (str): 要解析的日志行字符串
|
|
|
|
|
|
|
|
|
|
|
|
返回:
|
|
|
|
|
|
dict: 包含提取的信息的字典,若解析失败则返回None
|
|
|
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 处理开头的 <162> 部分,先移除这部分内容
|
|
|
|
|
|
# 找到第一个空格,跳过 <162> 部分
|
|
|
|
|
|
first_space_index = log_line.find(' ')
|
|
|
|
|
|
if first_space_index == -1:
|
|
|
|
|
|
return None
|
|
|
|
|
|
content_after_prefix = log_line[first_space_index:].strip()
|
|
|
|
|
|
# 从剩余内容中提取第一个时间戳(到下一个空格)
|
|
|
|
|
|
timestamp_end = content_after_prefix.find(' ')
|
|
|
|
|
|
if timestamp_end == -1:
|
|
|
|
|
|
return None
|
|
|
|
|
|
timestamp = content_after_prefix[:timestamp_end].strip()
|
|
|
|
|
|
|
|
|
|
|
|
# 查找包含|分隔符的部分
|
|
|
|
|
|
pipe_start = log_line.find('|')
|
|
|
|
|
|
if pipe_start == -1:
|
|
|
|
|
|
return None
|
|
|
|
|
|
pipe_content = log_line[pipe_start:]
|
|
|
|
|
|
|
|
|
|
|
|
# 按|分割内容
|
|
|
|
|
|
parts = [part.strip() for part in pipe_content.split('|') if part.strip()]
|
|
|
|
|
|
|
|
|
|
|
|
# 检查是否有足够的部分
|
|
|
|
|
|
if len(parts) < 5:
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
# 提取各部分信息
|
|
|
|
|
|
component_type = parts[1] # 部件类型
|
|
|
|
|
|
event_type = parts[2] # 事件类型
|
|
|
|
|
|
event_level = parts[3] # 事件等级
|
|
|
|
|
|
event_code = parts[4] # 事件代码
|
|
|
|
|
|
|
|
|
|
|
|
# 事件描述是剩余部分的组合
|
|
|
|
|
|
event_description = '|'.join(parts[5:]) if len(parts) > 5 else ""
|
|
|
|
|
|
|
|
|
|
|
|
# 提取第一个单词(以空格为分隔符)
|
|
|
|
|
|
sensor = event_description.split()[0]
|
|
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
|
'timestamp': timestamp,
|
|
|
|
|
|
'component_type': component_type,
|
|
|
|
|
|
'event_type': event_type,
|
|
|
|
|
|
'event_level': event_level,
|
|
|
|
|
|
'event_code': event_code,
|
|
|
|
|
|
'sensor' : sensor,
|
|
|
|
|
|
'description': event_description
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"解析日志行时出错: {str(e)}")
|
2025-08-27 09:02:49 +08:00
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def extract_maintenancelog_gz_files(input_dir, output_dir, max_files=10):
|
|
|
|
|
|
"""
|
|
|
|
|
|
解压指定目录下的maintenancelog.1.gz到log.max_files.gz文件到目标目录
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
input_dir: 压缩文件所在的目录路径
|
|
|
|
|
|
output_dir: 解压后文件的保存目录路径
|
|
|
|
|
|
max_files: 最大文件编号,默认为10
|
|
|
|
|
|
"""
|
|
|
|
|
|
# 确保输入输出目录存在
|
|
|
|
|
|
input_path = Path(input_dir)
|
|
|
|
|
|
output_path = Path(output_dir)
|
|
|
|
|
|
|
|
|
|
|
|
# 创建输出目录(如果不存在)
|
|
|
|
|
|
output_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
|
|
# 检查输入目录是否存在
|
|
|
|
|
|
if not input_path.exists() or not input_path.is_dir():
|
|
|
|
|
|
print(f"错误:输入目录 '{input_path}' 不存在或不是一个目录")
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
for i in range(1, max_files + 1):
|
|
|
|
|
|
# 压缩文件路径
|
|
|
|
|
|
gz_filename = input_path / f"maintenance.log.{i}.gz"
|
|
|
|
|
|
|
|
|
|
|
|
# 检查文件是否存在
|
|
|
|
|
|
if not gz_filename.exists() or not gz_filename.is_file():
|
|
|
|
|
|
print(f"文件 {gz_filename} 不存在,跳过")
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
# 解压后的文件路径
|
|
|
|
|
|
output_filename = output_path / f"maintenance.log.{i}"
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 打开压缩文件并解压
|
|
|
|
|
|
with gzip.open(gz_filename, 'rb') as f_in:
|
|
|
|
|
|
with open(output_filename, 'wb') as f_out:
|
|
|
|
|
|
# 分块读取写入,处理大文件更高效
|
|
|
|
|
|
while True:
|
|
|
|
|
|
chunk = f_in.read(1024 * 1024) # 1MB块
|
|
|
|
|
|
if not chunk:
|
|
|
|
|
|
break
|
|
|
|
|
|
f_out.write(chunk)
|
|
|
|
|
|
|
|
|
|
|
|
print(f"成功解压: {gz_filename} -> {output_filename}")
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
print(f"解压 {gz_filename} 时出错: {str(e)}")
|