152 lines
4.6 KiB
Python
152 lines
4.6 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
调试时间戳问题
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import datetime
|
||
sys.path.append(os.path.dirname(__file__))
|
||
|
||
from influxdb_client import InfluxDBClient, Point
|
||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||
from config_model import AppConfig
|
||
from pathlib import Path
|
||
|
||
def test_timestamp_writing():
|
||
"""测试不同时间戳的写入"""
|
||
print("🕐 测试时间戳写入")
|
||
|
||
# 加载配置
|
||
config_path = Path("default.json")
|
||
config = AppConfig.load(config_path)
|
||
|
||
influx_config = config.influx
|
||
|
||
client = InfluxDBClient(
|
||
url=influx_config.url,
|
||
token=influx_config.token,
|
||
org=influx_config.org
|
||
)
|
||
|
||
write_api = client.write_api(write_options=SYNCHRONOUS)
|
||
bucket = getattr(influx_config, 'bucket', 'PCM')
|
||
measurement = getattr(influx_config, 'measurement', 'experiment_status')
|
||
|
||
# 测试不同的时间戳格式
|
||
now_local = datetime.datetime.now()
|
||
now_utc = datetime.datetime.now(datetime.timezone.utc)
|
||
|
||
print(f"本地时间: {now_local}")
|
||
print(f"UTC时间: {now_utc}")
|
||
|
||
try:
|
||
# 1. 写入本地时间戳
|
||
print("\n1️⃣ 写入本地时间戳...")
|
||
point1 = Point(measurement) \
|
||
.field("status", "test_local") \
|
||
.time(now_local)
|
||
|
||
write_api.write(bucket=bucket, record=point1)
|
||
print("✅ 本地时间戳写入成功")
|
||
|
||
# 2. 写入UTC时间戳
|
||
print("\n2️⃣ 写入UTC时间戳...")
|
||
point2 = Point(measurement) \
|
||
.field("status", "test_utc") \
|
||
.time(now_utc)
|
||
|
||
write_api.write(bucket=bucket, record=point2)
|
||
print("✅ UTC时间戳写入成功")
|
||
|
||
# 3. 写入当前时间(无时区)
|
||
print("\n3️⃣ 写入当前时间(模拟quick_test_data.py)...")
|
||
point3 = Point(measurement) \
|
||
.field("status", "test_current") \
|
||
.time(datetime.datetime.now())
|
||
|
||
write_api.write(bucket=bucket, record=point3)
|
||
print("✅ 当前时间写入成功")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 写入失败: {e}")
|
||
return
|
||
|
||
client.close()
|
||
|
||
# 等待一下让数据落盘
|
||
import time
|
||
print("\n⏳ 等待2秒让数据落盘...")
|
||
time.sleep(2)
|
||
|
||
# 查询刚写入的数据
|
||
print("\n🔍 查询刚写入的数据...")
|
||
|
||
from influx_service import InfluxService, InfluxConnectionParams
|
||
|
||
params = InfluxConnectionParams(
|
||
url=influx_config.url,
|
||
org=influx_config.org,
|
||
token=influx_config.token
|
||
)
|
||
|
||
service = InfluxService(params)
|
||
|
||
try:
|
||
# 查询最近5分钟的数据
|
||
df = service.query(
|
||
bucket=bucket,
|
||
measurement=measurement,
|
||
fields=['status'],
|
||
filters={},
|
||
time_range="-5m"
|
||
)
|
||
|
||
if df.empty:
|
||
print("❌ 没有找到刚写入的数据")
|
||
|
||
# 尝试更大的时间范围
|
||
print("🔍 尝试查询最近1小时的数据...")
|
||
df_hour = service.query(
|
||
bucket=bucket,
|
||
measurement=measurement,
|
||
fields=['status'],
|
||
filters={},
|
||
time_range="-1h"
|
||
)
|
||
|
||
if df_hour.empty:
|
||
print("❌ 1小时内也没有数据")
|
||
else:
|
||
print(f"✅ 1小时内找到 {len(df_hour)} 条数据")
|
||
# 显示最新的几条
|
||
if '_time' in df_hour.columns and '_value' in df_hour.columns:
|
||
df_sorted = df_hour.sort_values('_time').tail(5)
|
||
for _, row in df_sorted.iterrows():
|
||
time_local = row['_time'].astimezone()
|
||
time_str = time_local.strftime('%H:%M:%S')
|
||
value = row['_value']
|
||
print(f" {time_str}: {value}")
|
||
else:
|
||
print(f"✅ 找到 {len(df)} 条刚写入的数据")
|
||
|
||
# 显示数据
|
||
if '_time' in df.columns and '_value' in df.columns:
|
||
df_sorted = df.sort_values('_time')
|
||
for _, row in df_sorted.iterrows():
|
||
time_local = row['_time'].astimezone()
|
||
time_str = time_local.strftime('%H:%M:%S')
|
||
value = row['_value']
|
||
print(f" {time_str}: {value}")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 查询失败: {e}")
|
||
|
||
def main():
|
||
print("时间戳问题调试工具")
|
||
print("=" * 30)
|
||
test_timestamp_writing()
|
||
|
||
if __name__ == "__main__":
|
||
main()
|