PCM_Report/quick_test_data.py

125 lines
3.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
快速测试数据生成脚本
用于快速生成实验状态监控测试数据
"""
import datetime
import time
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# 从配置文件读取InfluxDB配置
import json
import os
def load_config():
    """Load the InfluxDB settings from ``default.json``.

    The file is looked up in the directory that contains this script, and
    the value stored under its top-level ``"influx"`` key is returned.

    Returns:
        dict: The ``"influx"`` section. An empty dict is returned when the
        file cannot be read or parsed, when the key is absent, or when the
        top-level value or the ``"influx"`` value is not a JSON object, so
        callers can always call ``.get()`` on the result.
    """
    config_path = os.path.join(os.path.dirname(__file__), "default.json")
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file. ValueError: invalid JSON
        # (json.JSONDecodeError) or undecodable bytes (UnicodeDecodeError).
        print(f"加载配置文件失败: {e}")
        return {}

    # A file such as {"influx": null} or a top-level JSON array would
    # otherwise hand a non-dict to the caller, which then fails on .get().
    influx = config.get('influx', {}) if isinstance(config, dict) else None
    if not isinstance(influx, dict):
        print("加载配置文件失败: 'influx' 配置必须是JSON对象")
        return {}
    return influx
# Read the settings once at import time. Each constant takes the value from
# the loaded config and falls back to the default paired with its key below.
influx_config = load_config()
INFLUX_URL, INFLUX_ORG, INFLUX_TOKEN, INFLUX_BUCKET = (
    influx_config.get(key, fallback)
    for key, fallback in (
        ('url', 'http://127.0.0.1:8086'),
        ('org', 'MEASCON'),
        ('token', ''),
        ('bucket', 'default'),
    )
)
# Measurement name used for every point this script writes.
MEASUREMENT = "PCM_Measurement"
def write_status_change_sequence():
    """Write a ``load_status`` sequence 0 -> 1 -> 0 to InfluxDB.

    Three points are written to ``MEASUREMENT`` in ``INFLUX_BUCKET``, each
    tagged ``data_type=Breaker``: value 0.0 stamped five minutes ago, value
    1.0 stamped two minutes ago, and value 0.0 stamped now (UTC). A progress
    line is printed after each successful write.

    Any exception is reported on stdout instead of being raised. The client
    is closed even when a write fails.
    """
    # (field value, age relative to "now", label shown in the progress line)
    steps = (
        (0.0, datetime.timedelta(minutes=5), "5分钟前"),
        (1.0, datetime.timedelta(minutes=2), "2分钟前"),
        (0.0, datetime.timedelta(0), "现在"),
    )
    try:
        client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
        try:
            write_api = client.write_api(write_options=SYNCHRONOUS)
            # Take one reference time so the three offsets are consistent.
            now = datetime.datetime.now(datetime.timezone.utc)
            print("写入状态变化序列...")
            for value, age, label in steps:
                point = (
                    Point(MEASUREMENT)
                    .tag("data_type", "Breaker")
                    .field("load_status", value)
                    .time(now - age)
                )
                write_api.write(bucket=INFLUX_BUCKET, record=point)
                print(f"OK 写入load_status {int(value)} ({label})")
            print("\n状态变化序列写入完成!")
            print("可以用这个序列测试实验监控功能")
        finally:
            # Release the client on the failure path as well.
            client.close()
    except Exception as e:
        # Top-level boundary of an interactive tool: report, do not crash.
        print(f"错误: {e}")
def write_current_status(status_value):
    """Write a single ``load_status`` point stamped with the current UTC time.

    Args:
        status_value: Value to store; it is converted with ``float()``, so a
            number or a numeric string is accepted.

    The point is written to ``MEASUREMENT`` in ``INFLUX_BUCKET`` with the tag
    ``data_type=Breaker``. Any exception, including a failed conversion of
    ``status_value``, is reported on stdout instead of being raised. The
    client is closed even when the write fails.
    """
    try:
        # Convert first so invalid input never opens a connection.
        value = float(status_value)
        client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
        try:
            write_api = client.write_api(write_options=SYNCHRONOUS)
            point = (
                Point(MEASUREMENT)
                .tag("data_type", "Breaker")
                .field("load_status", value)
                .time(datetime.datetime.now(datetime.timezone.utc))
            )
            write_api.write(bucket=INFLUX_BUCKET, record=point)
            print(f"OK 写入当前load_status: {status_value}")
        finally:
            # Release the client on the failure path as well.
            client.close()
    except Exception as e:
        # Top-level boundary of an interactive tool: report, do not crash.
        print(f"错误: {e}")
def main():
    """Print the menu, read one choice from stdin and run that operation.

    Choice 1 writes the full 0 -> 1 -> 0 sequence, choices 2 and 3 write a
    single point with status "0" or "1", and choice 4 prints the URL, org,
    bucket and measurement currently in use. Anything else prints an
    invalid-choice message.
    """
    menu_lines = (
        "InfluxDB快速测试数据生成器",
        "=" * 40,
        "请选择操作:",
        "1. 写入完整状态变化序列 (0->1->0)",
        "2. 写入状态 0",
        "3. 写入状态 1",
        "4. 查看当前配置",
    )
    for line in menu_lines:
        print(line)

    selection = input("请输入选择 (1-4): ").strip()

    if selection == "1":
        write_status_change_sequence()
        return

    # Menu entries that write a single fixed status value.
    status_for_choice = {"2": "0", "3": "1"}
    if selection in status_for_choice:
        write_current_status(status_for_choice[selection])
        return

    if selection == "4":
        print(f"InfluxDB URL: {INFLUX_URL}")
        print(f"组织: {INFLUX_ORG}")
        print(f"Bucket: {INFLUX_BUCKET}")
        print(f"Measurement: {MEASUREMENT}")
        return

    print("无效选择")
# Start the interactive menu only when this file is executed as a script.
if __name__ == "__main__":
    main()