PCM_Report/debug_timestamp_issue.py

152 lines
4.6 KiB
Python
Raw Permalink Normal View History

2025-12-11 14:32:31 +08:00
#!/usr/bin/env python3
"""
调试时间戳问题
"""
import sys
import os
import datetime
sys.path.append(os.path.dirname(__file__))
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from config_model import AppConfig
from pathlib import Path
def test_timestamp_writing():
"""测试不同时间戳的写入"""
print("🕐 测试时间戳写入")
# 加载配置
config_path = Path("default.json")
config = AppConfig.load(config_path)
influx_config = config.influx
client = InfluxDBClient(
url=influx_config.url,
token=influx_config.token,
org=influx_config.org
)
write_api = client.write_api(write_options=SYNCHRONOUS)
bucket = getattr(influx_config, 'bucket', 'PCM')
measurement = getattr(influx_config, 'measurement', 'experiment_status')
# 测试不同的时间戳格式
now_local = datetime.datetime.now()
now_utc = datetime.datetime.now(datetime.timezone.utc)
print(f"本地时间: {now_local}")
print(f"UTC时间: {now_utc}")
try:
# 1. 写入本地时间戳
print("\n1⃣ 写入本地时间戳...")
point1 = Point(measurement) \
.field("status", "test_local") \
.time(now_local)
write_api.write(bucket=bucket, record=point1)
print("✅ 本地时间戳写入成功")
# 2. 写入UTC时间戳
print("\n2⃣ 写入UTC时间戳...")
point2 = Point(measurement) \
.field("status", "test_utc") \
.time(now_utc)
write_api.write(bucket=bucket, record=point2)
print("✅ UTC时间戳写入成功")
# 3. 写入当前时间(无时区)
print("\n3⃣ 写入当前时间模拟quick_test_data.py...")
point3 = Point(measurement) \
.field("status", "test_current") \
.time(datetime.datetime.now())
write_api.write(bucket=bucket, record=point3)
print("✅ 当前时间写入成功")
except Exception as e:
print(f"❌ 写入失败: {e}")
return
client.close()
# 等待一下让数据落盘
import time
print("\n⏳ 等待2秒让数据落盘...")
time.sleep(2)
# 查询刚写入的数据
print("\n🔍 查询刚写入的数据...")
from influx_service import InfluxService, InfluxConnectionParams
params = InfluxConnectionParams(
url=influx_config.url,
org=influx_config.org,
token=influx_config.token
)
service = InfluxService(params)
try:
# 查询最近5分钟的数据
df = service.query(
bucket=bucket,
measurement=measurement,
fields=['status'],
filters={},
time_range="-5m"
)
if df.empty:
print("❌ 没有找到刚写入的数据")
# 尝试更大的时间范围
print("🔍 尝试查询最近1小时的数据...")
df_hour = service.query(
bucket=bucket,
measurement=measurement,
fields=['status'],
filters={},
time_range="-1h"
)
if df_hour.empty:
print("❌ 1小时内也没有数据")
else:
print(f"✅ 1小时内找到 {len(df_hour)} 条数据")
# 显示最新的几条
if '_time' in df_hour.columns and '_value' in df_hour.columns:
df_sorted = df_hour.sort_values('_time').tail(5)
for _, row in df_sorted.iterrows():
time_local = row['_time'].astimezone()
time_str = time_local.strftime('%H:%M:%S')
value = row['_value']
print(f" {time_str}: {value}")
else:
print(f"✅ 找到 {len(df)} 条刚写入的数据")
# 显示数据
if '_time' in df.columns and '_value' in df.columns:
df_sorted = df.sort_values('_time')
for _, row in df_sorted.iterrows():
time_local = row['_time'].astimezone()
time_str = time_local.strftime('%H:%M:%S')
value = row['_value']
print(f" {time_str}: {value}")
except Exception as e:
print(f"❌ 查询失败: {e}")
def main():
print("时间戳问题调试工具")
print("=" * 30)
test_timestamp_writing()
if __name__ == "__main__":
main()