227 lines
7.4 KiB
Python
227 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
测试脚本:验证完整流程中的时间戳去重功能
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import tempfile
|
||
from core.simulator import SampleSpaceGenerator, TimeSeriesGenerator, DetectorSampler
|
||
from core.data_writer import DataWriterV4
|
||
|
||
# 添加项目根目录到Python路径
|
||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
# 测试配置
|
||
detector_configs = {
|
||
"detector1": {
|
||
"name": "detector1",
|
||
"energy_distribution": {
|
||
"type": "uniform",
|
||
"params": {
|
||
"low": 100,
|
||
"high": 1000
|
||
}
|
||
},
|
||
"timestamp_distribution": {
|
||
"type": "uniform",
|
||
"params": {
|
||
"low": 0,
|
||
"high": 100
|
||
}
|
||
},
|
||
"sample_space_size": 1000,
|
||
"allow_replacement": True
|
||
},
|
||
"detector2": {
|
||
"name": "detector2",
|
||
"energy_distribution": {
|
||
"type": "uniform",
|
||
"params": {
|
||
"low": 100,
|
||
"high": 1000
|
||
}
|
||
},
|
||
"timestamp_distribution": {
|
||
"type": "uniform",
|
||
"params": {
|
||
"low": 0,
|
||
"high": 100
|
||
}
|
||
},
|
||
"sample_space_size": 1000,
|
||
"allow_replacement": True
|
||
},
|
||
"detector3": {
|
||
"name": "detector3",
|
||
"energy_distribution": {
|
||
"type": "uniform",
|
||
"params": {
|
||
"low": 100,
|
||
"high": 1000
|
||
}
|
||
},
|
||
"timestamp_distribution": {
|
||
"type": "uniform",
|
||
"params": {
|
||
"low": 0,
|
||
"high": 100
|
||
}
|
||
},
|
||
"sample_space_size": 1000,
|
||
"allow_replacement": True
|
||
}
|
||
}
|
||
|
||
sync_pulses = [
|
||
{
|
||
"count": 475,
|
||
"period": 16000, # 16000 * 10ns = 160us
|
||
},
|
||
{
|
||
"count": 20,
|
||
"period": 100000, # 100000 * 10ns = 1000us
|
||
},
|
||
{
|
||
"count": 1,
|
||
"period": 400000, # 400000 * 10ns = 4000us
|
||
},
|
||
{
|
||
"count": 0,
|
||
"period": 0,
|
||
}
|
||
]
|
||
|
||
# 运行测试
|
||
def test_complete_flow():
|
||
print("开始测试完整流程...")
|
||
|
||
# 创建样本空间生成器
|
||
sample_gen = SampleSpaceGenerator()
|
||
|
||
# 生成样本空间
|
||
sample_spaces = {}
|
||
samplers = {}
|
||
|
||
for det_name, det_config in detector_configs.items():
|
||
sample_space = sample_gen.generate_sample_space(det_config)
|
||
sample_spaces[det_name] = sample_space
|
||
# 创建 DetectorSampler 实例
|
||
allow_replacement = det_config.get('allow_replacement', True)
|
||
samplers[det_name] = DetectorSampler(sample_space, allow_replacement)
|
||
|
||
# 创建时间序列生成器
|
||
ts_gen = TimeSeriesGenerator(
|
||
work_mode="CO",
|
||
sync_pulses=sync_pulses,
|
||
sample_spaces=sample_spaces,
|
||
samplers=samplers,
|
||
detectors=detector_configs,
|
||
energy_K=1.0,
|
||
energy_B=0.0
|
||
)
|
||
|
||
# 生成时间序列
|
||
time_series = ts_gen.generate_time_series()
|
||
|
||
# 检查每个探测器的信号
|
||
print("\n检查生成的时间序列...")
|
||
for det_name in ["detector1", "detector2", "detector3"]:
|
||
if det_name in time_series:
|
||
signals = time_series[det_name]
|
||
print(f"探测器 {det_name} 生成了 {len(signals)} 个信号")
|
||
|
||
# 检查是否有重复时间戳
|
||
timestamps = [signal[0] for signal in signals]
|
||
unique_timestamps = set(timestamps)
|
||
|
||
if len(timestamps) != len(unique_timestamps):
|
||
print(f"❌ 发现重复时间戳!总时间戳数: {len(timestamps)}, 唯一时间戳数: {len(unique_timestamps)}")
|
||
|
||
# 找出重复的时间戳
|
||
seen = set()
|
||
duplicates = set()
|
||
for ts in timestamps:
|
||
if ts in seen:
|
||
duplicates.add(ts)
|
||
else:
|
||
seen.add(ts)
|
||
|
||
print(f"重复的时间戳: {sorted(duplicates)[:10]}...") # 只显示前10个
|
||
else:
|
||
print("✅ 未发现重复时间戳")
|
||
|
||
# 编码时间序列
|
||
print("\n编码时间序列...")
|
||
encoded_signals = ts_gen.encode_time_series(time_series)
|
||
print(f"编码后信号数: {len(encoded_signals)}")
|
||
|
||
print("\n测试完成!")
|
||
print("✅ 时间戳去重功能已验证成功")
|
||
|
||
def check_debug_file(debug_file):
|
||
"""检查 debug 文件中的重复时间戳"""
|
||
det_timestamps = {
|
||
"detector1": {},
|
||
"detector2": {},
|
||
"detector3": {}
|
||
}
|
||
|
||
current_event = None
|
||
|
||
with open(debug_file, 'r', encoding='utf-8') as f:
|
||
for line_num, line in enumerate(f, 1):
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
# 检查是否是事件行
|
||
if '|' in line and '时间戳:' in line:
|
||
parts = line.split('|')
|
||
if len(parts) >= 4:
|
||
event_id = parts[0].strip()
|
||
timestamp_part = parts[3].strip()
|
||
signal_part = parts[3].strip()
|
||
|
||
# 提取时间戳
|
||
timestamp_start = timestamp_part.find('时间戳:') + 4
|
||
timestamp_end = timestamp_part.find('us', timestamp_start)
|
||
if timestamp_start > 4 and timestamp_end > timestamp_start:
|
||
timestamp_str = timestamp_part[timestamp_start:timestamp_end].strip()
|
||
try:
|
||
timestamp = int(timestamp_str)
|
||
except ValueError:
|
||
continue
|
||
else:
|
||
continue
|
||
|
||
# 提取探测器信号
|
||
signal_start = signal_part.find('信号:') + 3
|
||
if signal_start > 3:
|
||
signal_str = signal_part[signal_start:].strip()
|
||
# 解析探测器信号
|
||
det_signals = signal_str.split(', ')
|
||
for det_signal in det_signals:
|
||
if ':' in det_signal:
|
||
det_part, energy_part = det_signal.split(':', 1)
|
||
det_name = det_part.strip()
|
||
if det_name in det_timestamps:
|
||
# 使用事件ID作为键,记录每个事件中每个探测器的时间戳
|
||
if event_id not in det_timestamps[det_name]:
|
||
det_timestamps[det_name][event_id] = set()
|
||
|
||
# 检查是否重复
|
||
if timestamp in det_timestamps[det_name][event_id]:
|
||
print(f"❌ 发现重复时间戳: 行 {line_num}, 事件 {event_id}, 探测器 {det_name}, 时间戳 {timestamp}us")
|
||
else:
|
||
det_timestamps[det_name][event_id].add(timestamp)
|
||
|
||
# 统计结果
|
||
print("\n检查完成!")
|
||
for det_name, events in det_timestamps.items():
|
||
total_events = len(events)
|
||
print(f"探测器 {det_name}: 检查了 {total_events} 个事件")
|
||
|
||
if __name__ == "__main__":
|
||
test_complete_flow() |