code sync

This commit is contained in:
leimingsheng 2025-08-22 21:19:53 +08:00
parent 31a6a0b255
commit 63d36548aa
12 changed files with 1270 additions and 42 deletions

6
.gitignore vendored

@ -1,3 +1,7 @@
tmp/
okd_tmp/
src/__pycache__/
log.tar.gz
*.tar.gz
build/
dist/
test.py

47
oneketdiag.spec Normal file

@ -0,0 +1,47 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
# 定义所有需要打包的文件和依赖
a = Analysis(
['./src/main.py'], # 程序入口文件
pathex=['./'], # 工程根目录(确保能找到子模块)
binaries=[],
# 配置资源文件(子模块会自动识别,主要配置非.py文件
datas=[
# 若有其他资源(如.ui文件、图片等按此格式添加
],
# 隐藏依赖(若打包后提示缺少模块,添加在这里)
hiddenimports=[
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[], # 排除不需要的模块(减小体积)
noarchive=False,
cipher=block_cipher,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
name='OneKeyDiag', # 生成的EXE文件名
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True, # 启用压缩(推荐)
upx_exclude=[],
runtime_tmpdir=None,
console=False, # 隐藏控制台GUI程序
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)

@ -23,6 +23,9 @@
<property name="currentIndex">
<number>0</number>
</property>
<property name="tabBarAutoHide">
<bool>false</bool>
</property>
<widget class="QWidget" name="tab_console">
<attribute name="title">
<string>控制台</string>
@ -35,7 +38,8 @@
&lt;html&gt;&lt;head&gt;&lt;meta name=&quot;qrichtext&quot; content=&quot;1&quot; /&gt;&lt;style type=&quot;text/css&quot;&gt;
p, li { white-space: pre-wrap; }
&lt;/style&gt;&lt;/head&gt;&lt;body style=&quot; font-family:'SimSun'; font-size:9pt; font-weight:400; font-style:normal;&quot;&gt;
&lt;p style=&quot; margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px;&quot;&gt;通过文件上传一键日志压缩包来开始&lt;/p&gt;&lt;/body&gt;&lt;/html&gt;</string>
&lt;p style=&quot; margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px;&quot;&gt;&lt;span style=&quot; font-size:18pt; font-weight:600;&quot;&gt;通过文件上传一键日志压缩包来开始&lt;/span&gt;&lt;/p&gt;
&lt;p style=&quot; margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px;&quot;&gt;&lt;span style=&quot; font-size:18pt; font-weight:600;&quot;&gt;或者将日志压缩包拖入程序&lt;/span&gt;&lt;/p&gt;&lt;/body&gt;&lt;/html&gt;</string>
</property>
</widget>
</item>
@ -55,6 +59,18 @@ p, li { white-space: pre-wrap; }
<attribute name="title">
<string>关键告警</string>
</attribute>
<layout class="QHBoxLayout" name="horizontalLayout">
<item>
<widget class="QTableView" name="tableView_alert">
<property name="enabled">
<bool>true</bool>
</property>
<property name="editTriggers">
<set>QAbstractItemView::NoEditTriggers</set>
</property>
</widget>
</item>
</layout>
</widget>
<widget class="QWidget" name="tab_temp_all">
<attribute name="title">
@ -128,6 +144,11 @@ p, li { white-space: pre-wrap; }
</item>
</layout>
</widget>
<widget class="QWidget" name="tab_timeline_event">
<attribute name="title">
<string>事件时间线</string>
</attribute>
</widget>
</widget>
</item>
</layout>

@ -21,6 +21,7 @@ class Ui_MainWindow(object):
self.horizontalLayout_2 = QtWidgets.QHBoxLayout(self.centralwidget)
self.horizontalLayout_2.setObjectName("horizontalLayout_2")
self.tabWidget = QtWidgets.QTabWidget(self.centralwidget)
self.tabWidget.setTabBarAutoHide(False)
self.tabWidget.setObjectName("tabWidget")
self.tab_console = QtWidgets.QWidget()
self.tab_console.setObjectName("tab_console")
@ -40,6 +41,13 @@ class Ui_MainWindow(object):
self.tabWidget.addTab(self.tab_baseinfo, "")
self.tab_alert = QtWidgets.QWidget()
self.tab_alert.setObjectName("tab_alert")
self.horizontalLayout = QtWidgets.QHBoxLayout(self.tab_alert)
self.horizontalLayout.setObjectName("horizontalLayout")
self.tableView_alert = QtWidgets.QTableView(self.tab_alert)
self.tableView_alert.setEnabled(True)
self.tableView_alert.setEditTriggers(QtWidgets.QAbstractItemView.NoEditTriggers)
self.tableView_alert.setObjectName("tableView_alert")
self.horizontalLayout.addWidget(self.tableView_alert)
self.tabWidget.addTab(self.tab_alert, "")
self.tab_temp_all = QtWidgets.QWidget()
self.tab_temp_all.setObjectName("tab_temp_all")
@ -64,6 +72,9 @@ class Ui_MainWindow(object):
self.graphicsView.setObjectName("graphicsView")
self.verticalLayout.addWidget(self.graphicsView)
self.tabWidget.addTab(self.tab_temp_all, "")
self.tab_timeline_event = QtWidgets.QWidget()
self.tab_timeline_event.setObjectName("tab_timeline_event")
self.tabWidget.addTab(self.tab_timeline_event, "")
self.horizontalLayout_2.addWidget(self.tabWidget)
MainWindow.setCentralWidget(self.centralwidget)
self.menubar = QtWidgets.QMenuBar(MainWindow)
@ -91,7 +102,8 @@ class Ui_MainWindow(object):
"<html><head><meta name=\"qrichtext\" content=\"1\" /><style type=\"text/css\">\n"
"p, li { white-space: pre-wrap; }\n"
"</style></head><body style=\" font-family:\'SimSun\'; font-size:9pt; font-weight:400; font-style:normal;\">\n"
"<p style=\" margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px;\">通过文件上传一键日志压缩包来开始</p></body></html>"))
"<p style=\" margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px;\"><span style=\" font-size:18pt; font-weight:600;\">通过文件上传一键日志压缩包来开始</span></p>\n"
"<p style=\" margin-top:0px; margin-bottom:0px; margin-left:0px; margin-right:0px; -qt-block-indent:0; text-indent:0px;\"><span style=\" font-size:18pt; font-weight:600;\">或者将日志压缩包拖入程序</span></p></body></html>"))
self.tabWidget.setTabText(self.tabWidget.indexOf(self.tab_console), _translate("MainWindow", "控制台"))
self.tabWidget.setTabText(self.tabWidget.indexOf(self.tab_baseinfo), _translate("MainWindow", "基本信息"))
self.tabWidget.setTabText(self.tabWidget.indexOf(self.tab_alert), _translate("MainWindow", "关键告警"))
@ -107,6 +119,7 @@ class Ui_MainWindow(object):
self.comboBox.setItemText(9, _translate("MainWindow", "VR_CPU_Temp"))
self.comboBox.setItemText(10, _translate("MainWindow", "VR_FPGA_Temp"))
self.tabWidget.setTabText(self.tabWidget.indexOf(self.tab_temp_all), _translate("MainWindow", "温度信息"))
self.tabWidget.setTabText(self.tabWidget.indexOf(self.tab_timeline_event), _translate("MainWindow", "事件时间线"))
self.menu.setTitle(_translate("MainWindow", "文件"))
self.actionUpload_log.setText(_translate("MainWindow", "打开"))
self.actionUpload_log.setStatusTip(_translate("MainWindow", "上传日志文件到程序中"))

@ -3,11 +3,13 @@ import utils
import json
project_root = utils.get_project_root()
cache_dir = os.path.join(project_root, "tmp")
cache_dir = os.path.join(project_root, "okd_tmp")
log_dir = os.path.join(cache_dir, "onekeylog")
component_dir = os.path.join(log_dir, "component")
component_file = os.path.join(component_dir, "component.log")
running_dir = os.path.join(log_dir, "runningdata")
running_file = os.path.join(running_dir, "rundatainfo.log")
baseinfo_file = os.path.join(cache_dir, "baseinfo.txt")
def get_fw_version_info():
@ -60,11 +62,49 @@ def get_fru_info():
# hw_info_str = "\nDPU Hardware Infomation:\n"
def get_current_settings():
EEPROM_PAGE10_STR = "EEPROM PAGE 10 data:"
EEPROM_PAGE70_STR = "EEPROM PAGE 70 data:"
synergy_flag = utils.get_nth_integer_after_line(running_file, EEPROM_PAGE10_STR, 2)
dpu_post_flag = utils.get_nth_integer_after_line(running_file, EEPROM_PAGE10_STR, 3)
power_policy_flag = utils.get_nth_integer_after_line(running_file, EEPROM_PAGE70_STR, 1)
print(synergy_flag)
print(dpu_post_flag)
print(power_policy_flag)
if synergy_flag == 0:
synergy_str = "独立"
else:
synergy_str = "协同"
if dpu_post_flag == 0:
dpu_post_str = "OS加载完成"
else:
dpu_post_str = "OS加载未完成"
if power_policy_flag == 0:
power_policy_str = "Always-off"
elif power_policy_flag == 2:
power_policy_str = "Always-on"
else:
power_policy_str = "Restore Policy"
settings_str = "\nDPU Running Configuration\n"
settings_str += f"Power Policy : {power_policy_str}\n"
settings_str += f"Sync Policy : {synergy_str}\n"
settings_str += f"DPU Post Status : {dpu_post_str}\n"
print(settings_str)
utils.append_to_file(baseinfo_file, settings_str)
def program_main():
get_fw_version_info()
get_fru_info()
# get_hardware_info()
get_current_settings()
return True

257
src/ZiJin_parse_event.py Normal file

@ -0,0 +1,257 @@
''' 模块说明
1.目标json格式, json记录中以数组形式记录
{
"id": 1,
"timestamp": "2023-01-15T08:30:00",
"title": "系统启动",
"details": {
Desc: "AC"
}
}
2.需要记录入的事件类型
Title keywords Detail
--------------- 常规 ---------------
BMC启动 | BMC_Boot | { Cause }
系统重启 | SYS_Boot | { Cause }
电源状态Down | S5/G2 |
电源状态Up | S0/G0 |
固件更新 | FW UPDATE | { Compoent , version_change }
--------------- 故障 ---------------
高温事件 | {温度传感器} | { SensorName , level, IDL Desc }
电压事件 | {电压传感器} | { SensorName , level, IDL Desc }
故障事件 | {其它Assert事件} | { SensorName , level, IDL_Desc }
'''
import os
import utils
import json
import re
project_root = utils.get_project_root()
cache_dir = os.path.join(project_root, "okd_tmp")
log_dir = os.path.join(cache_dir, "onekeylog")
idl_dir = os.path.join(log_dir, "log")
idl_file = os.path.join(idl_dir, "idl.log")
cache_idl = os.path.join(cache_dir, "merge_idl.log")
eventline_json = os.path.join(cache_dir, "eventline.json")
TempSensorSpec = [ "Inlet_CPU_Temp", "Outlet_CPU_Temp", "M_2_Temp_on_MB", "Inlet_FPGA_Temp",
"Outlet_FPGA_Temp", "FPGA_Temp", "FPGA_VR_Temp", "NIC_OPT0_Temp", "NIC_OPT1_Temp",
"NIC_OPT2_Temp", "NIC_OPT3_Temp", "CPU_Temp", "DIMM_Temp", "MEM_VR_Temp",
"CPU_VR_Temp"]
VoltSensorSpec = [ "A_PVCCANA", "A_P1V8_STBY", "A_PVNN_PCH", "CPU_VCORE", "A_PVTT", "A_P2V5_AUX",
"SYS_3V3", "A_P1V2_BMC_DDR4", "A_P1V15_AUX", "SYS_5V", "A_P1V05", "CPU_DDR_VDDQ0",
"P3V_BAT", "VCCH_GXE_1V1", "VCCL_0V8", "VCCIO_PIO_1V2", "VPP_2V5", "P3V3",
"VCCH_0V9", "VCCL_SDM_0V8", "VCCPT_1V8", "SYS_12V", "P12V_B"]
CommonKey = [ "BMC_Boot", "SYS_Boot", "S5/G2", "S0/G0", "FW UPDATE"]
ErrorKey = [ "Critical", "Warning" ]
g_json_record_id = 0
def init_json_file(file_path=eventline_json):
"""
初始化一个JSON文件内容为空列表
参数:
file_path (str): JSON文件的路径
返回:
bool: 初始化成功返回True失败返回False
"""
try:
# 检查文件是否已存在
if os.path.exists(file_path):
print(f"文件 {file_path} 已存在,无需初始化")
return True
# 创建并写入空列表
with open(file_path, 'w', encoding='utf-8') as f:
json.dump([], f, ensure_ascii=False, indent=2)
print(f"JSON文件 {file_path} 初始化成功")
return True
except Exception as e:
print(f"初始化JSON文件失败: {str(e)}")
return False
def add_data_to_json(data, file_path=eventline_json):
"""
向JSON文件中添加一组数据追加到列表中
参数:
file_path (str): JSON文件的路径
data (any): 要添加的数据可以是字典列表字符串等可序列化类型
返回:
bool: 添加成功返回True失败返回False
"""
try:
# 检查文件是否存在,不存在则先初始化
if not os.path.exists(file_path):
if not init_json_file(file_path):
return False
# 读取现有数据
with open(file_path, 'r', encoding='utf-8') as f:
try:
content = json.load(f)
# 确保内容是列表类型
if not isinstance(content, list):
print(f"JSON文件内容不是列表无法添加数据")
return False
except json.JSONDecodeError:
print(f"JSON文件格式错误无法读取")
return False
# 添加新数据
content.append(data)
# 写回文件
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(content, f, ensure_ascii=False, indent=2)
# print(f"数据已成功添加到 {file_path}")
return True
except Exception as e:
print(f"添加数据到JSON文件失败: {str(e)}")
return False
def parse_common_event_to_json(log_str, keys):
timestamp = utils.extract_first_second_level_timestamp(log_str)
idl_dict = utils.parse_idllog_line(log_str)
details_info = {}
match keys:
case "BMC_Boot":
title = "BMC启动"
cause = idl_dict['description']
details_info['原因'] = cause
case "SYS_Boot":
title = "系统重启"
cause = idl_dict['description']
details_info['原因'] = cause
case "S5/G2":
title = "电源状态Down"
case "S0/G0":
title = "电源状态Up"
case "FW UPDATE":
title = "固件更新"
cause = idl_dict['description']
details_info['版本变更'] = cause
case _:
print("No Valid Key!")
return
global g_json_record_id
json_record_id = g_json_record_id
g_json_record_id = g_json_record_id + 1
json_record = {}
json_record["id"] = json_record_id
json_record["timestamp"] = timestamp
json_record["title"] = title
json_record["details"] = details_info
json_str = json.dumps(json_record, indent=2, ensure_ascii=False)
add_data_to_json(json_str)
def parse_error_event_to_json(log_str, level):
# 检查是否是 Deassert 事件
if utils.is_full_word_present(log_str, "Deassert"):
return
global g_json_record_id
json_record_id = g_json_record_id
g_json_record_id = g_json_record_id + 1
json_record = {}
details_info = {}
timestamp = utils.extract_first_second_level_timestamp(log_str)
log_dict = utils.parse_idllog_line(log_str)
json_record["id"] = json_record_id
json_record["timestamp"] = timestamp
details_info["Level"] = level
details_info["IDL_Desc"] = log_dict['description']
details_info["Sensor"] = log_dict['sensor']
# 判定是否为温度事件
for key in TempSensorSpec:
if key in log_str:
json_record["title"]=f"温度事件-{key}"
json_record["details"] = details_info
json_str = json.dumps(json_record, indent=2, ensure_ascii=False)
add_data_to_json(json_str)
return
# 判定是否为电压事件
for key in VoltSensorSpec:
if key in log_str:
json_record["title"]=f"电压事件-{key}"
json_record["details"] = details_info
json_str = json.dumps(json_record, indent=2, ensure_ascii=False)
add_data_to_json(json_str)
return
# 故障事件判定
json_record["title"] = f"故障事件-{log_dict['sensor']}"
json_record["details"] = details_info
json_str = json.dumps(json_record, indent=2, ensure_ascii=False)
add_data_to_json(json_str)
return
def check_line_with_keywords(log_str):
# 检查标准事件
for key in CommonKey:
if key in log_str:
parse_common_event_to_json(log_str, key)
return
# 检查故障事件
for key in ErrorKey:
if key in log_str:
parse_error_event_to_json(log_str, key)
return
def scan_event_from_idl():
global g_json_record_id
g_json_record_id = 1
init_json_file()
try:
with open(cache_idl, 'r', encoding='utf-8') as file:
for line_num, line_content in enumerate(file, 1): # 行号从1开始
# 检查当前行是否包含任何关键词
check_line_with_keywords(line_content)
# print(f"Line: {line_num}, has event")
except FileNotFoundError:
raise FileNotFoundError(f"文件不存在: {cache_idl}")
except Exception as e:
raise Exception(f"扫描文件时发生错误: {str(e)}")
def get_event_json():
try:
# 读取并解析JSON文件
with open(eventline_json, 'r', encoding='utf-8') as f:
json_data = json.load(f)
# 更新表格数据
# print(json_data)
return json_data
except json.JSONDecodeError:
print("错误", "JSON格式错误, 无法解析文件")
except Exception as e:
print( "错误", f"加载文件失败: {str(e)}")
def program_main():
# parse idl已完成了 idl 的合并任务, 这里直接读取即可
scan_event_from_idl()
return True
if __name__ == "__main__":
# scan_event_from_idl()
json_data = get_event_json()

@ -1 +1,188 @@
import os
import utils
import json
project_root = utils.get_project_root()
cache_dir = os.path.join(project_root, "okd_tmp")
log_dir = os.path.join(cache_dir, "onekeylog")
idl_dir = os.path.join(log_dir, "log")
idl_file = os.path.join(idl_dir, "idl.log")
cache_idl = os.path.join(cache_dir, "merge_idl.log")
cache_sys_error = os.path.join(cache_dir, "idl_error.json")
def is_alertjson_file_empty():
"""
判断JSON文件是否为空
"""
try:
# 检查文件是否存在
with open(cache_sys_error, 'r', encoding='utf-8') as f:
content = f.read().strip()
# 检查文件是否完全为空
if not content:
return True
# 尝试解析JSON
try:
data = json.loads(content)
# 检查JSON内容是否为空对象或空数组
if data in ({}, []):
return True
else:
return False
except json.JSONDecodeError:
return False
except FileNotFoundError:
return None
except Exception as e:
return None
# def parse_log_line(log_line):
# """
# 解析特定格式的日志行,提取关键信息
# 参数:
# log_line (str): 要解析的日志行字符串
# 返回:
# dict: 包含提取的信息的字典若解析失败则返回None
# """
# try:
# # 处理开头的 <162> 部分,先移除这部分内容
# # 找到第一个空格,跳过 <162> 部分
# first_space_index = log_line.find(' ')
# if first_space_index == -1:
# return None
# content_after_prefix = log_line[first_space_index:].strip()
# # 从剩余内容中提取第一个时间戳(到下一个空格)
# timestamp_end = content_after_prefix.find(' ')
# if timestamp_end == -1:
# return None
# timestamp = content_after_prefix[:timestamp_end].strip()
# # 查找包含|分隔符的部分
# pipe_start = log_line.find('|')
# if pipe_start == -1:
# return None
# pipe_content = log_line[pipe_start:]
# # 按|分割内容
# parts = [part.strip() for part in pipe_content.split('|') if part.strip()]
# # 检查是否有足够的部分
# if len(parts) < 5:
# return None
# # 提取各部分信息
# component_type = parts[1] # 部件类型
# event_type = parts[2] # 事件类型
# event_level = parts[3] # 事件等级
# event_code = parts[4] # 事件代码
# # 事件描述是剩余部分的组合
# event_description = '|'.join(parts[5:]) if len(parts) > 5 else ""
# return {
# 'timestamp': timestamp,
# 'component_type': component_type,
# 'event_type': event_type,
# 'event_level': event_level,
# 'event_code': event_code,
# 'description': event_description
# }
# except Exception as e:
# print(f"解析日志行时出错: {str(e)}")
# return None
def process_log_with_keywords(file_path, keywords, json_output_path=cache_sys_error):
"""
结合关键字匹配日志解析并将结果保存为JSON文件的完整处理函数
参数:
file_path (str): 日志文件路径
keywords (list): 关键字列表
json_output_path (str): 保存结果的JSON文件路径
"""
if not keywords:
raise ValueError("关键字列表不能为空")
lower_keywords = [keyword.lower() for keyword in keywords]
match_count = 0
parsed_results = []
try:
with open(file_path, 'r', encoding='utf-8') as file:
for line_number, line in enumerate(file, 1):
line_lower = line.lower()
if any(keyword in line_lower for keyword in lower_keywords):
print(f"\n{line_number}行: {line.strip()}")
match_count += 1
# 解析日志行
parsed = utils.parse_idllog_line(line)
if parsed:
# 添加行号信息,方便追溯
parsed['line_number'] = line_number
parsed_results.append(parsed)
print("提取的信息:")
for key, value in parsed.items():
print(f" {key}: {value}")
else:
print(" 无法解析此行的格式")
print(f"\n总计找到 {match_count} 行包含指定关键字(不区分大小写)")
# 保存结果到JSON文件
try:
# 检查输出目录是否存在,不存在则创建
output_dir = os.path.dirname(json_output_path)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
# 写入JSON文件使用缩进使格式更易读
with open(json_output_path, 'w', encoding='utf-8') as json_file:
json.dump(parsed_results, json_file, ensure_ascii=False, indent=2)
print(f"提取的信息已保存到: {json_output_path}")
except Exception as e:
print(f"保存JSON文件时出错: {str(e)}")
return parsed_results
except FileNotFoundError:
raise FileNotFoundError(f"文件 '{file_path}' 不存在")
except Exception as e:
raise Exception(f"处理文件时发生错误: {str(e)}")
def get_idl_alert_json():
try:
# 读取并解析JSON文件
with open(cache_sys_error, 'r', encoding='utf-8') as f:
json_data = json.load(f)
# 更新表格数据
return json_data
except json.JSONDecodeError:
print("错误", "JSON格式错误, 无法解析文件")
except Exception as e:
print( "错误", f"加载文件失败: {str(e)}")
def parse_idl_error():
utils.merge_logrotate_files(idl_file, 2, cache_idl)
error_keys = [ "Critical", "warning" ]
process_log_with_keywords(cache_idl, error_keys)
def program_main():
parse_idl_error()
return True
if __name__ == "__main__":
parse_idl_error()

@ -7,7 +7,7 @@ import argparse
import utils
project_root = utils.get_project_root()
cache_dir = os.path.join(project_root, "tmp")
cache_dir = os.path.join(project_root, "okd_tmp")
default_chart_dir = os.path.join(cache_dir, "chart")
def parse_temperature_data(file_path):
@ -44,7 +44,7 @@ def parse_temperature_data(file_path):
try:
# 尝试多种常见时间格式
timestamp_formats = [
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%S ',
]
timestamp = None
@ -139,7 +139,7 @@ def generate_temperature_charts(df, output_dir=default_chart_dir):
os.makedirs(output_dir, exist_ok=True)
# 设置中文字体支持
plt.rcParams["font.family"] = ["SimHei", "WenQuanYi Micro Hei", "Heiti TC"]
plt.rcParams["font.family"] = ["SimHei"]
plt.rcParams["axes.unicode_minus"] = False # 正确显示负号
# 1. 所有温度点的趋势图
@ -226,31 +226,13 @@ def program_main():
onekeylog_root = os.path.join(cache_dir, "onekeylog")
log_dir = os.path.join(onekeylog_root, "log")
sensor_file0 = os.path.join(log_dir, "sensorhistory.log")
sensor_file1 = os.path.join(log_dir, "sensorhistory.log.1")
sensor_merge = os.path.join(cache_dir, "sensorhistory_merge.log")
if not os.path.exists(sensor_file0):
print("无温度数据日志文件")
return False
try:
# 合并文件
with open(sensor_merge, 'w', encoding='utf-8') as out_f:
# 写入第一个文件内容
with open(sensor_file0, 'r', encoding='utf-8') as f1:
out_f.write(f1.read())
out_f.write('\n') # 在两个文件内容之间添加空行分隔
if os.path.exists(sensor_file1):
# 写入第二个文件内容
with open(sensor_file1, 'r', encoding='utf-8') as f2:
out_f.write(f2.read())
print(f"成功合并文件到: {sensor_merge}")
except Exception as e:
print(f"合并文件失败: {str(e)}")
return False
utils.merge_logrotate_files(sensor_file0, 2, sensor_merge)
# 解析数据
print(f"正在解析数据文件: {sensor_merge}")

@ -1,13 +1,14 @@
import sys
import os
from PyQt5.QtWidgets import (QApplication, QMainWindow, QFileDialog, QTextBrowser,
from PyQt5.QtWidgets import (QApplication, QMainWindow, QFileDialog, QTextBrowser, QVBoxLayout,
QMessageBox, QTextEdit, QGraphicsScene, QGraphicsPixmapItem)
from PyQt5.QtCore import Qt
from PyQt5.QtCore import Qt, QMimeData
from PyQt5.QtGui import QPixmap
from PyQt5.QtGui import QPainter # 单独导入QPainter用于抗锯齿设置
from PyQt5.QtGui import QStandardItemModel, QStandardItem, QDragEnterEvent, QDropEvent
from MainWindow_ui import Ui_MainWindow # 导入转换后的UI类
import service
import timelineEvent
class MainWindow(QMainWindow, Ui_MainWindow):
def __init__(self):
@ -17,6 +18,9 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.parseStatus = service.ServiceStatus()
self.init_graphic_views()
self.init_textbrowser_style()
self.init_tableView_alert_model()
self.initUI()
self.reInitTimeLineTab()
self.comboBoxTextDict = {
"ALL_Temp": "all",
@ -32,12 +36,53 @@ class MainWindow(QMainWindow, Ui_MainWindow):
"VR_FPGA_Temp": "vr_fpga"
}
def initUI(self):
# 允许窗口接收拖放事件
self.setAcceptDrops(True)
def reInitTimeLineTab(self):
# 1. 清除目标标签页的现有布局
if hasattr(self.tab_timeline_event, 'layout'):
# 移除现有布局中的所有组件
layout = self.tab_timeline_event.layout()
if layout:
while layout.count():
item = layout.takeAt(0)
widget = item.widget()
if widget:
widget.deleteLater()
# 2. 创建时间线内容组件实例
self.timeline_content = timelineEvent.TimelineTabContent()
# 3. 为标签页创建新布局并添加时间线组件
layout = QVBoxLayout(self.tab_timeline_event)
layout.setContentsMargins(0, 0, 0, 0) # 可选:去除边距
layout.addWidget(self.timeline_content)
def dragEnterEvent(self, event: QDragEnterEvent):
"""拖入事件:判断拖入的是否是文件"""
if event.mimeData().hasUrls():
# 只接受文件拖入
event.acceptProposedAction()
else:
event.ignore()
def dropEvent(self, event: QDropEvent):
"""放下事件:处理拖放的文件"""
# 获取拖放的文件路径列表
file_paths = [url.toLocalFile() for url in event.mimeData().urls()]
# 调用你的文件上传函数
for file_path in file_paths:
self.process_uploaded_file(file_path) # 这是你已经写好的上传函数
def init_textbrowser_style(self):
# 设置样式表(全局文本样式)
self.textBrowser_info.setStyleSheet("""
QTextBrowser {
font-family: 'SimHei';
font-size: 14px;
font-size: 24px;
color: #2c3e50;
background-color: #f8f9fa;
border: 1px solid #ddd;
@ -62,15 +107,45 @@ class MainWindow(QMainWindow, Ui_MainWindow):
self.graphicsView.setRenderHint(QPainter.Antialiasing) # 抗锯齿
self.graphicsView.setRenderHint(QPainter.SmoothPixmapTransform) # 平滑缩放
# 初始化图形场景和视图
# self.scene_temp_cpu = QGraphicsScene(self) # 创建场景
# self.graphicsView_temp_cpu.setScene(self.scene_temp_cpu) # 将场景绑定到视图
# self.graphicsView_temp_cpu.setRenderHint(QPainter.Antialiasing) # 抗锯齿
# self.graphicsView_temp_cpu.setRenderHint(QPainter.SmoothPixmapTransform) # 平滑缩放
# 存储当前显示的图片项
self.pixmap_item = None
self.current_image_path = None
def init_tableView_alert_model(self):
# 创建一个4行3列的模型
self.model_alert = QStandardItemModel(4, 5)
# 设置表头
self.model_alert.setHorizontalHeaderLabels(["时间", "部件", "等级", "方向", "描述"])
# 将模型绑定到QTableView
self.tableView_alert.setModel(self.model_alert)
# 可选:调整列宽自适应内容
self.tableView_alert.horizontalHeader().setSectionResizeMode(
self.tableView_alert.horizontalHeader().ResizeToContents
)
def show_error_message(self, file_path):
"""显示格式错误提示弹窗"""
QMessageBox.critical(
self,
"文件格式错误",
f"不支持的文件格式:\n{os.path.basename(file_path)}\n\n请上传.tar.gz格式的压缩包。"
)
def is_valid_file_format(self, file_path):
"""检查文件是否为.tar.gz格式"""
# 获取文件后缀
_, ext = os.path.splitext(file_path)
# 先检查是否有.gz后缀
if ext.lower() == '.gz':
# 再检查去掉.gz后的后缀是否为.tar
base, ext2 = os.path.splitext(os.path.splitext(file_path)[0])
return ext2.lower() == '.tar'
return False
def upload_file(self):
"""打开文件选择对话框并处理选中的文件"""
@ -94,7 +169,11 @@ class MainWindow(QMainWindow, Ui_MainWindow):
# self.text_edit.append("已取消文件选择")
def process_uploaded_file(self, file_path):
"""处理上传的文件(这里仅展示文件信息,可根据需求扩展)"""
"""处理上传的文件"""
if not self.is_valid_file_format(file_path):
self.show_error_message(file_path)
return
try:
# 获取文件信息
file_name = os.path.basename(file_path)
@ -120,12 +199,62 @@ class MainWindow(QMainWindow, Ui_MainWindow):
if self.parseStatus.sensorhistory_status:
self.display_pic(service.get_sensorhistory_path("all"))
if self.parseStatus.parseidl_status:
if not service.is_idl_alert_empty():
alert_json = service.get_idl_alert_json()
self.fill_tableView_alert(alert_json)
if self.parseStatus.eventline_status:
event_json = service.get_timeline_event_json()
self.timeline_content.load_events_from_json(event_json)
self.textBrowser_console.insertPlainText("完成文件解析\n")
except Exception as e:
QMessageBox.critical(self, "处理失败", f"文件处理出错:{str(e)}")
def fill_tableView_alert(self, json_data):
"""从JSON数据更新表格内容"""
# 清空现有数据
self.model_alert.clear()
# 根据JSON数据结构设置表头和内容
if isinstance(json_data, list):
# 处理列表类型的JSON如多条记录
if json_data and isinstance(json_data[0], dict):
# 使用第一条记录的键作为表头
headers = json_data[0].keys()
self.model_alert.setHorizontalHeaderLabels(headers)
# 添加所有行数据
for item in json_data:
row_items = []
for key in headers:
# 将值转换为字符串显示
value = str(item.get(key, ""))
row_items.append(QStandardItem(value))
self.model_alert.appendRow(row_items)
elif isinstance(json_data, dict):
# 处理字典类型的JSON键值对
self.model_alert.setHorizontalHeaderLabels(["", ""])
for key, value in json_data.items():
key_item = QStandardItem(str(key))
value_item = QStandardItem(str(value))
# 设置单元格不可编辑
key_item.setEditable(False)
value_item.setEditable(False)
self.model_alert.appendRow([key_item, value_item])
else:
# 处理简单类型的JSON
self.model_alert.setHorizontalHeaderLabels(["数据"])
self.model_alert.appendRow([QStandardItem(str(json_data))])
# 调整列宽以适应内容
self.tableView_alert.horizontalHeader().setSectionResizeMode(
self.tableView_alert.horizontalHeader().ResizeToContents
)
def on_combo_changed(self, selected_text):
"""下拉列表选项变化时切换图片"""
self.display_pic(service.get_sensorhistory_path(self.comboBoxTextDict[selected_text]))

@ -3,11 +3,15 @@ import os
import utils
import ZiJin_parse_sensorhistory as sensorparse
import ZiJin_parse_baseinfo as baseinfo
import ZiJin_parse_idl as parseidl
import ZiJin_parse_event as zijin_event
class ServiceStatus():
def __init__(self):
self.sensorhistory_status = False
self.baseinfo_status = False
self.parseidl_status = False
self.eventline_status = False
def set_sensorhistory_status(self, status):
self.sensorhistory_status = status
@ -20,10 +24,22 @@ class ServiceStatus():
def get_baseinfo_status(self):
return self.baseinfo_status
def set_parseidl_status(self, status):
self.parseidl_status = status
def get_parseidl_status(self):
return self.parseidl_status
def set_eventline_status(self, status):
self.eventline_status = status
def get_eventline_status(self):
return self.eventline_status
def app_cache_init():
project_root = utils.get_project_root()
cache_dir = os.path.join(project_root, "tmp")
cache_dir = os.path.join(project_root, "okd_tmp")
utils.clean_log_data(cache_dir)
os.mkdir(cache_dir)
@ -31,7 +47,7 @@ def app_cache_init():
def send_log_to_cache(filepath):
project_root = utils.get_project_root()
cache_dir = os.path.join(project_root, "tmp")
cache_dir = os.path.join(project_root, "okd_tmp")
utils.unzip_log(filepath, cache_dir)
def get_sensorhistory_path(type):
@ -40,9 +56,24 @@ def get_sensorhistory_path(type):
def get_baseinfo_str():
return baseinfo.get_all_infostring()
def get_idl_alert_json():
return parseidl.get_idl_alert_json()
def is_idl_alert_empty():
return parseidl.is_alertjson_file_empty()
def get_timeline_event_json():
return zijin_event.get_event_json()
def start_diagnose(parseStatus):
result_sensorhistory = sensorparse.program_main()
parseStatus.set_sensorhistory_status(result_sensorhistory)
result_baseinfo = baseinfo.program_main()
parseStatus.set_baseinfo_status(result_baseinfo)
parseStatus.set_baseinfo_status(result_baseinfo)
result_parseidl = parseidl.program_main()
parseStatus.set_parseidl_status(result_parseidl)
result_eventline = zijin_event.program_main()
parseStatus.set_eventline_status(result_eventline)

288
src/timelineEvent.py Normal file

@ -0,0 +1,288 @@
import sys
import datetime
import json
from PyQt5.QtWidgets import (QApplication, QWidget, QGraphicsView, QGraphicsScene,
QGraphicsTextItem, QToolTip, QVBoxLayout,
QTabWidget, QLabel)
from PyQt5.QtCore import Qt, QEvent
from PyQt5.QtGui import QPen, QBrush, QColor, QFont, QPainter
class TimelineEvent:
"""事件数据封装类"""
def __init__(self, event_id, timestamp, title, details):
self.id = event_id
self.timestamp = timestamp # datetime对象
self.title = title
self.details = details # 详细信息字典
class TimelineGraphicsView(QGraphicsView):
"""时间线视图实现3个事件一组的高度调整和间隔时间戳显示"""
def __init__(self, parent=None):
super().__init__(parent)
self.scene = QGraphicsScene(self)
self.setScene(self.scene)
# 视图配置
self.setRenderHint(QPainter.Antialiasing)
self.setDragMode(QGraphicsView.ScrollHandDrag)
self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOn)
self.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded)
# 缩放设置
self.setTransformationAnchor(QGraphicsView.AnchorUnderMouse)
self.setResizeAnchor(QGraphicsView.AnchorViewCenter)
self.current_scale = 1.0
self.min_scale = 0.5
self.max_scale = 5.0
# 布局核心参数
self.events = []
self.event_items = []
self.group_size = 3 # 3个事件为一组
self.timestamp_interval = 10 # 每隔10个事件显示时间戳
self.base_offset = 30 # 基础高度偏移量
self.timeline_center_y = 0 # 时间线中线Y坐标
self.min_horizontal_spacing = 60 # 事件水平间距
# 自定义颜色方案(可根据需要修改)
self.group_colors = [
(255, 99, 71), # 组1番茄红
(255, 255, 150), # 组2 浅黄
(100, 255, 100), # 组3 浅绿
(147, 112, 219), # 组4 紫色
(255, 165, 0), # 组5 橙色(循环复用)
(0, 0, 150) # 组6 :浅蓝
]
def wheelEvent(self, event):
"""鼠标滚轮缩放"""
delta = event.angleDelta().y()
if delta > 0:
new_scale = self.current_scale * 1.1
else:
new_scale = self.current_scale * 0.9
new_scale = max(self.min_scale, min(new_scale, self.max_scale))
if new_scale != self.current_scale:
self.scale(new_scale / self.current_scale, new_scale / self.current_scale)
self.current_scale = new_scale
def eventFilter(self, obj, event):
"""事件悬停提示"""
if obj == self.viewport() and event.type() == QEvent.MouseMove:
scene_pos = self.mapToScene(event.pos())
for item, event_data in self.event_items:
if item.contains(item.mapFromScene(scene_pos)):
details_text = f"<b>{event_data.title}</b><br>"
details_text += f"时间: {event_data.timestamp.strftime('%Y-%m-%d %H:%M:%S')}<br><br>"
for key, value in event_data.details.items():
details_text += f"{key}: {value}<br>"
QToolTip.showText(event.globalPos(), details_text)
return True
QToolTip.hideText()
return super().eventFilter(obj, event)
def add_events(self, events):
"""添加事件并按组布局"""
self.events = events
if not self.events:
return
self.resetTransform()
self.current_scale = 1.0
self.events.sort(key=lambda x: x.timestamp)
# 计算场景尺寸
margin = 80
event_count = len(self.events)
scene_width = max(1200, margin * 2 + event_count * self.min_horizontal_spacing)
scene_height = 400 # 足够容纳上下偏移的事件
self.scene.setSceneRect(0, 0, scene_width, scene_height)
self.timeline_center_y = scene_height / 2 # 时间线中线
self.draw_timeline(scene_width, margin)
self.draw_events(scene_width, margin)
def draw_timeline(self, scene_width, margin):
"""绘制时间线基线和间隔时间戳"""
self.scene.clear()
# 绘制主基线
self.scene.addLine(
margin, self.timeline_center_y,
scene_width - margin, self.timeline_center_y,
QPen(QColor(60, 60, 60), 2)
)
# 每隔10个事件显示时间戳
event_count = len(self.events)
if event_count > 0:
interval = (scene_width - 2 * margin) / max(1, event_count - 1)
# 确保第一个事件一定显示时间戳
for i in range(0, event_count, self.timestamp_interval):
x_pos = margin + i * interval
# 时间戳标签
time_text = self.events[i].timestamp.strftime('%Y-%m-%d %H:%M')
time_label = QGraphicsTextItem(time_text)
time_label.setFont(QFont("SimHei", 9))
# 调整标签位置,避免重叠
time_label.setPos(x_pos - time_label.boundingRect().width()/2,
self.timeline_center_y + 70)
self.scene.addItem(time_label)
# 时间戳标记线
self.scene.addLine(
x_pos, self.timeline_center_y - 8,
x_pos, self.timeline_center_y + 8,
QPen(QColor(100, 100, 100), 2)
)
def get_event_height_offset(self, index):
"""
计算事件高度偏移量
规则3个事件为一组调整高度
"""
# 计算在组中的位置0-2
group_pos = index % self.group_size
# 高度偏移模式定义(相对于时间线中线)
# 正值:上方,负值:下方
offset_pattern = [
0, # 0: 组内第一个事件(中线)
self.base_offset, # 1: 组内第二个事件(上移)
-self.base_offset # 2: 组内第三个事件(下移)
]
return offset_pattern[group_pos]
def get_event_color(self, index, event):
"""
确定事件标记的颜色
"""
match event.title:
case _ if "故障事件" in event.title:
return self.group_colors[0]
case _ if "温度事件" in event.title:
return self.group_colors[1]
case "固件更新":
return self.group_colors[2]
case _ if "电压事件" in event.title:
return self.group_colors[3]
case _:
return self.group_colors[5]
def draw_events(self, scene_width, margin):
"""绘制事件按3个一组调整高度"""
self.event_items = []
total_events = len(self.events)
# 计算X坐标均匀分布
event_x_positions = []
if total_events <= 1:
event_x_positions = [margin + (scene_width - 2 * margin) / 2] if total_events else []
else:
interval = (scene_width - 2 * margin) / (total_events - 1)
event_x_positions = [margin + i * interval for i in range(total_events)]
for index, event in enumerate(self.events):
x_pos = event_x_positions[index]
# 获取高度偏移
height_offset = self.get_event_height_offset(index)
event_y = self.timeline_center_y - height_offset # Y坐标计算
# 获取事件颜色
r, g, b = self.get_event_color(index, event)
# 事件标记
event_radius = 6
# 边框颜色稍深宽度2px
pen = QPen(QColor(r-30, g-30, b-30), 2)
# 填充:主色,带透明度
brush = QBrush(QColor(r, g, b, 180))
event_item = self.scene.addEllipse(
x_pos - event_radius, event_y - event_radius,
event_radius * 2, event_radius * 2,
pen, # 边框
brush # 填充
)
# 事件标题
text_item = QGraphicsTextItem(event.title)
text_item.setFont(QFont("SimHei", 9))
# 文本位置调整
text_width = text_item.boundingRect().width()
# 右侧边界检查
if x_pos + text_width > scene_width - margin:
text_x = x_pos - 10 # 向左
else:
text_x = x_pos + 10 # 向右
# 上下位置调整
if height_offset > 0: # 上方事件
text_y = event_y - 20
elif height_offset < 0: # 下方事件
text_y = event_y + 10
else: # 中线事件
# 中线事件文本交替上下放置,避免重叠
text_y = event_y - 20 if (index // self.group_size) % 2 == 0 else event_y + 10
text_item.setPos(text_x, text_y)
self.scene.addItem(text_item)
# 连接线
self.scene.addLine(
x_pos, self.timeline_center_y,
x_pos, event_y,
QPen(QColor(150, 150, 150), 1, Qt.DashLine)
)
self.event_items.append((event_item, event))
class TimelineTabContent(QWidget):
"""时间线标签页内容"""
def __init__(self, parent=None):
super().__init__(parent)
self.init_ui()
def init_ui(self):
layout = QVBoxLayout(self)
title_label = QLabel("事件时间线")
title_label.setFont(QFont("SimHei", 12, QFont.Bold))
layout.addWidget(title_label)
self.timeline_view = TimelineGraphicsView(self)
self.timeline_view.setMouseTracking(True)
self.timeline_view.viewport().installEventFilter(self.timeline_view)
layout.addWidget(self.timeline_view)
self.setLayout(layout)
def load_events_from_json(self, json_data):
"""从JSON数据加载事件"""
events = []
for item in json_data:
json_data_str = json.loads(item)
# 解析时间戳
timestamp = datetime.datetime.fromisoformat(json_data_str['timestamp'])
if timestamp.year < 2022:
print("记录时间戳年份信息不满足,不予展示, 跳过")
continue
event = TimelineEvent(
event_id=json_data_str['id'],
timestamp=timestamp,
title=json_data_str['title'],
details=json_data_str['details']
)
events.append(event)
# 添加事件到时间线
self.timeline_view.add_events(events)

@ -2,6 +2,8 @@ import os
import tarfile
import shutil
import stat
import re
from datetime import datetime
def get_project_root():
current_file = os.path.abspath(__file__)
@ -133,4 +135,231 @@ def read_file_to_string(file_path):
print(f"错误:文件 '{file_path}' 不是 UTF-8 编码,无法读取")
except Exception as e:
print(f"读取文件失败:{str(e)}")
return None
return None
def merge_logrotate_files(source_path, num_files, output_path):
"""
合并由logrotate分割的文件
参数:
source_path (str): 原始文件路径不包含.1, .2等后缀
num_files (int): 要合并的文件数量包括原始文件
output_path (str): 合并后文件的输出路径
"""
if num_files < 1:
raise ValueError("文件数量必须至少为1")
# 构建要合并的文件列表
# 日志轮转文件通常按 .1(最新备份), .2(次新), ... 原始文件(最新)的顺序排列
files_to_merge = []
# 添加备份文件(从.1到.num_files-1
for i in range(1, num_files):
backup_file = f"{source_path}.{i}"
if os.path.exists(backup_file):
files_to_merge.append(backup_file)
else:
print(f"警告: 备份文件 {backup_file} 不存在,已跳过")
# 添加原始文件(最新的日志)
if os.path.exists(source_path):
files_to_merge.append(source_path)
else:
raise FileNotFoundError(f"原始文件 {source_path} 不存在")
# 如果找到的文件少于要求的数量,给出警告
if len(files_to_merge) < num_files:
print(f"警告: 只找到 {len(files_to_merge)} 个文件,而不是要求的 {num_files}")
# 合并文件
with open(output_path, 'w') as outfile:
for file_path in files_to_merge:
try:
with open(file_path, 'r') as infile:
# 读取并写入文件内容
outfile.write(infile.read())
# 在文件之间添加一个换行,避免内容粘连
outfile.write('\n')
print(f"已合并: {file_path}")
except Exception as e:
print(f"合并文件 {file_path} 时出错: {str(e)}")
print(f"所有文件已合并至: {output_path}")
def get_nth_integer_after_line(file_path, target_string, n=1):
"""
打开文件找到包含目标字符串的行读取下一行并提取第n个整数
参数:
file_path: 文件路径
target_string: 要查找的目标字符串
n: 要返回的第几个整数从1开始计数
返回:
找到的第n个整数如果未找到则返回None
"""
if n < 1:
print("错误n必须是大于等于1的整数")
return None
try:
with open(file_path, 'r', encoding='utf-8') as file:
# 逐行读取文件
for line in file:
# 检查当前行是否包含目标字符串
if target_string in line:
# 读取下一行
next_line = next(file, None)
if next_line is None:
print("目标字符串所在行为文件最后一行,没有下一行")
return None
# 处理下一行,提取所有整数
integers = []
# 分割成单词,尝试转换为整数
words = next_line.strip().split()
for word in words:
# 清理单词,保留数字和负号
cleaned_word = ''.join(filter(lambda c: c.isdigit() or c == '-', word))
if cleaned_word: # 确保清理后不为空
try:
num = int(cleaned_word)
integers.append(num)
except ValueError:
continue
# 检查是否有足够的整数
if len(integers) >= n:
return integers[n-1] # 因为列表是0索引所以n-1
else:
print(f"下一行中只找到 {len(integers)} 个整数,无法返回第 {n}")
return None
# 如果遍历完文件都没找到目标字符串
print(f"文件中未找到包含 '{target_string}' 的行")
return None
except FileNotFoundError:
print(f"错误:文件 '{file_path}' 不存在")
return None
except Exception as e:
print(f"处理文件时发生错误:{str(e)}")
return None
def extract_first_second_level_timestamp(text):
"""
提取字符串中第一个不带时区的秒级时间戳格式YYYY-MM-DDTHH:MM:SS
并返回标准ISO格式字符串
参数:
text (str): 包含时间戳的原始字符串
返回:
str: 提取到的ISO格式时间戳未找到则返回None
"""
# 正则表达式匹配不带时区的秒级时间戳YYYY-MM-DDTHH:MM:SS
# 严格匹配日期和时间的数字范围如月份1-12日期1-31等
timestamp_pattern = (
r'\b\d{4}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01])' # 日期部分 YYYY-MM-DD
r'T([01]\d|2[0-3]):([0-5]\d):([0-5]\d)\b' # 时间部分 THH:MM:SS
)
# 查找第一个匹配的时间戳
match = re.search(timestamp_pattern, text)
if match:
timestamp_str = match.group()
try:
# 解析为datetime对象不带时区
dt = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S")
# 返回ISO格式秒级
return dt.isoformat()
except ValueError:
# 理论上正则已过滤无效格式,此处作为兜底
return timestamp_str
# 未找到匹配的时间戳
return None
def is_full_word_present(text, word):
"""
判断text中是否包含完整的word全词匹配大小写敏感
参数:
text (str): 待检查的字符串
word (str): 要匹配的完整单词
返回:
bool: 存在全词匹配返回True否则返回False
"""
# 使用正则表达式元字符定义单词边界,确保全词匹配
# re.escape()用于转义word中的特殊字符
pattern = r'\b' + re.escape(word) + r'\b'
# 搜索匹配(大小写敏感)
match = re.search(pattern, text)
return bool(match)
def parse_idllog_line(log_line):
"""
解析特定格式的日志行提取关键信息
参数:
log_line (str): 要解析的日志行字符串
返回:
dict: 包含提取的信息的字典若解析失败则返回None
"""
try:
# 处理开头的 <162> 部分,先移除这部分内容
# 找到第一个空格,跳过 <162> 部分
first_space_index = log_line.find(' ')
if first_space_index == -1:
return None
content_after_prefix = log_line[first_space_index:].strip()
# 从剩余内容中提取第一个时间戳(到下一个空格)
timestamp_end = content_after_prefix.find(' ')
if timestamp_end == -1:
return None
timestamp = content_after_prefix[:timestamp_end].strip()
# 查找包含|分隔符的部分
pipe_start = log_line.find('|')
if pipe_start == -1:
return None
pipe_content = log_line[pipe_start:]
# 按|分割内容
parts = [part.strip() for part in pipe_content.split('|') if part.strip()]
# 检查是否有足够的部分
if len(parts) < 5:
return None
# 提取各部分信息
component_type = parts[1] # 部件类型
event_type = parts[2] # 事件类型
event_level = parts[3] # 事件等级
event_code = parts[4] # 事件代码
# 事件描述是剩余部分的组合
event_description = '|'.join(parts[5:]) if len(parts) > 5 else ""
# 提取第一个单词(以空格为分隔符)
sensor = event_description.split()[0]
return {
'timestamp': timestamp,
'component_type': component_type,
'event_type': event_type,
'event_level': event_level,
'event_code': event_code,
'sensor' : sensor,
'description': event_description
}
except Exception as e:
print(f"解析日志行时出错: {str(e)}")
return None