onekeydiag/src/ZiJin_parse_event.py
leimingsheng 63d36548aa code sync
2025-08-22 21:19:53 +08:00

257 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

''' 模块说明
1.目标json格式, json记录中以数组形式记录
{
"id": 1,
"timestamp": "2023-01-15T08:30:00",
"title": "系统启动",
"details": {
Desc: "AC"
}
}
2.需要记录入的事件类型
Title keywords Detail
--------------- 常规 ---------------
BMC启动 | BMC_Boot | { Cause }
系统重启 | SYS_Boot | { Cause }
电源状态Down | S5/G2 |
电源状态Up | S0/G0 |
固件更新 | FW UPDATE | { Compoent , version_change }
--------------- 故障 ---------------
高温事件 | {温度传感器} | { SensorName , level, IDL Desc }
电压事件 | {电压传感器} | { SensorName , level, IDL Desc }
故障事件 | {其它Assert事件} | { SensorName , level, IDL_Desc }
'''
import os
import utils
import json
import re
project_root = utils.get_project_root()
cache_dir = os.path.join(project_root, "okd_tmp")
log_dir = os.path.join(cache_dir, "onekeylog")
idl_dir = os.path.join(log_dir, "log")
idl_file = os.path.join(idl_dir, "idl.log")
cache_idl = os.path.join(cache_dir, "merge_idl.log")
eventline_json = os.path.join(cache_dir, "eventline.json")
TempSensorSpec = [ "Inlet_CPU_Temp", "Outlet_CPU_Temp", "M_2_Temp_on_MB", "Inlet_FPGA_Temp",
"Outlet_FPGA_Temp", "FPGA_Temp", "FPGA_VR_Temp", "NIC_OPT0_Temp", "NIC_OPT1_Temp",
"NIC_OPT2_Temp", "NIC_OPT3_Temp", "CPU_Temp", "DIMM_Temp", "MEM_VR_Temp",
"CPU_VR_Temp"]
VoltSensorSpec = [ "A_PVCCANA", "A_P1V8_STBY", "A_PVNN_PCH", "CPU_VCORE", "A_PVTT", "A_P2V5_AUX",
"SYS_3V3", "A_P1V2_BMC_DDR4", "A_P1V15_AUX", "SYS_5V", "A_P1V05", "CPU_DDR_VDDQ0",
"P3V_BAT", "VCCH_GXE_1V1", "VCCL_0V8", "VCCIO_PIO_1V2", "VPP_2V5", "P3V3",
"VCCH_0V9", "VCCL_SDM_0V8", "VCCPT_1V8", "SYS_12V", "P12V_B"]
CommonKey = [ "BMC_Boot", "SYS_Boot", "S5/G2", "S0/G0", "FW UPDATE"]
ErrorKey = [ "Critical", "Warning" ]
g_json_record_id = 0
def init_json_file(file_path=eventline_json):
"""
初始化一个JSON文件内容为空列表
参数:
file_path (str): JSON文件的路径
返回:
bool: 初始化成功返回True失败返回False
"""
try:
# 检查文件是否已存在
if os.path.exists(file_path):
print(f"文件 {file_path} 已存在,无需初始化")
return True
# 创建并写入空列表
with open(file_path, 'w', encoding='utf-8') as f:
json.dump([], f, ensure_ascii=False, indent=2)
print(f"JSON文件 {file_path} 初始化成功")
return True
except Exception as e:
print(f"初始化JSON文件失败: {str(e)}")
return False
def add_data_to_json(data, file_path=eventline_json):
"""
向JSON文件中添加一组数据追加到列表中
参数:
file_path (str): JSON文件的路径
data (any): 要添加的数据(可以是字典、列表、字符串等可序列化类型)
返回:
bool: 添加成功返回True失败返回False
"""
try:
# 检查文件是否存在,不存在则先初始化
if not os.path.exists(file_path):
if not init_json_file(file_path):
return False
# 读取现有数据
with open(file_path, 'r', encoding='utf-8') as f:
try:
content = json.load(f)
# 确保内容是列表类型
if not isinstance(content, list):
print(f"JSON文件内容不是列表无法添加数据")
return False
except json.JSONDecodeError:
print(f"JSON文件格式错误无法读取")
return False
# 添加新数据
content.append(data)
# 写回文件
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(content, f, ensure_ascii=False, indent=2)
# print(f"数据已成功添加到 {file_path}")
return True
except Exception as e:
print(f"添加数据到JSON文件失败: {str(e)}")
return False
def parse_common_event_to_json(log_str, keys):
timestamp = utils.extract_first_second_level_timestamp(log_str)
idl_dict = utils.parse_idllog_line(log_str)
details_info = {}
match keys:
case "BMC_Boot":
title = "BMC启动"
cause = idl_dict['description']
details_info['原因'] = cause
case "SYS_Boot":
title = "系统重启"
cause = idl_dict['description']
details_info['原因'] = cause
case "S5/G2":
title = "电源状态Down"
case "S0/G0":
title = "电源状态Up"
case "FW UPDATE":
title = "固件更新"
cause = idl_dict['description']
details_info['版本变更'] = cause
case _:
print("No Valid Key!")
return
global g_json_record_id
json_record_id = g_json_record_id
g_json_record_id = g_json_record_id + 1
json_record = {}
json_record["id"] = json_record_id
json_record["timestamp"] = timestamp
json_record["title"] = title
json_record["details"] = details_info
json_str = json.dumps(json_record, indent=2, ensure_ascii=False)
add_data_to_json(json_str)
def parse_error_event_to_json(log_str, level):
# 检查是否是 Deassert 事件
if utils.is_full_word_present(log_str, "Deassert"):
return
global g_json_record_id
json_record_id = g_json_record_id
g_json_record_id = g_json_record_id + 1
json_record = {}
details_info = {}
timestamp = utils.extract_first_second_level_timestamp(log_str)
log_dict = utils.parse_idllog_line(log_str)
json_record["id"] = json_record_id
json_record["timestamp"] = timestamp
details_info["Level"] = level
details_info["IDL_Desc"] = log_dict['description']
details_info["Sensor"] = log_dict['sensor']
# 判定是否为温度事件
for key in TempSensorSpec:
if key in log_str:
json_record["title"]=f"温度事件-{key}"
json_record["details"] = details_info
json_str = json.dumps(json_record, indent=2, ensure_ascii=False)
add_data_to_json(json_str)
return
# 判定是否为电压事件
for key in VoltSensorSpec:
if key in log_str:
json_record["title"]=f"电压事件-{key}"
json_record["details"] = details_info
json_str = json.dumps(json_record, indent=2, ensure_ascii=False)
add_data_to_json(json_str)
return
# 故障事件判定
json_record["title"] = f"故障事件-{log_dict['sensor']}"
json_record["details"] = details_info
json_str = json.dumps(json_record, indent=2, ensure_ascii=False)
add_data_to_json(json_str)
return
def check_line_with_keywords(log_str):
# 检查标准事件
for key in CommonKey:
if key in log_str:
parse_common_event_to_json(log_str, key)
return
# 检查故障事件
for key in ErrorKey:
if key in log_str:
parse_error_event_to_json(log_str, key)
return
def scan_event_from_idl():
global g_json_record_id
g_json_record_id = 1
init_json_file()
try:
with open(cache_idl, 'r', encoding='utf-8') as file:
for line_num, line_content in enumerate(file, 1): # 行号从1开始
# 检查当前行是否包含任何关键词
check_line_with_keywords(line_content)
# print(f"Line: {line_num}, has event")
except FileNotFoundError:
raise FileNotFoundError(f"文件不存在: {cache_idl}")
except Exception as e:
raise Exception(f"扫描文件时发生错误: {str(e)}")
def get_event_json():
try:
# 读取并解析JSON文件
with open(eventline_json, 'r', encoding='utf-8') as f:
json_data = json.load(f)
# 更新表格数据
# print(json_data)
return json_data
except json.JSONDecodeError:
print("错误", "JSON格式错误, 无法解析文件")
except Exception as e:
print( "错误", f"加载文件失败: {str(e)}")
def program_main():
# parse idl已完成了 idl 的合并任务, 这里直接读取即可
scan_event_from_idl()
return True
if __name__ == "__main__":
# scan_event_from_idl()
json_data = get_event_json()