RDSS/core/encoder.py

125 lines
4.5 KiB
Python
Raw Permalink Normal View History

2026-03-13 14:16:00 +08:00
#!/usr/bin/env python3
"""
64位整数字段编码器
"""
class BitFieldEncoder:
"""64位整数字段编码器修改版1表示点火0表示未点火"""
def encode(self, timestamp: int, energies: list, event_end: bool = False) -> int:
"""
编码探测器信号为64位整数
修改探测器掩码位 - 1表示点火0表示未点火
位域分配
bit63: 事件结束标志 (1: 结束, 0: 继续)
bit62-45: 18位时间戳 (us)
bit44-42: 3位探测器掩码 (1: 点火, 0: 未点火)
bit41-28: 14位探测器1能量 (keV)
bit27-14: 14位探测器2能量 (keV)
bit13-0: 14位探测器3能量 (keV)
"""
if len(energies) != 3:
energies = energies + [0] * (3 - len(energies))
# 1. 事件结束标志
value = int(event_end) << 63
# 2. 时间戳 (18 bits) - 检查溢出
if timestamp > ((1 << 18) - 1):
raise ValueError(f"时间戳溢出: {timestamp} > {((1 << 18) - 1)}")
timestamp_int = int(timestamp) & ((1 << 18) - 1)
value |= timestamp_int << 45
# 3. 探测器掩码和能量
# 修改默认所有探测器都未点火掩码位为0
detector_mask = 0b000 # 二进制000表示三个探测器都未点火
for i in range(3):
if energies[i] > 0:
# 修改该探测器点火对应掩码位置11表示点火
# 掩码位顺序bit44=det1, bit43=det2, bit42=det3
detector_mask |= (1 << (2 - i)) # 设置对应位
# 检查能量溢出
if energies[i] > ((1 << 14) - 1):
raise ValueError(f"能量溢出: 探测器{i+1}能量{energies[i]} > {((1 << 14) - 1)}")
# 编码能量 (14 bits)
energy_int = int(round(energies[i])) & ((1 << 14) - 1)
if i == 0: # det1
value |= energy_int << 28
elif i == 1: # det2
value |= energy_int << 14
else: # det3
value |= energy_int
# 设置探测器掩码修改1表示点火0表示未点火
value |= detector_mask << 42
return value
def decode(self, encoded_value: int) -> dict:
"""
解码64位整数
修改探测器掩码位 - 1表示点火0表示未点火
"""
result = {}
# 1. 事件结束标志
result['event_end'] = bool((encoded_value >> 63) & 1)
# 2. 时间戳
result['timestamp'] = (encoded_value >> 45) & ((1 << 18) - 1)
# 3. 探测器掩码
detector_mask = (encoded_value >> 42) & 0b111
result['detector_mask'] = detector_mask
# 修改:掩码位解释 - 1表示点火0表示未点火
mask_bits = [((detector_mask >> 2) & 1) == 1, # det1: bit44=1表示点火
((detector_mask >> 1) & 1) == 1, # det2: bit43=1表示点火
(detector_mask & 1) == 1] # det3: bit42=1表示点火
# 4. 各探测器能量
energies = []
# 探测器1能量
energy1 = (encoded_value >> 28) & ((1 << 14) - 1)
energies.append(energy1 if mask_bits[0] else 0.0) # mask_bits[0]=True表示点火
# 探测器2能量
energy2 = (encoded_value >> 14) & ((1 << 14) - 1)
energies.append(energy2 if mask_bits[1] else 0.0) # mask_bits[1]=True表示点火
# 探测器3能量
energy3 = encoded_value & ((1 << 14) - 1)
energies.append(energy3 if mask_bits[2] else 0.0) # mask_bits[2]=True表示点火
result['energies'] = energies
return result
def set_event_end_flag(self, encoded_value: int) -> int:
"""
设置事件结束标志
参数:
encoded_value: 编码后的64位整数
返回:
设置了事件结束标志的64位整数
"""
# 设置bit63为1
return encoded_value | (1 << 63)
def to_binary_string(self, encoded_value: int) -> str:
"""转换为64位二进制字符串"""
return format(encoded_value, '064b')
def to_hex_string(self, encoded_value: int) -> str:
"""转换为16位十六进制字符串"""
return format(encoded_value, '016x')