onekeydiag/src/utils.py
2025-08-27 14:42:12 +08:00

455 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import tarfile
import shutil
import stat
import re
import gzip
from datetime import datetime
from pathlib import Path
from PyQt5.QtWidgets import QMessageBox
def show_error_message(parent, file_path):
"""显示文件格式错误提示弹窗"""
QMessageBox.critical(
parent,
"文件格式错误",
f"不支持的文件格式:\n{file_path}\n\n请上传.tar.gz格式的压缩包。"
)
def show_critical_message(parent, title, message):
"""显示通用错误提示弹窗"""
QMessageBox.critical(
parent,
title,
message
)
def show_info_message(parent, title, message):
"""显示信息提示弹窗"""
QMessageBox.information(
parent,
title,
message
)
def show_question_message(parent, title, message):
"""显示询问提示弹窗返回用户选择Yes/No"""
return QMessageBox.question(
parent,
title,
message,
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
def get_app_cache_root():
current_file = os.path.abspath(__file__)
app_cache_root = os.path.dirname(os.path.dirname(current_file))
return app_cache_root
def get_project_root():
app_cache_root = get_app_cache_root()
return app_cache_root
def unzip_log(tar_path, extract_path='.'):
"""
解压 tar.gz 文件到指定目录
参数:
tar_path (str): tar.gz 文件的路径
extract_path (str): 解压目标目录,默认为当前目录
"""
try:
# 检查文件是否存在
if not os.path.exists(tar_path):
raise FileNotFoundError(f"文件不存在: {tar_path}")
# 创建解压目录(如果不存在)
os.makedirs(extract_path, exist_ok=True)
# 打开 tar.gz 文件并解压
with tarfile.open(tar_path, "r:gz") as tar:
# 列出所有文件(可选)
print(f"解压文件列表:")
for member in tar.getmembers():
print(f"- {member.name}")
# 解压所有文件到目标目录
tar.extractall(path=extract_path)
print(f"\n成功解压到: {os.path.abspath(extract_path)}")
except tarfile.TarError as e:
print(f"tar 文件处理错误: {e}")
except Exception as e:
print(f"解压失败: {e}")
def remove_readonly(func, path, excinfo):
"""用于处理删除只读文件的错误回调函数"""
# 尝试修改文件权限
os.chmod(path, stat.S_IWRITE)
# 再次尝试删除
func(path)
def clean_log_data(path):
"""删除目录,处理权限问题"""
if not os.path.exists(path):
print(f"目录不存在: {path}")
return
try:
# 方法1: 使用onerror回调处理权限问题
shutil.rmtree(path, onerror=remove_readonly)
print(f"成功删除目录: {path}")
except Exception as e:
print(f"删除目录时出错: {e}")
# 方法2: 先递归修改权限再删除(备选方案)
try:
# 递归修改目录权限
for root, dirs, files in os.walk(path):
for dir in dirs:
dir_path = os.path.join(root, dir)
os.chmod(dir_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
for file in files:
file_path = os.path.join(root, file)
os.chmod(file_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
# 修改权限后再次尝试删除
shutil.rmtree(path)
print(f"通过修改权限成功删除目录: {path}")
except Exception as e2:
print(f"修改权限后仍无法删除目录: {e2}")
def read_specific_line(file_path, line_number):
"""
读取文件中指定行的内容行号从1开始
"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
for current_line, content in enumerate(file, 1): # 从1开始计数
if current_line == line_number:
return content.strip() # strip() 去除换行符和空格
# 若行号超出文件总行数返回None
return None
except FileNotFoundError:
print(f"错误:文件 '{file_path}' 不存在")
return None
def append_to_file(file_path, content):
"""
以追加模式将字符串写入文件
参数:
file_path (str): 文件路径
content (str): 要写入的字符串内容
"""
try:
# 打开文件,模式为 'a'(追加),编码指定为 utf-8 以支持中文
with open(file_path, 'a', encoding='utf-8') as file:
# 写入内容(可根据需要添加换行符 '\n'
file.write(content + '\n') # 加 '\n' 使每次写入占一行
print(f"内容已成功追加到文件:{file_path}")
except Exception as e:
print(f"写入文件失败:{str(e)}")
def read_file_to_string(file_path):
"""
打开文件并将全部内容读取到一个字符串中
参数:
file_path (str): 要读取的文件路径
返回:
str: 文件内容字符串;若读取失败则返回 None
"""
try:
# 使用 with 语句打开文件(自动处理关闭)
# 'r' 表示只读模式encoding='utf-8' 确保中文正常读取
with open(file_path, 'r', encoding='utf-8') as file:
# read() 方法读取全部内容并返回字符串
content = file.read()
return content
except FileNotFoundError:
print(f"错误:文件 '{file_path}' 不存在")
except PermissionError:
print(f"错误:没有权限读取文件 '{file_path}'")
except UnicodeDecodeError:
print(f"错误:文件 '{file_path}' 不是 UTF-8 编码,无法读取")
except Exception as e:
print(f"读取文件失败:{str(e)}")
return None
def merge_logrotate_files(source_path, num_files, output_path):
"""
合并由logrotate分割的文件
参数:
source_path (str): 原始文件路径(不包含.1, .2等后缀)
num_files (int): 要合并的文件数量(包括原始文件)
output_path (str): 合并后文件的输出路径
"""
if num_files < 1:
raise ValueError("文件数量必须至少为1")
# 构建要合并的文件列表
# 日志轮转文件通常按 .1(最新备份), .2(次新), ... 原始文件(最新)的顺序排列
files_to_merge = []
# 添加备份文件(从.1到.num_files-1
for i in range(1, num_files):
backup_file = f"{source_path}.{i}"
if os.path.exists(backup_file):
files_to_merge.append(backup_file)
else:
print(f"警告: 备份文件 {backup_file} 不存在,已跳过")
# 添加原始文件(最新的日志)
if os.path.exists(source_path):
files_to_merge.append(source_path)
else:
raise FileNotFoundError(f"原始文件 {source_path} 不存在")
# 如果找到的文件少于要求的数量,给出警告
if len(files_to_merge) < num_files:
print(f"警告: 只找到 {len(files_to_merge)} 个文件,而不是要求的 {num_files}")
# 合并文件
with open(output_path, 'w') as outfile:
for file_path in files_to_merge:
try:
with open(file_path, 'r') as infile:
# 读取并写入文件内容
outfile.write(infile.read())
# 在文件之间添加一个换行,避免内容粘连
outfile.write('\n')
print(f"已合并: {file_path}")
except Exception as e:
print(f"合并文件 {file_path} 时出错: {str(e)}")
print(f"所有文件已合并至: {output_path}")
def get_nth_integer_after_line(file_path, target_string, n=1):
"""
打开文件找到包含目标字符串的行读取下一行并提取第n个整数
参数:
file_path: 文件路径
target_string: 要查找的目标字符串
n: 要返回的第几个整数从1开始计数
返回:
找到的第n个整数如果未找到则返回None
"""
if n < 1:
print("错误n必须是大于等于1的整数")
return None
try:
with open(file_path, 'r', encoding='utf-8') as file:
# 逐行读取文件
for line in file:
# 检查当前行是否包含目标字符串
if target_string in line:
# 读取下一行
next_line = next(file, None)
if next_line is None:
print("目标字符串所在行为文件最后一行,没有下一行")
return None
# 处理下一行,提取所有整数
integers = []
# 分割成单词,尝试转换为整数
words = next_line.strip().split()
for word in words:
# 清理单词,保留数字和负号
cleaned_word = ''.join(filter(lambda c: c.isdigit() or c == '-', word))
if cleaned_word: # 确保清理后不为空
try:
num = int(cleaned_word)
integers.append(num)
except ValueError:
continue
# 检查是否有足够的整数
if len(integers) >= n:
return integers[n-1] # 因为列表是0索引所以n-1
else:
print(f"下一行中只找到 {len(integers)} 个整数,无法返回第 {n}")
return None
# 如果遍历完文件都没找到目标字符串
print(f"文件中未找到包含 '{target_string}' 的行")
return None
except FileNotFoundError:
print(f"错误:文件 '{file_path}' 不存在")
return None
except Exception as e:
print(f"处理文件时发生错误:{str(e)}")
return None
def extract_first_second_level_timestamp(text):
"""
提取字符串中第一个不带时区的秒级时间戳格式YYYY-MM-DDTHH:MM:SS
并返回标准ISO格式字符串
参数:
text (str): 包含时间戳的原始字符串
返回:
str: 提取到的ISO格式时间戳未找到则返回None
"""
# 正则表达式匹配不带时区的秒级时间戳YYYY-MM-DDTHH:MM:SS
# 严格匹配日期和时间的数字范围如月份1-12日期1-31等
timestamp_pattern = (
r'\b\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])' # 日期部分 YYYY-MM-DD
r'T([01]\d|2[0-3]):([0-5]\d):([0-5]\d)\b' # 时间部分 THH:MM:SS
)
# 查找第一个匹配的时间戳
match = re.search(timestamp_pattern, text)
if match:
timestamp_str = match.group()
try:
# 解析为datetime对象不带时区
dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S")
# 返回ISO格式秒级
return dt.isoformat()
except ValueError:
# 理论上正则已过滤无效格式,此处作为兜底
return timestamp_str
# 未找到匹配的时间戳
return None
def is_full_word_present(text, word):
"""
判断text中是否包含完整的word全词匹配大小写敏感
参数:
text (str): 待检查的字符串
word (str): 要匹配的完整单词
返回:
bool: 存在全词匹配返回True否则返回False
"""
# 使用正则表达式元字符定义单词边界,确保全词匹配
# re.escape()用于转义word中的特殊字符
pattern = r'\b' + re.escape(word) + r'\b'
# 搜索匹配(大小写敏感)
match = re.search(pattern, text)
return bool(match)
def parse_idllog_line(log_line):
"""
解析特定格式的日志行,提取关键信息
参数:
log_line (str): 要解析的日志行字符串
返回:
dict: 包含提取的信息的字典若解析失败则返回None
"""
try:
# 处理开头的 <162> 部分,先移除这部分内容
# 找到第一个空格,跳过 <162> 部分
first_space_index = log_line.find(' ')
if first_space_index == -1:
return None
content_after_prefix = log_line[first_space_index:].strip()
# 从剩余内容中提取第一个时间戳(到下一个空格)
timestamp_end = content_after_prefix.find(' ')
if timestamp_end == -1:
return None
timestamp = content_after_prefix[:timestamp_end].strip()
# 查找包含|分隔符的部分
pipe_start = log_line.find('|')
if pipe_start == -1:
return None
pipe_content = log_line[pipe_start:]
# 按|分割内容
parts = [part.strip() for part in pipe_content.split('|') if part.strip()]
# 检查是否有足够的部分
if len(parts) < 5:
return None
# 提取各部分信息
component_type = parts[1] # 部件类型
event_type = parts[2] # 事件类型
event_level = parts[3] # 事件等级
event_code = parts[4] # 事件代码
# 事件描述是剩余部分的组合
event_description = '|'.join(parts[5:]) if len(parts) > 5 else ""
# 提取第一个单词(以空格为分隔符)
sensor = event_description.split()[0]
return {
'timestamp': timestamp,
'component_type': component_type,
'event_type': event_type,
'event_level': event_level,
'event_code': event_code,
'sensor' : sensor,
'description': event_description
}
except Exception as e:
print(f"解析日志行时出错: {str(e)}")
return None
def extract_maintenancelog_gz_files(input_dir, output_dir, max_files=10):
"""
解压指定目录下的maintenancelog.1.gz到log.max_files.gz文件到目标目录
参数:
input_dir: 压缩文件所在的目录路径
output_dir: 解压后文件的保存目录路径
max_files: 最大文件编号默认为10
"""
# 确保输入输出目录存在
input_path = Path(input_dir)
output_path = Path(output_dir)
# 创建输出目录(如果不存在)
output_path.mkdir(parents=True, exist_ok=True)
# 检查输入目录是否存在
if not input_path.exists() or not input_path.is_dir():
print(f"错误:输入目录 '{input_path}' 不存在或不是一个目录")
return
for i in range(1, max_files + 1):
# 压缩文件路径
gz_filename = input_path / f"maintenance.log.{i}.gz"
# 检查文件是否存在
if not gz_filename.exists() or not gz_filename.is_file():
print(f"文件 {gz_filename} 不存在,跳过")
continue
# 解压后的文件路径
output_filename = output_path / f"maintenance.log.{i}"
try:
# 打开压缩文件并解压
with gzip.open(gz_filename, 'rb') as f_in:
with open(output_filename, 'wb') as f_out:
# 分块读取写入,处理大文件更高效
while True:
chunk = f_in.read(1024 * 1024) # 1MB块
if not chunk:
break
f_out.write(chunk)
print(f"成功解压: {gz_filename} -> {output_filename}")
except Exception as e:
print(f"解压 {gz_filename} 时出错: {str(e)}")