onekeydiag/src/utils.py

136 lines
5.0 KiB
Python
Raw Normal View History

2025-08-21 18:19:04 +08:00
import os
import tarfile
import shutil
import stat
def get_project_root():
current_file = os.path.abspath(__file__)
project_root = os.path.dirname(os.path.dirname(current_file))
return project_root
def unzip_log(tar_path, extract_path='.'):
"""
解压 tar.gz 文件到指定目录
参数:
tar_path (str): tar.gz 文件的路径
extract_path (str): 解压目标目录默认为当前目录
"""
try:
# 检查文件是否存在
if not os.path.exists(tar_path):
raise FileNotFoundError(f"文件不存在: {tar_path}")
# 创建解压目录(如果不存在)
os.makedirs(extract_path, exist_ok=True)
# 打开 tar.gz 文件并解压
with tarfile.open(tar_path, "r:gz") as tar:
# 列出所有文件(可选)
print(f"解压文件列表:")
for member in tar.getmembers():
print(f"- {member.name}")
# 解压所有文件到目标目录
tar.extractall(path=extract_path)
print(f"\n成功解压到: {os.path.abspath(extract_path)}")
except tarfile.TarError as e:
print(f"tar 文件处理错误: {e}")
except Exception as e:
print(f"解压失败: {e}")
def remove_readonly(func, path, excinfo):
"""用于处理删除只读文件的错误回调函数"""
# 尝试修改文件权限
os.chmod(path, stat.S_IWRITE)
# 再次尝试删除
func(path)
def clean_log_data(path):
"""删除目录,处理权限问题"""
if not os.path.exists(path):
print(f"目录不存在: {path}")
return
try:
# 方法1: 使用onerror回调处理权限问题
shutil.rmtree(path, onerror=remove_readonly)
print(f"成功删除目录: {path}")
except Exception as e:
print(f"删除目录时出错: {e}")
# 方法2: 先递归修改权限再删除(备选方案)
try:
# 递归修改目录权限
for root, dirs, files in os.walk(path):
for dir in dirs:
dir_path = os.path.join(root, dir)
os.chmod(dir_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
for file in files:
file_path = os.path.join(root, file)
os.chmod(file_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
# 修改权限后再次尝试删除
shutil.rmtree(path)
print(f"通过修改权限成功删除目录: {path}")
except Exception as e2:
print(f"修改权限后仍无法删除目录: {e2}")
def read_specific_line(file_path, line_number):
"""
读取文件中指定行的内容行号从1开始
"""
try:
with open(file_path, 'r', encoding='utf-8') as file:
for current_line, content in enumerate(file, 1): # 从1开始计数
if current_line == line_number:
return content.strip() # strip() 去除换行符和空格
# 若行号超出文件总行数返回None
return None
except FileNotFoundError:
print(f"错误:文件 '{file_path}' 不存在")
return None
def append_to_file(file_path, content):
"""
以追加模式将字符串写入文件
参数:
file_path (str): 文件路径
content (str): 要写入的字符串内容
"""
try:
# 打开文件,模式为 'a'(追加),编码指定为 utf-8 以支持中文
with open(file_path, 'a', encoding='utf-8') as file:
# 写入内容(可根据需要添加换行符 '\n'
file.write(content + '\n') # 加 '\n' 使每次写入占一行
print(f"内容已成功追加到文件:{file_path}")
except Exception as e:
print(f"写入文件失败:{str(e)}")
def read_file_to_string(file_path):
"""
打开文件并将全部内容读取到一个字符串中
参数:
file_path (str): 要读取的文件路径
返回:
str: 文件内容字符串若读取失败则返回 None
"""
try:
# 使用 with 语句打开文件(自动处理关闭)
# 'r' 表示只读模式encoding='utf-8' 确保中文正常读取
with open(file_path, 'r', encoding='utf-8') as file:
# read() 方法读取全部内容并返回字符串
content = file.read()
return content
except FileNotFoundError:
print(f"错误:文件 '{file_path}' 不存在")
except PermissionError:
print(f"错误:没有权限读取文件 '{file_path}'")
except UnicodeDecodeError:
print(f"错误:文件 '{file_path}' 不是 UTF-8 编码,无法读取")
except Exception as e:
print(f"读取文件失败:{str(e)}")
return None