TG-PlatformPlus/logs copy.py

319 lines
13 KiB
Python
Raw Normal View History

2026-03-02 14:29:58 +08:00
import os, time, csv, struct, sys, shutil
import numpy as np
import xlwings as xw
import pandas as pd
from datetime import datetime
sys.path.append(r'.\UserScripts')
import CRC
from win32com.client import Dispatch
import pythoncom, report
# Convert a bytearray to a hex string grouped by word (2 bytes) for easier display
def wordData2HexStr(data):
    """Format a byte sequence as upper-case hex grouped into 2-byte words.

    Words are separated by a single space; a trailing odd byte forms a final
    one-byte group. An empty input yields an empty string.
    """
    words = []
    for offset in range(0, len(data), 2):
        words.append(data[offset:offset + 2].hex().upper())
    return ' '.join(words)
# Get the current time as a string
def nowStr():
    """Return the current local time formatted as ``HH:MM:SS.mmm``."""
    # %f renders six zero-padded microsecond digits; dropping the last three
    # leaves the (truncated) millisecond part.
    return datetime.now().strftime('%H:%M:%S.%f')[:-3]
def checkValue(data: bytes) -> int:
    """XOR every big-endian 16-bit word of ``data`` into a seed of 0xFFFF.

    Returns 0 when ``data`` contains an odd number of bytes.
    """
    if len(data) % 2 != 0:
        return 0
    checksum = 0xFFFF
    # Pair up even-indexed (high) and odd-indexed (low) bytes.
    for high, low in zip(data[0::2], data[1::2]):
        checksum ^= high * 256 + low
    return checksum
def sendCmd(hexStr):
    """Send the bytes encoded by the hex string ``hexStr``.

    When the current command's ``req_is_check`` attribute equals True, the
    one-byte value returned by ``CRC.crc8`` for the payload is appended before
    sending. Any exception raised while building or sending the payload is
    logged and not re-raised.
    """
    attr = cmdInfo['attr']
    try:
        # Turn the hexadecimal text into the raw bytes to transmit.
        payload = bytearray.fromhex(hexStr)
        if attr['req_is_check'] == True:
            checksum = CRC.crc8(payload)
            payload += checksum.to_bytes(1, 'big')
        send(payload)
    except Exception as e:
        log_e(f"Error in sendCmd. {str(e)}")
# Start function; it is called when the command begins
def start():
    """Prepare a T150-SDIM listening session; called when the command begins.

    Reads the command parameters (prompting through ``scanf`` when none are
    configured) in the form ``records, averagePoints, newFile, exportReport``,
    initialises the module-level session state, makes sure the project data
    folder and the report template exist, and opens the CSV data file in the
    global ``fo``.

    Returns:
        True when the session was set up. On any failure ``finish()`` is
        called, the error is logged, and the function falls through
        (returning None).
    """
    global startTime, recvData, timeDelta, dataPoints, sdimDatas, deviceInfo, keys, workMode
    global modeName, avePointsNum, fo, RECORDSNUM, recordNum, fileName, SN, istoxlsx
    try:
        log_i(f"[{devInfo['name']}] {cmdInfo['name']}".center(80, '-'))
        attr = cmdInfo['attr']
        if 'params' not in attr or not attr['params']:
            attr['params'] = scanf('输入参数', '请输入记录条数, 平均点数, 是否新建文件(1-是, 0-否),是否导出报告(1-是, 0-否)', '1, 10, 1,0')
        if not attr['params']:
            raise ValueError("Cancel.")
        values = attr['params'].replace(' ', '').split(',')
        # Four comma-separated fields are required; report that clearly
        # instead of surfacing a bare "list index out of range".
        if len(values) < 4:
            raise ValueError("Wrong params.")
        RECORDSNUM = int(values[0])
        recordNum = 0
        isCreateFile = bool(int(values[2]))
        istoxlsx = bool(int(values[3]))
        cmdInfo['attr']['rsp_timeout'] = 400
        avePointsNum = int(values[1])
        # A non-positive averaging window would make dataParse() average an
        # empty sample set, so reject it up front together with the record count.
        if RECORDSNUM <= 0 or avePointsNum <= 0:
            raise ValueError("Wrong params.")

        modeName = {'0x0001':'Calibration','0x0002':'Verification', '0x0003':'CalibI', '0x0004':'CalibII'}
        deviceInfo = _G.get(devInfo['name'])
        if not deviceInfo:
            deviceInfo = {'data':{}}
        # The work mode is pinned to '0x0003'; reading it from
        # deviceInfo['data']['workMode'] is currently disabled.
        workMode = '0x0003'
        SN = _G.get('SN')
        if not SN:
            log(f"Unable to read SN number,Set the default SN number to 123.")
            SN = 123
        keysList = {'0x0001':['GX_AD','GX_T','GY_AD','GY_T','GZ_AD','GZ_T','Pt','MX_AD','MY_AD','MZ_AD','M_T','INC','GTF','GT','BRD_T','AZI','MT'],
                    '0x0002':['GX','GY','GZ','SINC','DIP','SAZI','MX','MY','MZ','MTF','RPM','INC','GTF','GT','MODE','AZI','MT'],
                    '0x0003':['GX_AD','GX_T','GY_AD','GY_T','GZ_AD','GZ_T','Pt','MX_AD','MY_AD','MZ_AD','M_T','INC','GTF','BRD_T','GT','MT','AZI'],
                    '0x0004':['GX_AD','GX_T','GY_AD','GY_T','GZ_AD','GZ_T','Pt','MX_AD','MY_AD','MZ_AD','M_T','INC','GTF','CURR_AD','GT','MT','AZI']}
        keys = keysList[workMode]

        # Create the data folder inside the project.
        if not proInfo['path']:
            raise ValueError("Please create project first.")
        os.makedirs(f"{proInfo['path']}/T150_SDIM/", exist_ok=True)
        # The report template must exist before any data is collected.
        templateFile = f"{os.path.dirname(os.path.dirname(proInfo['path']))}/Testcases/T150_SDIM/SDIM.FT.ReportTemplate.xlsx"
        if not os.path.exists(templateFile):
            raise ValueError(f"Dosen't exist {templateFile}")

        def newDataFileName():
            # Timestamped CSV name inside the project's data folder.
            return f"{proInfo['path']}/T150_SDIM/{SN}静态标定_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

        TC_SDIM_Calib = _G.get('TC_SDIM_Calib')
        if not TC_SDIM_Calib:
            TC_SDIM_Calib = {'dataFile': newDataFileName()}
            _G.set('TC_SDIM_Calib', TC_SDIM_Calib)
        fileName = TC_SDIM_Calib['dataFile']

        fileExists = os.path.exists(fileName)
        if fileExists and isCreateFile:
            # The user asked for a fresh file: switch to a new timestamped
            # name and publish it.
            fileName = newDataFileName()
            TC_SDIM_Calib['dataFile'] = fileName
            _G.set('TC_SDIM_Calib', TC_SDIM_Calib)
            _G.set('TcFlieCSV', TC_SDIM_Calib)
        if fileExists and not isCreateFile:
            # Keep appending records to the existing file (header already there).
            fo = open(fileName, 'a', encoding='utf-8', newline='')
        else:
            fo = open(fileName, 'w+', encoding='utf-8', newline='')
            fo.write(','.join(keys)+'\n')
            fo.flush()

        deviceInfo['listenStatus'] = 1
        _G.set(devInfo['name'], deviceInfo)
        timeDelta = None
        dataPoints = []
        sdimDatas = []
        startTime = time.time()  # remember when listening started
        recvData = bytearray()
        log(f"[{nowStr()}]: Start to listen response of T150-SDIM Board....")
        return True
    except Exception as e:
        finish()
        # Name the function that actually failed (this used to say loop()).
        log_e(f"Error in start(): {str(e)}")
# Receive-data handler; it is called whenever data is received
def recvDataHandler(data):
    """Receive callback: append ``data`` to the global ``recvData`` buffer.

    Once the buffer holds more than 60 bytes its content is logged as hex
    words and the buffer is emptied.
    """
    global recvData
    recvData = recvData + data
    if len(recvData) <= 60:
        return
    # NOTE(review): this discards everything buffered, including bytes that
    # loop() has not parsed yet — confirm the drop is intentional.
    log(f"{wordData2HexStr(recvData)}")
    recvData = bytearray()
def dataParse():
    """Decode the frame at the head of ``recvData`` and record the sample.

    Bytes 4..57 of the frame are read as eighteen 3-byte little-endian
    unsigned values; the first seventeen are mapped onto ``keys`` (scaled
    when ``workMode`` is '0x0002', raw otherwise). Every sample is queued for
    the time-series database. Once ``avePointsNum`` samples have been
    collected they are written to the database, averaged, published through
    ``_G`` and appended to the CSV file as one record. After ``RECORDSNUM``
    records the session is finished, the CSV is closed and, when requested,
    the xlsx report is generated from the template.

    Any exception is logged and ends the session through ``finish()``.
    """
    global dataPoints, sdimDatas, recordNum
    try:
        # Pad each 3-byte field with a zero high byte so the eighteen values
        # can be unpacked as little-endian uint32.
        padded = bytearray()
        for index in range(18):
            padded.extend(recvData[index*3+4:index*3+7])
            padded.append(0x00)
        rawData = list(struct.unpack('<18I', padded))

        if workMode == '0x0001':
            sdimData = rawData[:17]
        elif workMode == '0x0002':
            sdimData = [
                rawData[0]/10000.0-2,
                rawData[1]/10000.0-2,
                rawData[2]/10000.0-2,
                rawData[3]/100.0,
                rawData[4]/100.0-90,
                rawData[5]/100.0,
                rawData[6]/10000.0-2,
                rawData[7]/10000.0-2,
                rawData[8]/10000.0-2,
                rawData[9]/100.0,
                rawData[10]/1.0,
                rawData[11]/100.0,
                rawData[12]/100.0,
                rawData[13]/10000.0,
                rawData[14],
                rawData[15]/100.0,
                rawData[16]/10000.0,
            ]
        elif workMode == '0x0003' or workMode == '0x0004':
            log_w(f"{rawData}")
            sdimData = rawData[:17]
        else:
            sdimData = []

        measData = dict(zip(keys, sdimData))
        dataPoints.append({'measurement':f"SDIM.{modeName[workMode]}", 'time':datetime.utcfromtimestamp(time.time()), 'fields': measData})
        # Always keep the sample for averaging. Previously the sample that
        # completed the window was skipped, so the CSV average covered only
        # avePointsNum-1 samples and a window of 1 averaged an empty array.
        sdimDatas.append(sdimData)
        if len(dataPoints) < avePointsNum:
            return

        # Write the collected points to the time-series database.
        tsdb.write_points(dataPoints)
        dataPoints = []

        # Average the window and append it to the CSV file.
        averaged = np.average(np.array(sdimDatas), axis=0)
        sdimDatas = []
        dataPoint = dict(zip(keys, [int(num) for num in averaged]))  # truncated to integers
        deviceInfo['data']['measData'] = dataPoint
        _G.set(devInfo['name'], deviceInfo)
        log(f"AverageData{dataPoint}", level = 'INFO', color = '#228B22')
        fo.write(','.join([str(round(x,0)) for x in averaged])+'\n')
        # Push the record to disk right away so it survives an aborted session.
        fo.flush()
        recordNum += 1

        if recordNum >= RECORDSNUM:
            finish()
            # Build the report file from the template and the data file.
            fo.close()
            if istoxlsx == 1:
                shutil.copy(f"{os.path.dirname(os.path.dirname(proInfo['path']))}/Testcases/T150_SDIM/SDIM.FT.ReportTemplate.xlsx", f"{fileName[:-4]}.xlsx")
                copyDataFromCSVToXlsx(fileName, f"{fileName[:-4]}.xlsx",SN)
            log(f"[{nowStr()}]: Stop to listen response of T150-SDIM Board!")
    except Exception as e:
        log_e(f"Error in dataParse. {str(e)}")
        finish()
# This function is called repeatedly at 10 ms intervals until finish()
def loop():
    """Periodic worker: enforce the response timeout and consume frames.

    When the time since ``startTime`` exceeds the command's ``rsp_timeout``
    (and ``runMode`` is 0) the data file is closed and the session finished.
    Otherwise, once at least 60 bytes are buffered, a buffer starting with
    the header ``90 EB 0C EA`` is parsed and 60 bytes are consumed; any other
    buffer is advanced by one byte to search for the header.
    """
    global recvData
    try:
        deadline = startTime + int(cmdInfo['attr']['rsp_timeout'])
        # NOTE(review): runMode is not assigned anywhere in the visible code —
        # confirm it is provided elsewhere, otherwise this raises on timeout.
        if time.time() > deadline and runMode == 0:
            log_e(f"[{nowStr()}]: Timeout error. Stop to listen response of T150-SDIM Board!")
            fo.close()
            finish()
            return
        if len(recvData) < 60:
            return
        if recvData[0:4] != bytearray.fromhex('90 EB 0C EA'):
            # Not aligned on a frame header: drop one byte and retry next call.
            recvData = recvData[1:]
            return
        log_d(f"[{nowStr()}]: {wordData2HexStr(recvData[0:60])}")
        dataParse()
        recvData = recvData[60:]
    except Exception as e:
        finish()
        log_e(f"Error in loop(): {str(e)}")
def copyDataFromCSVToXlsx(csvFile, xlsxFile, SN):
    """Fill the workbook ``xlsxFile`` with the data stored in ``csvFile``.

    Starts a hidden spreadsheet application through the ``ket.Application``
    COM server (presumably WPS Spreadsheets — confirm), opens ``xlsxFile``,
    substitutes the CSV columns for the workbook's placeholders via
    ``fillData`` and saves the workbook in place.

    ``SN`` is accepted for interface compatibility and is currently unused.
    Errors are logged and end the session through ``finish()``; they are not
    re-raised.
    """
    app = None
    try:
        pythoncom.CoInitialize()
        xl = xw._xlwindows.COMRetryObjectWrapper(Dispatch("ket.Application"))
        impl = xw._xlwindows.App(visible = False , add_book = False , xl = xl)
        app = xw.App(visible = False , add_book = False , impl = impl)
        workbook = app.books.open(xlsxFile)
        tempJsonData = csvFileToDict(csvFile)
        log(str(tempJsonData))
        fillData('', tempJsonData, workbook)
        workbook.save(xlsxFile)
        workbook.close()
    except Exception as e:
        log_e(f"Error in copyDataFromCSVToXlsx. {str(e)}")
        finish()
    finally:
        # Always stop the hidden application; previously a failure in any
        # step above skipped app.kill() and left the process running.
        if app is not None:
            try:
                app.kill()
            except Exception as e:
                log_e(f"Error in copyDataFromCSVToXlsx. {str(e)}")
def csvFileToDict(csvFilePath, headerPrefixes=('ASM.GX.AD', 'GX')):
    """Load a sectioned CSV file into a ``{column name: [values]}`` dict.

    A row whose first cell starts with one of ``headerPrefixes`` is a header
    row that opens a new section; the rows that follow are that section's
    data. Sections are concatenated, so columns shared between sections are
    stacked in file order. All values are kept as the strings read from the
    file.

    Args:
        csvFilePath: path of the UTF-8 encoded CSV file.
        headerPrefixes: a prefix or sequence of prefixes identifying header
            rows. The default covers the original 'ASM.GX.AD' header as well
            as the 'GX_AD' / 'GX' headers written by ``start()``, which were
            previously not recognised (the result was always empty for them).

    Returns:
        dict mapping each column name to its list of values; an empty dict
        when the file contains no data rows under a header.

    Raises:
        Exception: if the file does not exist or cannot be parsed.
    """
    try:
        if not os.path.exists(csvFilePath):
            raise Exception(f"{csvFilePath} file doesn't exist.")
        if isinstance(headerPrefixes, str):
            prefixes = (headerPrefixes,)
        else:
            prefixes = tuple(headerPrefixes)

        frames = []
        title = []
        rows = []

        def closeSection():
            # Turn the rows gathered under the current header into a frame.
            # Rows without a preceding header carry no column names and are
            # dropped.
            if title and rows:
                columns = list(map(list, zip(*rows)))
                frames.append(pd.DataFrame(dict(zip(title, columns))))

        with open(csvFilePath, "r", encoding='utf-8', newline='') as f:
            for row in csv.reader(f):
                if not row:
                    # Blank line: nothing to classify (row[0] would fail).
                    continue
                if row[0].startswith(prefixes):
                    closeSection()
                    rows = []
                    title = row
                else:
                    rows.append(row)
        closeSection()

        if not frames:
            return {}
        return pd.concat(frames).to_dict(orient='list')
    except Exception as e:
        raise Exception(f"Error in csvFileToDict(): {str(e)}") from e
def fillData(baseName, data, workbook):
    """Substitute ``{name}`` placeholders in every sheet of ``workbook``.

    ``data`` maps placeholder names to values. A non-empty nested dict is
    processed recursively with its key appended to ``baseName`` (dot
    separated). For any other value the placeholder ``{baseName.key}`` (a
    leading dot is dropped) is searched in each sheet's used range: a list is
    written downwards starting at the cell that was found, any other value
    replaces the placeholder text.

    Args:
        baseName: dotted prefix of the current nesting level ('' at the top).
        data: dict of values to fill in.
        workbook: opened xlwings workbook.

    Returns:
        The concatenated ``{baseName.key}`` names handled at this level.

    Raises:
        Exception: if ``data`` is not a dict or a worksheet operation fails.
    """
    try:
        info = ''
        if not isinstance(data, dict):
            raise Exception(f"Error in fillData(): Type of data is not dict.")
        for key, value in data.items():
            if isinstance(value, dict) and value:
                # Recurse with an extended prefix. Passing it as an argument
                # avoids the old append/rfind restore, which truncated the
                # prefix wrongly when a key itself contained a dot.
                fillData(f"{baseName}.{key}", value, workbook)
                continue
            # One normalised placeholder for both find and replace. The
            # replace call used to pass the raw "{.key}" form, so top-level
            # scalar values were located but never substituted.
            placeholder = f"{{{baseName}.{key}}}".replace('{.', '{')
            log(placeholder)
            info += f"{{{baseName}.{key}}}"
            for worksheet in workbook.sheets:
                result = worksheet.used_range.api.find(placeholder)
                if not result:
                    continue
                if isinstance(value, list):
                    # result.address looks like "$C$5": write the list as a
                    # column starting at that cell.
                    address = f"{result.address}".split('$')
                    worksheet.range(f"{result.address}:${address[1]}${int(address[2])+len(value)-1}").options(transpose=True).value = value
                else:
                    worksheet.used_range.api.Replace(placeholder, f"{value}")
        return info
    except Exception as e:
        raise Exception(f"Error in fillData(): {str(e)}") from e