TG-PlatformPlus/UserScripts/faps.py

1010 lines
46 KiB
Python
Raw Permalink Normal View History

2026-03-02 14:29:58 +08:00
import os, time, csv, struct, sys
import numpy as np
sys.path.append(r'.\UserScripts')
from user_common import wordData2HexStr, nowStr, nowStr1, checkValue, params_to_int
from CRC import crc16 as checkValue
# Presumably frame-header placeholders; neither is referenced in the part of
# the file visible here -- TODO confirm before removing.
SENDHEADER = ''
RSPHEADER = ''
# Number of times exeCmd() sends a command before giving up.
RETRYTIMES = 3
# Not referenced in the part of the file visible here.
CYCLES = 1
# Seconds exeCmd() sleeps after each send before inspecting the receive buffer.
TIMEOUT = 0.05
# When True, log_du() forwards its messages to log_d(); otherwise they are dropped.
DEBUG = False
# Start hook: the host calls this function when the command begins.
def start():
    """Prepare the shared per-device state before loop() runs.

    Fetches the device's entry from the shared store ``_G`` (creating an empty
    one if absent), makes sure the 'data', 'info' and 'status' sections exist,
    clears the 'StopAll' flag and writes the entry back. A missing 'config'
    section is treated as an error.

    Any exception is logged and ends the command via finish().
    """
    global recvData, deviceData
    try:
        # Give recvDataHandler() a buffer to append to even if bytes arrive
        # before the first exeCmd() call has created one.
        recvData = bytearray()
        log_i(f"[{devInfo['name']}] {cmdInfo['name']}".center(80, '-'))
        deviceData = _G.get(f"{devInfo['name']}") or {}
        for section in ('data', 'info', 'status'):
            if section not in deviceData:
                deviceData[section] = {}
        if 'config' not in deviceData:
            raise Exception("Please load config first.")
        deviceData['status']['StopAll'] = False
        _G.set(devInfo['name'], deviceData)
    except Exception as e:
        log_e(f"Error in start. {str(e)}")
        finish()
# Called repeatedly by the host at 10 ms intervals until finish() is invoked.
def loop():
    """Run two LRSOV open/close cycles, then end the command.

    finish() is called both after a clean run and after a logged exception.
    """
    try:
        for _ in range(2):
            LRSOV_Open()
            LRSOV_Close()
        finish()
    except Exception as err:
        log_e(f"Error in loop. {str(err)}")
        finish()
# Receive hook: the host calls this function whenever data arrives.
def recvDataHandler(data):
    """Append the newly received bytes to the module-level ``recvData`` buffer.

    Any exception is logged and ends the command via finish().
    """
    global recvData
    try:
        recvData = recvData + data
    except Exception as err:
        log_e(f"Error in recvDataHandler. {str(err)}")
        finish()
def log_du(str):
    """Forward the message to log_d() only when the module-level DEBUG flag is set."""
    if not DEBUG:
        return
    log_d(str)
def exeCmd(cmd):
    """Send one command frame and wait for a checksum-valid reply.

    Args:
        cmd: Sequence whose first item is the frame as a hex string without
            its checksum (embedded spaces are accepted by ``fromhex``) and
            whose second item is the expected reply length in bytes, checksum
            included. Further items are ignored.

    Returns:
        ``[True, reply]`` with the first ``rspLen`` received bytes as soon as
        an attempt yields at least that many bytes with a matching checksum;
        ``[False, None]`` after RETRYTIMES unsuccessful attempts.

    Raises:
        Exception: wraps any error raised while building, sending or checking
            the frame; the original error is chained as ``__cause__``.
    """
    global recvData
    try:
        frame = bytearray.fromhex(cmd[0])
        # The 16-bit checksum is appended low byte first.
        frame += checkValue(frame).to_bytes(2, 'little')
        # The expected length is the same for every attempt: parse it once,
        # and before anything has been sent.
        rspLen = int(cmd[1])
        for _ in range(RETRYTIMES):
            log_du(f"[{nowStr()}] Sent:{wordData2HexStr(frame)}")
            recvData = bytearray()  # recvDataHandler() refills this buffer
            send(frame)
            time.sleep(TIMEOUT)
            if len(recvData) < rspLen:
                continue  # reply missing or too short: retry
            reply = recvData[0:rspLen]
            log_du(f"[{nowStr()}] Echo:{wordData2HexStr(reply)}")
            crc = int.from_bytes(reply[rspLen - 2:rspLen], byteorder='little')
            if crc == checkValue(reply[0:rspLen - 2]):
                return [True, reply]
        return [False, None]
    except Exception as e:
        raise Exception(f"Error in exeCmd({cmd}): {str(e)}") from e
def LRSOV_Enable():
    """Send the LRSOV motor-enable frame ('0306 0080 0001').

    Returns:
        True if exeCmd() reports a valid reply, False if it does not or if an
        exception was raised (the exception is logged, not propagated).
    """
    try:
        log("LRSOV_Enable starts.")
        ok = exeCmd(['0306 0080 0001', 8])[0]
        if ok:
            log("LRSOV_Enable succeeded.")
            return True
        log_e("LRSOV_Enable failed.")
        return False
    except Exception as e:
        # The original format string was "str{e}", which printed a literal "str".
        log_e(f"Error in LRSOV_Enable(): {str(e)}")
        return False
def LRSOV_Disable():
    """Send the LRSOV motor-disable frame ('0306 0080 0000').

    Returns:
        True if exeCmd() reports a valid reply, False if it does not or if an
        exception was raised (the exception is logged, not propagated).
    """
    try:
        log("LRSOV_Disable starts.")
        ok = exeCmd(['0306 0080 0000', 8])[0]
        if ok:
            log("LRSOV_Disable succeeded.")
            return True
        log_e("LRSOV_Disable failed.")
        return False
    except Exception as e:
        # The original format string was "str{e}", which printed a literal "str".
        log_e(f"Error in LRSOV_Disable(): {str(e)}")
        return False
def LRSOV_Open():
    """Drive the LRSOV valve to its open (zero) position.

    Runs a small state machine over the serial link: enable the motor, check
    whether the position is already at or below the configured zero position,
    issue the "open" preset, then alternate between polling the position and
    the motor speed. The run ends when the zero position is reached, the
    motor speed reads 0, or the shared 'StopAll' flag is set; on every such
    exit the motor-disable frame is sent.

    Returns:
        True when the state machine ran to its end, False if an exception was
        raised (it is logged, not propagated).
    """
    try:
        log("LRSOV_Open starts.")
        cmdList = [
            ['0306 0080 0001', 8],   # 0: enable the motor
            ['0303 00A4 0004', 13],  # 1: read position; nothing to do if already at/below zero
            ['0306 0084 0001', 8],   # 2: preset the "open" operation
            ['0303 00A4 0004', 13],  # 3: read position; done once at/below zero
            ['0303 008C 0001', 7],   # 4: read motor speed; done when it reads 0
        ]
        optStatus = 0
        running = True
        curPos = None
        curSpeed = None
        Info = ''
        while running:
            deviceData = _G.get(devInfo['name'])
            if (deviceData and 'status' in deviceData
                    and 'StopAll' in deviceData['status']
                    and deviceData['status']['StopAll']):
                optStatus = -1  # external stop request: go straight to shutdown
            if optStatus in (0, 2):
                # Plain writes: the reply is not inspected.
                exeCmd(cmdList[optStatus])
                optStatus += 1
            elif optStatus in (1, 3):
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curPos, = struct.unpack('>i', reply[7:11])
                    if curPos <= deviceData['config']['LRSOV']['zeroPosition']:
                        Info = "It reached the zero position."
                        optStatus = -1
                        continue
                # 1 -> 2 (issue the preset), 3 -> 4 (check the speed). The
                # original never left state 3, which made the speed check
                # unreachable and a stalled motor an endless loop; the sibling
                # functions (LRSOV_Close, PPSOV_Open, ...) do advance to 4.
                optStatus += 1
            elif optStatus == 4:
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curSpeed, = struct.unpack('>h', reply[3:5])
                    if curSpeed == 0:
                        Info = "Speed is equal to 0."
                        optStatus = -1
                        continue
                optStatus = 3
            else:
                # Shutdown state: disable the motor and leave the loop.
                exeCmd(['0306 0080 0000', 8])
                running = False
            log_du(f"optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}")
        log("LRSOV_Open stopped.")
        return True
    except Exception as e:
        log_e(f"Error in LRSOV_Open(): optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}. {str(e)}")
        return False
def LRSOV_Close():
    """Drive the LRSOV valve to its closed (max) position.

    Runs a small state machine over the serial link: enable the motor, check
    whether the position is already at or above the configured max position,
    issue the "close" preset, then alternate between polling the position and
    the motor speed. The run ends when the max position is reached, the motor
    speed reads 0, or the shared 'StopAll' flag is set; on every such exit
    the motor-disable frame is sent.

    Rebinds the module-level ``deviceData`` on every iteration.

    Returns:
        True when the state machine ran to its end, False if an exception was
        raised (it is logged, not propagated).
    """
    global deviceData
    try:
        log("LRSOV_Close starts.")
        cmdList = [
            ['0306 0080 0001', 8],   # 0: enable the motor
            ['0303 00A4 0004', 13],  # 1: read position; nothing to do if already at/above max
            ['0306 0084 0000', 8],   # 2: preset the "close" operation
            ['0303 00A4 0004', 13],  # 3: read position; done once at/above max
            ['0303 008C 0001', 7],   # 4: read motor speed; done when it reads 0
        ]
        optStatus = 0
        running = True
        curPos = None
        curSpeed = None
        Info = ''
        while running:
            deviceData = _G.get(devInfo['name'])
            if (deviceData and 'status' in deviceData
                    and 'StopAll' in deviceData['status']
                    and deviceData['status']['StopAll']):
                optStatus = -1  # external stop request: go straight to shutdown
            if optStatus in (0, 2):
                # Plain writes: the reply is not inspected.
                exeCmd(cmdList[optStatus])
                optStatus += 1
            elif optStatus in (1, 3):
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curPos, = struct.unpack('>i', reply[7:11])
                    if curPos >= deviceData['config']['LRSOV']['maxPosition']:
                        Info = "It reached the max position."
                        optStatus = -1
                        continue
                optStatus += 1  # 1 -> 2 (issue the preset), 3 -> 4 (check the speed)
            elif optStatus == 4:
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curSpeed, = struct.unpack('>h', reply[3:5])
                    if curSpeed == 0:
                        Info = "Speed is equal to 0."
                        optStatus = -1
                        continue
                optStatus = 3
            else:
                # Shutdown state: disable the motor and leave the loop.
                exeCmd(['0306 0080 0000', 8])
                running = False
            log_du(f"optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}")
        log("LRSOV_Close stopped.")
        return True
    except Exception as e:
        # Include the exception text; the state dump alone does not say what failed.
        log_e(f"Error in LRSOV_Close(): optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}. {str(e)}")
        return False
def PPSOV_Enable():
    """Send the PPSOV motor-enable frame ('0206 0080 0001').

    Returns:
        True if exeCmd() reports a valid reply, False if it does not or if an
        exception was raised (the exception is logged, not propagated).
    """
    try:
        log("PPSOV_Enable starts.")
        ok = exeCmd(['0206 0080 0001', 8])[0]
        if ok:
            log("PPSOV_Enable succeeded.")
            return True
        log_e("PPSOV_Enable failed.")
        return False
    except Exception as e:
        # The original format string was "str{e}", which printed a literal "str".
        log_e(f"Error in PPSOV_Enable(): {str(e)}")
        return False
def PPSOV_Disable():
    """Send the PPSOV motor-disable frame ('0206 0080 0000').

    Returns:
        True if exeCmd() reports a valid reply, False if it does not or if an
        exception was raised (the exception is logged, not propagated).
    """
    try:
        log("PPSOV_Disable starts.")
        ok = exeCmd(['0206 0080 0000', 8])[0]
        if ok:
            log("PPSOV_Disable succeeded.")
            return True
        log_e("PPSOV_Disable failed.")
        return False
    except Exception as e:
        # The original format string was "str{e}", which printed a literal "str".
        log_e(f"Error in PPSOV_Disable(): {str(e)}")
        return False
def PPSOV_Open():
    """Drive the PPSOV valve to its open (zero) position.

    Runs a small state machine over the serial link: enable the motor, check
    whether the position is already at or below the configured zero position,
    issue the "open" preset, then alternate between polling the position and
    the motor speed. The run ends when the zero position is reached, the
    motor speed reads 0, or the shared 'StopAll' flag is set; on every such
    exit the motor-disable frame is sent.

    Returns:
        True when the state machine ran to its end, False if an exception was
        raised (it is logged, not propagated).
    """
    try:
        log("PPSOV_Open starts.")
        cmdList = [
            ['0206 0080 0001', 8],   # 0: enable the motor
            ['0203 00A4 0004', 13],  # 1: read position; nothing to do if already at/below zero
            ['0206 0084 0001', 8],   # 2: preset the "open" operation
            ['0203 00A4 0004', 13],  # 3: read position; done once at/below zero
            ['0203 008C 0001', 7],   # 4: read motor speed; done when it reads 0
        ]
        optStatus = 0
        running = True
        curPos = None
        curSpeed = None
        Info = ''
        while running:
            deviceData = _G.get(devInfo['name'])
            if (deviceData and 'status' in deviceData
                    and 'StopAll' in deviceData['status']
                    and deviceData['status']['StopAll']):
                optStatus = -1  # external stop request: go straight to shutdown
            if optStatus in (0, 2):
                # Plain writes: the reply is not inspected.
                exeCmd(cmdList[optStatus])
                optStatus += 1
            elif optStatus in (1, 3):
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curPos, = struct.unpack('>i', reply[7:11])
                    if curPos <= deviceData['config']['PPSOV']['zeroPosition']:
                        Info = "It reached the zero position."
                        optStatus = -1
                        continue
                optStatus += 1  # 1 -> 2 (issue the preset), 3 -> 4 (check the speed)
            elif optStatus == 4:
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curSpeed, = struct.unpack('>h', reply[3:5])
                    if curSpeed == 0:
                        Info = "Speed is equal to 0."
                        optStatus = -1
                        continue
                optStatus = 3
            else:
                # Shutdown state: disable *this* motor. The original sent
                # '0306 ...' (the LRSOV address used elsewhere in this file),
                # which left the PPSOV motor enabled.
                exeCmd(['0206 0080 0000', 8])
                running = False
            log_du(f"optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}")
        log("PPSOV_Open stopped.")
        return True
    except Exception as e:
        log_e(f"Error in PPSOV_Open(): optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}. {str(e)}")
        return False
def PPSOV_Close():
    """Drive the PPSOV valve to its closed (max) position.

    Runs a small state machine over the serial link: enable the motor, check
    whether the position is already at or above the configured max position,
    issue the "close" preset, then alternate between polling the position and
    the motor speed. The run ends when the max position is reached, the motor
    speed reads 0, or the shared 'StopAll' flag is set; on every such exit
    the motor-disable frame is sent.

    Returns:
        True when the state machine ran to its end, False if an exception was
        raised (it is logged, not propagated).
    """
    try:
        log("PPSOV_Close starts.")
        cmdList = [
            ['0206 0080 0001', 8],   # 0: enable the motor
            ['0203 00A4 0004', 13],  # 1: read position; nothing to do if already at/above max
            ['0206 0084 0000', 8],   # 2: preset the "close" operation
            ['0203 00A4 0004', 13],  # 3: read position; done once at/above max
            ['0203 008C 0001', 7],   # 4: read motor speed; done when it reads 0
        ]
        optStatus = 0
        running = True
        curPos = None
        curSpeed = None
        Info = ''
        while running:
            deviceData = _G.get(devInfo['name'])
            if (deviceData and 'status' in deviceData
                    and 'StopAll' in deviceData['status']
                    and deviceData['status']['StopAll']):
                optStatus = -1  # external stop request: go straight to shutdown
            if optStatus in (0, 2):
                # Plain writes: the reply is not inspected.
                exeCmd(cmdList[optStatus])
                optStatus += 1
            elif optStatus in (1, 3):
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curPos, = struct.unpack('>i', reply[7:11])
                    if curPos >= deviceData['config']['PPSOV']['maxPosition']:
                        Info = "It reached the max position."
                        optStatus = -1
                        continue
                optStatus += 1  # 1 -> 2 (issue the preset), 3 -> 4 (check the speed)
            elif optStatus == 4:
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curSpeed, = struct.unpack('>h', reply[3:5])
                    if curSpeed == 0:
                        Info = "Speed is equal to 0."
                        optStatus = -1
                        continue
                optStatus = 3
            else:
                # Shutdown state: disable *this* motor. The original sent
                # '0306 ...' (the LRSOV address used elsewhere in this file),
                # which left the PPSOV motor enabled.
                exeCmd(['0206 0080 0000', 8])
                running = False
            log_du(f"optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}")
        log("PPSOV_Close stopped.")
        return True
    except Exception as e:
        log_e(f"Error in PPSOV_Close(): optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}. {str(e)}")
        return False
def PPECC_Enable():
    """Send the PPECC motor-enable frame ('0106 0080 0001').

    Returns:
        True if exeCmd() reports a valid reply, False if it does not or if an
        exception was raised (the exception is logged, not propagated).
    """
    try:
        log("PPECC_Enable starts.")
        ok = exeCmd(['0106 0080 0001', 8])[0]
        if ok:
            log("PPECC_Enable succeeded.")
            return True
        log_e("PPECC_Enable failed.")
        return False
    except Exception as e:
        # The original format string was "str{e}", which printed a literal "str".
        log_e(f"Error in PPECC_Enable(): {str(e)}")
        return False
def PPECC_Disable():
    """Send the PPECC motor-disable frame ('0106 0080 0000').

    Returns:
        True if exeCmd() reports a valid reply, False if it does not or if an
        exception was raised (the exception is logged, not propagated).
    """
    try:
        log("PPECC_Disable starts.")
        ok = exeCmd(['0106 0080 0000', 8])[0]
        if ok:
            log("PPECC_Disable succeeded.")
            return True
        log_e("PPECC_Disable failed.")
        return False
    except Exception as e:
        # The original format string was "str{e}", which printed a literal "str".
        log_e(f"Error in PPECC_Disable(): {str(e)}")
        return False
def PPECC_ReturnToZero():
    """Move the PPECC axis back to its zero position.

    State machine: enable the motor, check whether the position is already
    within 10 units of (or below) the configured zero position, select the
    return-to-zero preset, write 0 then 1 to register 0x81 to produce the
    rising edge that starts the preset, then alternate between polling the
    position and the motor speed. The run ends when the zero position is
    reached, the speed reads 0, or the shared 'StopAll' flag is set; on every
    such exit the motor-disable frame is sent.

    Returns:
        True when the state machine ran to its end, False if an exception was
        raised (it is logged, not propagated).
    """
    try:
        log("PPECC_ReturnToZero starts.")
        cmdList = [
            ['0106 0080 0001', 8],   # 0: enable the motor
            ['0103 00A4 0004', 13],  # 1: read position; nothing to do if already at zero
            ['0106 0084 0000', 8],   # 2: select the return-to-zero preset
            ['0106 0081 0000', 8],   # 3: register 0x81 low ...
            ['0106 0081 0001', 8],   # 4: ... then high: rising edge starts the preset
            ['0103 00A4 0004', 13],  # 5: read position; done once at zero
            ['0103 008C 0001', 7],   # 6: read motor speed; done when it reads 0
        ]
        optStatus = 0
        running = True
        curPos = None
        curSpeed = None
        Info = ''
        while running:
            deviceData = _G.get(devInfo['name'])
            if (deviceData and 'status' in deviceData
                    and 'StopAll' in deviceData['status']
                    and deviceData['status']['StopAll']):
                optStatus = -1  # external stop request: go straight to shutdown
            if optStatus in (0, 2, 3, 4):
                # Plain writes: the reply is not inspected.
                exeCmd(cmdList[optStatus])
                optStatus += 1
            elif optStatus in (1, 5):
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curPos, = struct.unpack('>i', reply[7:11])
                    zeroPosition = deviceData['config']['PPECC']['zeroPosition']
                    if abs(curPos - zeroPosition) < 10 or curPos < zeroPosition:
                        Info = "It reached the zero position."
                        optStatus = -1
                        # Without this `continue` (missing in state 1 of the
                        # original) the -1 was overwritten below and the move
                        # was started even though the axis was already at zero.
                        continue
                optStatus += 1  # 1 -> 2 (select the preset), 5 -> 6 (check the speed)
            elif optStatus == 6:
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curSpeed, = struct.unpack('>h', reply[3:5])
                    if curSpeed == 0:
                        Info = "Speed is equal to 0."
                        optStatus = -1
                        continue
                optStatus = 5
            else:
                # Shutdown state: disable the motor and leave the loop.
                exeCmd(['0106 0080 0000', 8])
                running = False
            log_du(f"optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}")
        log("PPECC_ReturnToZero stopped.")
        return True
    except Exception as e:
        log_e(f"Error in PPECC_ReturnToZero(): optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}. {str(e)}")
        return False
def PPECC_Push(targetPressure):
    """Advance the PPECC axis to build pressure towards ``targetPressure``.

    State machine: enable the motor, check the position against the
    configured max position, select the pressurise preset, write 0 then 1 to
    register 0x81 to start it, then cycle through position check, torque
    check and a pressure check that also rewrites register 0x1A with a speed
    derived from the remaining pressure difference. The run ends when the max
    position or max torque is reached, the shared pressure reading exceeds
    85 % of the target, or the shared 'StopAll' flag is set; on every such
    exit the motor-disable frame is sent.

    Args:
        targetPressure: Target pressure; converted with ``int()`` when truthy.
            The shared pressure reading must then be in 'psi'.

    Returns:
        True when the state machine ran to its end, False if an exception was
        raised (it is logged, not propagated).
    """
    try:
        log(f"PPECC_Push starts.")
        cmdList = [
            ['0106 0080 0001', 8],   # 0: enable the motor
            ['0103 00A4 0004', 13],  # 1: read position; stop if already at max
            ['0106 0084 0001', 8],   # 2: select the pressurise preset
            ['0106 0081 0000', 8],   # 3: register 0x81 low ...
            ['0106 0081 0001', 8],   # 4: ... then high: rising edge starts the preset
            ['0103 00A4 0004', 13],  # 5: read position; stop once at max
            ['0103 0093 0001', 7],   # 6: read torque; stop at the configured limit
            ['0106 001A ', 8]        # 7: speed-write prefix (state 7 builds its own frame)
        ]
        optStatus = 0
        running = True
        curPos = None
        curSpeed = None
        curTorque = None
        Info = ''
        deviceData = _G.get(devInfo['name'])
        # NOTE(review): the source's indentation was lost; both checks below
        # are assumed to sit under this guard -- confirm against the original.
        if targetPressure:
            targetPressure = int(targetPressure)
            if 'pressure' not in deviceData['data'] or deviceData['data']['pressure']['unit'] != 'psi':
                raise Exception(f"Can't get 'pressure' information or pressure unit isn't 'psi'.")
            if int(deviceData['data']['pressure']['value']) > targetPressure:
                Info = f"Current pressure is {deviceData['data']['pressure']['value']}psi."
                optStatus = -1
        while running:
            deviceData = _G.get(devInfo['name'])
            if (deviceData and 'status' in deviceData
                    and 'StopAll' in deviceData['status']
                    and deviceData['status']['StopAll']):
                optStatus = -1  # external stop request: go straight to shutdown
            if optStatus in (0, 2, 3, 4):
                # Plain writes: the reply is not inspected.
                exeCmd(cmdList[optStatus])
                optStatus += 1
            elif optStatus in (1, 5):
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curPos, = struct.unpack('>i', reply[7:11])
                    if abs(curPos - deviceData['config']['PPECC']['maxPosition']) < 10 or curPos > deviceData['config']['PPECC']['maxPosition']:
                        Info = f"It reached the max position."
                        optStatus = -1
                        continue
                optStatus += 1  # 1 -> 2 (select the preset), 5 -> 6 (check the torque)
            elif optStatus == 6:
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curTorque, = struct.unpack('>H', reply[3:5])
                    if curTorque >= deviceData['config']['PPECC']['maxTorque']:
                        Info = f"It reached the max torque."
                        optStatus = -1
                        continue
                optStatus = 7
            elif optStatus == 7:
                # Pressure check: stop above 85 % of the target, otherwise
                # pick a speed proportional to the remaining difference.
                if (deviceData['data']['pressure']['value'] > 0.85*targetPressure):
                    optStatus = -1
                    continue
                curSpeed = abs(int((deviceData['data']['pressure']['value']-targetPressure))//10)
                if curSpeed > 350 or deviceData['data']['pressure']['value'] < 0:
                    curSpeed = 350
                elif curSpeed < 100:
                    curSpeed = 100
                exeCmd([f"0106 001A {curSpeed:04X}", 8])
                optStatus = 5
            else:
                # Shutdown state: disable the motor and leave the loop.
                exeCmd(['0106 0080 0000', 8])
                running = False
            log_du(f"optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, curTorque={curTorque}, Info={Info}")
        log(f"PPECC_Push stopped.")
        return True
    except Exception as e:
        log_e(f"Error in PPECC_Push(): optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, curTorque={curTorque}, Info={Info}. {str(e)}")
        return False
def BTV_Enable():
    """Send the BTV motor-enable frame ('0406 0080 0001').

    Returns:
        True if exeCmd() reports a valid reply, False if it does not or if an
        exception was raised (the exception is logged, not propagated).
    """
    try:
        log("BTV_Enable starts.")
        ok = exeCmd(['0406 0080 0001', 8])[0]
        if ok:
            log("BTV_Enable succeeded.")
            return True
        log_e("BTV_Enable failed.")
        return False
    except Exception as e:
        # The original format string was "str{e}", which printed a literal "str".
        log_e(f"Error in BTV_Enable(): {str(e)}")
        return False
def BTV_Disable():
    """Send the BTV motor-disable frame ('0406 0080 0000').

    Returns:
        True if exeCmd() reports a valid reply, False if it does not or if an
        exception was raised (the exception is logged, not propagated).
    """
    try:
        log("BTV_Disable starts.")
        ok = exeCmd(['0406 0080 0000', 8])[0]
        if ok:
            log("BTV_Disable succeeded.")
            return True
        log_e("BTV_Disable failed.")
        return False
    except Exception as e:
        # The original format string was "str{e}", which printed a literal "str".
        log_e(f"Error in BTV_Disable(): {str(e)}")
        return False
def BTV_IsAtZero():
    """Report whether the BTV axis is at its zero position.

    Queries the position up to five times. Reads the zero position from the
    module-level ``deviceData`` (this function does not refresh it).

    Returns:
        True as soon as a valid reply shows a position no more than 10 units
        above the configured zero position; False if no attempt does, or if
        an exception was raised (it is logged, not propagated).
    """
    try:
        log("BTV_IsAtZero starts.")
        query = ['0403 00A4 0004', 13]  # read position
        for _ in range(5):
            ok, reply = exeCmd(query)
            if ok:
                curPos, = struct.unpack('>i', reply[7:11])
                # int() matches how the other BTV functions read this value.
                if curPos - int(deviceData['config']['BTV']['zeroPosition']) <= 10:
                    log("BTV_IsAtZero stopped. It reached zero position.")
                    return True
            else:
                # Only warn when there really was no valid reply.
                log_w("No response.")
        log("BTV_IsAtZero stopped.")
        return False
    except Exception as e:
        # The original format string was "str{e}", which printed a literal "str".
        log_e(f"Error in BTV_IsAtZero(): {str(e)}")
        return False
def BTV_ReturnToZero():
    """Move the BTV axis back to its zero position.

    State machine: enable the motor, check whether the position is already
    at or below the configured zero position, issue the return-to-zero
    preset, then keep polling the position. The run ends when the zero
    position is reached or the shared 'StopAll' flag is set; on every such
    exit the motor-disable frame is sent. Unlike the valve functions there is
    no speed check, so the poll continues until one of those two conditions
    holds.

    Returns:
        True when the state machine ran to its end, False if an exception was
        raised (it is logged, not propagated).
    """
    try:
        log("BTV_ReturnToZero starts.")
        cmdList = [
            ['0406 0080 0001', 8],   # 0: enable the motor
            ['0403 00A4 0004', 13],  # 1: read position; nothing to do if already at/below zero
            ['0406 0084 0001', 8],   # 2: preset the return-to-zero operation
            ['0403 00A4 0004', 13],  # 3: read position; done once at/below zero
            ['0406 0080 0000', 8],   # 4: disable the motor (the shutdown state sends its own copy)
        ]
        optStatus = 0
        running = True
        curPos = None
        curSpeed = None
        Info = ''
        while running:
            deviceData = _G.get(devInfo['name'])
            if (deviceData and 'status' in deviceData
                    and 'StopAll' in deviceData['status']
                    and deviceData['status']['StopAll']):
                optStatus = -1  # external stop request: go straight to shutdown
            if optStatus in (0, 2):
                # Plain writes: the reply is not inspected.
                exeCmd(cmdList[optStatus])
                optStatus += 1
            elif optStatus in (1, 3):
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curPos, = struct.unpack('>i', reply[7:11])
                    if curPos <= int(deviceData['config']['BTV']['zeroPosition']):
                        Info = "It reached the zero position."
                        optStatus = -1
                        continue
                if optStatus == 1:
                    optStatus = 2  # state 3 keeps polling until zero is reached
            else:
                # Shutdown state: disable the motor and leave the loop.
                exeCmd(['0406 0080 0000', 8])
                running = False
            log_du(f"optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}")
        log("BTV_ReturnToZero stopped.")
        return True
    except Exception as e:
        # Include the exception text; the state dump alone does not say what failed.
        log_e(f"Error in BTV_ReturnToZero(): optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}. {str(e)}")
        return False
def BTV_ControlPressure_bk(targetPressure, keepTime, conrtrolMode):
    """Earlier variant of BTV_ControlPressure().

    NOTE(review): the "_bk" suffix suggests a backup copy; no caller is
    visible in this part of the file -- confirm before relying on it.

    Regulates the pressure by repeatedly writing a signed speed to register
    0x63 of the BTV drive, proportional to the difference between the shared
    pressure reading and ``targetPressure``, and checks the position against
    the configured limits after every speed update.

    Args:
        targetPressure: Pressure to hold.
        keepTime: Seconds the pressure must stay within the configured
            'limitedError' band before the function returns.
        conrtrolMode: 1 = on reaching a position limit, return to zero,
            re-pressurise with PPECC_Push() and start over; any other value =
            disable the motor and return.

    Returns:
        True when the hold time elapsed or the motor was disabled at a
        position limit; False when 'StopAll' was set or an exception was
        raised (it is logged, not propagated).
    """
    global deviceData
    try:
        log("BTV_ControlPressure starts.")
        zeroPosition = int(deviceData['config']['BTV']['zeroPosition'])
        maxPosition = int(deviceData['config']['BTV']['maxPosition'])
        maxSpeed = int(deviceData['config']['BTV']['maxSpeed'])
        minSpeed = int(deviceData['config']['BTV']['minSpeed'])
        curSpeed = maxSpeed // 2
        curPressure = int(deviceData['data']['pressure']['value'])
        limitedError = int(deviceData['config']['BTV']['limitedError'])
        cmdList = [
            ['0406 0080 0001', 8],                          # 0: enable the motor
            [f"0410 0061 0002 04 {maxPosition:08X}", 8],    # 1: set the target position
            ['0406 0084 0000', 8],                          # 2: select the pressurise preset
            ['0406 0081 0000', 8],                          # 3: register 0x81 low ...
            ['0406 0081 0001', 8],                          # 4: ... then high: rising edge starts the preset
            ['0406 0063', 8],                               # 5: speed write; value appended in state 5
            ['0403 00A4 0004', 13],                         # 6: read position for the limit check
            ['0406 0063 0000', 8],                          # 7: set speed 0 (stop the motor)
            ['0406 0080 0000', 8],                          # 8: disable the motor
        ]
        optStatus = 0
        startTime = time.time()
        while True:
            deviceData = _G.get(devInfo['name'])
            if (deviceData and 'status' in deviceData
                    and 'StopAll' in deviceData['status']
                    and deviceData['status']['StopAll']):
                log("BTV_ControlPressure stopped.")
                return False
            if optStatus in (0, 1, 2, 3, 4):
                # Start-up sequence: plain writes, replies are not inspected.
                exeCmd(cmdList[optStatus])
                optStatus += 1
            elif optStatus == 5:
                exeCmd(cmdList[0])
                exeCmd(cmdList[3])
                exeCmd(cmdList[4])
                curPressure = int(deviceData['data']['pressure']['value'])
                pressureError = abs(curPressure - targetPressure)
                if pressureError > limitedError:
                    startTime = time.time()  # outside the band: restart the hold timer
                if time.time() - startTime > keepTime:
                    log("BTV_ControlPressure stopped.")
                    return True
                curDirection = 1 if targetPressure >= curPressure else -1
                if pressureError > 1000:
                    curSpeed = maxSpeed
                else:
                    curSpeed = int(pressureError / 1000 * maxSpeed)
                    if abs(curSpeed) < abs(minSpeed):
                        curSpeed = minSpeed
                curSpeed = curDirection * curSpeed
                # Keep the 9-character '0406 0063' prefix and append the signed 16-bit speed.
                cmdList[5][0] = cmdList[5][0][:9] + wordData2HexStr(struct.pack('>h', curSpeed))
                exeCmd(cmdList[5])
                optStatus = 6
            elif optStatus == 6:
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curPos, = struct.unpack('>i', reply[7:11])
                    if (curPos >= maxPosition + 1000 and curSpeed > 0) or (curPos <= zeroPosition - 1000 and curSpeed < 0):
                        log_w(f"Exceed position limit: curPos={curPos}")
                        if conrtrolMode == 1:
                            # Return to zero, re-pressurise, then start over.
                            BTV_ReturnToZero()
                            if targetPressure - 2000 > 0:
                                PPECC_Push(targetPressure - 2000)
                            else:
                                PPECC_Push(0)
                            optStatus = 0
                        else:
                            optStatus = 8
                    else:
                        optStatus = 5
                # No valid reply: stay in state 6 and query again.
            elif optStatus == 7:
                exeCmd(cmdList[optStatus])
                optStatus = 5
            elif optStatus == 8:
                exeCmd(cmdList[optStatus])
                log("BTV_ControlPressure stopped.")
                return True
    except Exception as e:
        # The original format string was "str{e}", which printed a literal "str".
        log_e(f"Error in BTV_ControlPressure(): {str(e)}")
        return False
def BTV_ControlPressure(targetPressure, keepTime, conrtrolMode):
    """Hold the pressure at ``targetPressure`` by steering the BTV drive.

    After a start-up sequence the loop alternates between two states:
    state 5 writes a signed speed to register 0x63, proportional to the
    difference between the shared pressure reading and the target, and
    state 6 checks the position against the configured limits. The shared
    'flagPressureStable' status is published through ``_G.set``: it becomes
    True once the pressure has stayed inside the 'limitedError' band for more
    than 10 s and is cleared whenever the pressure leaves the band, a
    position limit is hit, or the function exits.

    Args:
        targetPressure: Pressure to hold.
        keepTime: Seconds to keep holding after the 10 s settling period.
        conrtrolMode: 1 = on reaching a position limit, return to zero, run
            PreloadPressure() and start over; any other value = return to
            zero and stop.

    Returns:
        True when the loop ended (hold time elapsed, position limit in a
        non-1 mode, or 'StopAll' set), False if an exception was raised (it
        is logged, not propagated).
    """
    # Bound before anything can fail, so the except handler can always build
    # its message (a bad config entry used to raise UnboundLocalError there).
    optStatus = 0
    curPos = None
    curSpeed = None
    Info = ''
    deviceData = None
    settleTime = 10  # seconds inside the band before the pressure counts as stable
    try:
        log("BTV_ControlPressure starts.")
        deviceData = _G.get(devInfo['name'])
        deviceData['status']['flagPressureStable'] = False
        _G.set(devInfo['name'], deviceData)
        zeroPosition = int(deviceData['config']['BTV']['zeroPosition'])
        maxPosition = int(deviceData['config']['BTV']['maxPosition'])
        maxSpeed = int(deviceData['config']['BTV']['maxSpeed'])
        minSpeed = int(deviceData['config']['BTV']['minSpeed'])
        curSpeed = maxSpeed // 2
        curPressure = int(deviceData['data']['pressure']['value'])
        limitedError = int(deviceData['config']['BTV']['limitedError'])
        cmdList = [
            ['0406 0080 0001', 8],                          # 0: enable the motor
            [f"0410 0061 0002 04 {maxPosition:08X}", 8],    # 1: set the target position
            ['0406 0084 0000', 8],                          # 2: select the pressurise preset
            ['0406 0081 0000', 8],                          # 3: register 0x81 low ...
            ['0406 0081 0001', 8],                          # 4: ... then high: rising edge starts the preset
            ['0406 0063', 8],                               # 5: speed write; value appended in state 5
            ['0403 00A4 0004', 13],                         # 6: read position for the limit check
            ['0406 0063 0000', 8],                          # 7: set speed 0 (not used by the states below)
            ['0406 0080 0000', 8],                          # 8: disable the motor (shutdown sends its own copy)
        ]
        running = True
        startTime = time.time()
        while running:
            deviceData = _G.get(devInfo['name'])
            if (deviceData and 'status' in deviceData
                    and 'StopAll' in deviceData['status']
                    and deviceData['status']['StopAll']):
                optStatus = -1  # external stop request: go straight to shutdown
            if optStatus in (0, 1, 2, 3, 4):
                # Start-up sequence: plain writes, replies are not inspected.
                exeCmd(cmdList[optStatus])
                optStatus += 1
            elif optStatus == 5:
                exeCmd(cmdList[0])
                curPressure = deviceData['data']['pressure']['value']
                pressureError = abs(curPressure - targetPressure)
                if pressureError > limitedError:
                    # Outside the band: restart the hold timer.
                    startTime = time.time()
                    deviceData['status']['flagPressureStable'] = False
                    _G.set(devInfo['name'], deviceData)
                elapsed = time.time() - startTime
                if elapsed > settleTime:
                    deviceData['status']['flagPressureStable'] = True
                    _G.set(devInfo['name'], deviceData)
                    if elapsed > keepTime + settleTime:
                        optStatus = -1
                        continue
                else:
                    deviceData['status']['flagPressureStable'] = False
                    _G.set(devInfo['name'], deviceData)
                curDirection = 1 if targetPressure >= curPressure else -1
                if pressureError > 1000:
                    curSpeed = maxSpeed
                elif pressureError > limitedError / 2:
                    curSpeed = int(pressureError / 1000 * maxSpeed)
                    if abs(curSpeed) < abs(minSpeed):
                        curSpeed = minSpeed
                else:
                    curSpeed = 0  # close enough: hold still
                curSpeed = curDirection * curSpeed
                # Keep the 9-character '0406 0063' prefix and append the signed 16-bit speed.
                cmdList[5][0] = cmdList[5][0][:9] + wordData2HexStr(struct.pack('>h', curSpeed))
                exeCmd(cmdList[5])
                optStatus = 6
            elif optStatus == 6:
                ok, reply = exeCmd(cmdList[optStatus])
                if ok:
                    curPos, = struct.unpack('>i', reply[7:11])
                    if (curPos >= maxPosition + 1000 and curSpeed > 0) or (curPos <= zeroPosition - 1000 and curSpeed < 0):
                        log_w(f"Exceed position limit: curPos={curPos}")
                        startTime = time.time()
                        deviceData['status']['flagPressureStable'] = False
                        _G.set(devInfo['name'], deviceData)
                        BTV_ReturnToZero()
                        if conrtrolMode == 1:
                            # Re-pressurise, then run the start-up sequence again.
                            PreloadPressure(targetPressure)
                            optStatus = 0
                        else:
                            optStatus = -1
                            continue
                    else:
                        optStatus = 5
                # No valid reply: stay in state 6 and query again.
            else:
                # Shutdown state: clear the flag, disable the motor, leave the loop.
                deviceData['status']['flagPressureStable'] = False
                _G.set(devInfo['name'], deviceData)
                exeCmd(['0406 0080 0000', 8])
                running = False
            log_du(f"optStatus={optStatus}, StopAll={deviceData['status']['StopAll']}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}")
        log("BTV_ControlPressure stopped.")
        return True
    except Exception as e:
        # Reading the flag must not raise inside the handler itself.
        try:
            stopAll = deviceData['status']['StopAll']
        except Exception:
            stopAll = None
        log_e(f"Error in BTV_ControlPressure(): optStatus={optStatus}, StopAll={stopAll}, curPos={curPos}, curSpeed={curSpeed}, Info={Info}. {str(e)}")
        return False
def PreloadPressure(targetPressure):
    """Pre-pressurise the system to roughly ``targetPressure``.

    First brings the valves and the BTV axis into their starting state, then
    repeats "PPECC return to zero, PPECC push" until the shared pressure
    reading exceeds a target-dependent fraction of the target (80 % above
    1000, 85 % above 200, 90 % above 100, otherwise target - 3) or the shared
    'StopAll' flag is set. PPSOV is closed on the way out.

    Rebinds the module-level ``deviceData``.

    Args:
        targetPressure: Desired pressure; values above 8000 are capped to 8000.

    Returns:
        True when the sequence ran to its end, False if an exception was
        raised (it is logged, not propagated).
    """
    global deviceData
    try:
        targetPressure = min(targetPressure, 8000)
        if BTV_IsAtZero():
            PPSOV_Open()
            if int(deviceData['data']['pressure']['value']) > targetPressure:
                # NOTE(review): the source's indentation was lost; the close
                # is assumed to pair with the open inside this branch --
                # confirm against the original file.
                LRSOV_Open()
                LRSOV_Close()
        else:
            PPSOV_Open()
            LRSOV_Open()
            BTV_ReturnToZero()
            LRSOV_Close()
        optStatus = 0
        running = True
        Info = ''
        curSpeed = None
        while running:
            deviceData = _G.get(devInfo['name'])
            if (deviceData and 'status' in deviceData
                    and 'StopAll' in deviceData['status']
                    and deviceData['status']['StopAll']):
                optStatus = -1  # external stop request: go straight to shutdown
            if optStatus == 0:
                PPECC_ReturnToZero()
                optStatus = 1
            elif optStatus == 1:
                PPECC_Push(targetPressure)
                # The push can run for a long time, so judge the result on a
                # fresh snapshot rather than the one taken before it started.
                deviceData = _G.get(devInfo['name']) or deviceData
                curPressure = deviceData['data']['pressure']['value']
                if targetPressure > 1000:
                    threshold = 0.8 * targetPressure
                elif targetPressure > 200:
                    threshold = 0.85 * targetPressure
                elif targetPressure > 100:
                    threshold = 0.9 * targetPressure
                else:
                    threshold = targetPressure - 3
                if curPressure > threshold:
                    optStatus = -1
                    continue
                # curSpeed only feeds the debug log below.
                if targetPressure > 1000:
                    curSpeed = min(300, abs(int(curPressure - targetPressure) // 5))
                else:
                    curSpeed = 200
                optStatus = 0  # not there yet: retract and push again
            else:
                PPSOV_Close()
                running = False
            log_du(f"Current pressure is {deviceData['data']['pressure']['value']}{deviceData['data']['pressure']['unit']}, optStatus={optStatus}, curSpeed={curSpeed}")
        log(f"PreloadPressure stopped. {Info}")
        return True
    except Exception as e:
        log_e(f"Error in PreloadPressure(): {str(e)}.")
        return False