#!/usr/bin/env python3 """ 调试时间戳问题 """ import sys import os import datetime sys.path.append(os.path.dirname(__file__)) from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS from config_model import AppConfig from pathlib import Path def test_timestamp_writing(): """测试不同时间戳的写入""" print("🕐 测试时间戳写入") # 加载配置 config_path = Path("default.json") config = AppConfig.load(config_path) influx_config = config.influx client = InfluxDBClient( url=influx_config.url, token=influx_config.token, org=influx_config.org ) write_api = client.write_api(write_options=SYNCHRONOUS) bucket = getattr(influx_config, 'bucket', 'PCM') measurement = getattr(influx_config, 'measurement', 'experiment_status') # 测试不同的时间戳格式 now_local = datetime.datetime.now() now_utc = datetime.datetime.now(datetime.timezone.utc) print(f"本地时间: {now_local}") print(f"UTC时间: {now_utc}") try: # 1. 写入本地时间戳 print("\n1️⃣ 写入本地时间戳...") point1 = Point(measurement) \ .field("status", "test_local") \ .time(now_local) write_api.write(bucket=bucket, record=point1) print("✅ 本地时间戳写入成功") # 2. 写入UTC时间戳 print("\n2️⃣ 写入UTC时间戳...") point2 = Point(measurement) \ .field("status", "test_utc") \ .time(now_utc) write_api.write(bucket=bucket, record=point2) print("✅ UTC时间戳写入成功") # 3. 写入当前时间(无时区) print("\n3️⃣ 写入当前时间(模拟quick_test_data.py)...") point3 = Point(measurement) \ .field("status", "test_current") \ .time(datetime.datetime.now()) write_api.write(bucket=bucket, record=point3) print("✅ 当前时间写入成功") except Exception as e: print(f"❌ 写入失败: {e}") return client.close() # 等待一下让数据落盘 import time print("\n⏳ 等待2秒让数据落盘...") time.sleep(2) # 查询刚写入的数据 print("\n🔍 查询刚写入的数据...") from influx_service import InfluxService, InfluxConnectionParams params = InfluxConnectionParams( url=influx_config.url, org=influx_config.org, token=influx_config.token ) service = InfluxService(params) try: # 查询最近5分钟的数据 df = service.query( bucket=bucket, measurement=measurement, fields=['status'], filters={}, time_range="-5m" ) if df.empty: print("❌ 没有找到刚写入的数据") # 尝试更大的时间范围 print("🔍 尝试查询最近1小时的数据...") df_hour = service.query( bucket=bucket, measurement=measurement, fields=['status'], filters={}, time_range="-1h" ) if df_hour.empty: print("❌ 1小时内也没有数据") else: print(f"✅ 1小时内找到 {len(df_hour)} 条数据") # 显示最新的几条 if '_time' in df_hour.columns and '_value' in df_hour.columns: df_sorted = df_hour.sort_values('_time').tail(5) for _, row in df_sorted.iterrows(): time_local = row['_time'].astimezone() time_str = time_local.strftime('%H:%M:%S') value = row['_value'] print(f" {time_str}: {value}") else: print(f"✅ 找到 {len(df)} 条刚写入的数据") # 显示数据 if '_time' in df.columns and '_value' in df.columns: df_sorted = df.sort_values('_time') for _, row in df_sorted.iterrows(): time_local = row['_time'].astimezone() time_str = time_local.strftime('%H:%M:%S') value = row['_value'] print(f" {time_str}: {value}") except Exception as e: print(f"❌ 查询失败: {e}") def main(): print("时间戳问题调试工具") print("=" * 30) test_timestamp_writing() if __name__ == "__main__": main()