264 lines
9.2 KiB
Python
264 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
InfluxDB模拟数据生成脚本
|
||
用于生成实验监控所需的测试数据
|
||
"""
|
||
|
||
import datetime
|
||
import time
|
||
import random
|
||
import math
|
||
from influxdb_client import InfluxDBClient, Point
|
||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||
|
||
# InfluxDB配置
|
||
INFLUX_URL = "http://127.0.0.1:8086"
|
||
INFLUX_ORG = "MEASCON"
|
||
INFLUX_TOKEN = "y7XZbPgRSpmtzgk8Li4NbClK77NIKqNdlM3iE1b7UfNHMsJ6bLiT-dDLasM6dA1RTifcbGuWOdciihXYkNReIA=="
|
||
INFLUX_BUCKET = "test_bucket" # 可以修改为你的bucket名称
|
||
MEASUREMENT = "experiment_status" # 可以修改为你的measurement名称
|
||
|
||
def create_client():
|
||
"""创建InfluxDB客户端"""
|
||
return InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
|
||
|
||
def generate_experiment_cycle_data(client, duration_minutes=30):
|
||
"""
|
||
生成一个完整的实验周期数据
|
||
包含:等待 -> 开始 -> 运行 -> 结束的完整流程
|
||
|
||
Args:
|
||
client: InfluxDB客户端
|
||
duration_minutes: 实验总持续时间(分钟)
|
||
"""
|
||
write_api = client.write_api(write_options=SYNCHRONOUS)
|
||
|
||
# 计算时间点
|
||
now = datetime.datetime.now()
|
||
start_time = now - datetime.timedelta(minutes=duration_minutes)
|
||
|
||
print(f"生成实验周期数据: {start_time} 到 {now}")
|
||
print(f"实验持续时间: {duration_minutes} 分钟")
|
||
|
||
# 阶段1: 等待状态 (前5分钟)
|
||
wait_duration = min(5, duration_minutes // 6)
|
||
print(f"阶段1: 等待状态 ({wait_duration}分钟)")
|
||
|
||
current_time = start_time
|
||
while current_time < start_time + datetime.timedelta(minutes=wait_duration):
|
||
point = Point(MEASUREMENT) \
|
||
.field("status", 0) \
|
||
.field("temperature", random.uniform(20, 25)) \
|
||
.field("pressure", random.uniform(0.98, 1.02)) \
|
||
.field("speed", 0) \
|
||
.time(current_time)
|
||
|
||
write_api.write(bucket=INFLUX_BUCKET, record=point)
|
||
current_time += datetime.timedelta(seconds=30)
|
||
|
||
# 阶段2: 实验开始 (状态变为1)
|
||
print("阶段2: 实验开始")
|
||
experiment_start = start_time + datetime.timedelta(minutes=wait_duration)
|
||
|
||
point = Point(MEASUREMENT) \
|
||
.field("status", 1) \
|
||
.field("temperature", random.uniform(25, 30)) \
|
||
.field("pressure", random.uniform(1.0, 1.1)) \
|
||
.field("speed", random.uniform(100, 200)) \
|
||
.time(experiment_start)
|
||
|
||
write_api.write(bucket=INFLUX_BUCKET, record=point)
|
||
|
||
# 阶段3: 实验运行 (大部分时间)
|
||
run_duration = duration_minutes - wait_duration - 2 # 减去等待时间和结束时间
|
||
print(f"阶段3: 实验运行 ({run_duration}分钟)")
|
||
|
||
current_time = experiment_start + datetime.timedelta(seconds=30)
|
||
experiment_end_time = start_time + datetime.timedelta(minutes=duration_minutes - 1)
|
||
|
||
while current_time < experiment_end_time:
|
||
# 模拟实验过程中的数据变化
|
||
elapsed_minutes = (current_time - experiment_start).total_seconds() / 60
|
||
|
||
# 温度逐渐升高然后稳定
|
||
base_temp = 30 + min(elapsed_minutes * 2, 20)
|
||
temperature = base_temp + random.uniform(-2, 2)
|
||
|
||
# 压力有周期性波动
|
||
pressure_base = 1.2 + 0.3 * math.sin(elapsed_minutes * 0.1)
|
||
pressure = pressure_base + random.uniform(-0.05, 0.05)
|
||
|
||
# 转速有阶段性变化
|
||
if elapsed_minutes < 10:
|
||
speed_base = 500 + elapsed_minutes * 50
|
||
elif elapsed_minutes < 20:
|
||
speed_base = 1000 + (elapsed_minutes - 10) * 100
|
||
else:
|
||
speed_base = 2000 + random.uniform(-100, 100)
|
||
|
||
speed = max(0, speed_base + random.uniform(-50, 50))
|
||
|
||
point = Point(MEASUREMENT) \
|
||
.field("status", 1) \
|
||
.field("temperature", temperature) \
|
||
.field("pressure", pressure) \
|
||
.field("speed", speed) \
|
||
.time(current_time)
|
||
|
||
write_api.write(bucket=INFLUX_BUCKET, record=point)
|
||
current_time += datetime.timedelta(seconds=30)
|
||
|
||
# 阶段4: 实验结束 (状态变为0)
|
||
print("阶段4: 实验结束")
|
||
|
||
point = Point(MEASUREMENT) \
|
||
.field("status", 0) \
|
||
.field("temperature", random.uniform(40, 45)) \
|
||
.field("pressure", random.uniform(1.0, 1.1)) \
|
||
.field("speed", 0) \
|
||
.time(now)
|
||
|
||
write_api.write(bucket=INFLUX_BUCKET, record=point)
|
||
|
||
print("实验周期数据生成完成!")
|
||
|
||
def generate_continuous_monitoring_data(client, hours=2):
|
||
"""
|
||
生成连续监控数据(无实验状态变化)
|
||
|
||
Args:
|
||
client: InfluxDB客户端
|
||
hours: 生成数据的小时数
|
||
"""
|
||
write_api = client.write_api(write_options=SYNCHRONOUS)
|
||
|
||
now = datetime.datetime.now()
|
||
start_time = now - datetime.timedelta(hours=hours)
|
||
|
||
print(f"生成连续监控数据: {start_time} 到 {now}")
|
||
|
||
current_time = start_time
|
||
while current_time < now:
|
||
# 模拟设备监控数据
|
||
temperature = 22 + 3 * math.sin((current_time.timestamp() % 3600) / 3600 * 2 * math.pi) + random.uniform(-1, 1)
|
||
pressure = 1.0 + 0.1 * math.cos((current_time.timestamp() % 1800) / 1800 * 2 * math.pi) + random.uniform(-0.02, 0.02)
|
||
|
||
point = Point(MEASUREMENT) \
|
||
.field("status", 0) \
|
||
.field("temperature", temperature) \
|
||
.field("pressure", pressure) \
|
||
.field("speed", 0) \
|
||
.time(current_time)
|
||
|
||
write_api.write(bucket=INFLUX_BUCKET, record=point)
|
||
current_time += datetime.timedelta(minutes=1)
|
||
|
||
print("连续监控数据生成完成!")
|
||
|
||
def generate_multiple_experiments(client, count=3):
|
||
"""
|
||
生成多个实验的数据
|
||
|
||
Args:
|
||
client: InfluxDB客户端
|
||
count: 实验次数
|
||
"""
|
||
print(f"生成 {count} 个实验的数据...")
|
||
|
||
for i in range(count):
|
||
print(f"\n--- 实验 {i+1} ---")
|
||
# 每个实验持续20-40分钟
|
||
duration = random.randint(20, 40)
|
||
|
||
# 实验之间间隔一些时间
|
||
if i > 0:
|
||
time.sleep(1) # 避免时间戳冲突
|
||
|
||
generate_experiment_cycle_data(client, duration)
|
||
|
||
# 实验之间的间隔监控数据
|
||
if i < count - 1:
|
||
generate_continuous_monitoring_data(client, hours=0.5)
|
||
|
||
def main():
|
||
"""主函数"""
|
||
print("InfluxDB模拟数据生成器")
|
||
print("=" * 50)
|
||
|
||
try:
|
||
# 创建客户端
|
||
client = create_client()
|
||
|
||
# 测试连接
|
||
print("测试InfluxDB连接...")
|
||
health = client.health()
|
||
print(f"InfluxDB状态: {health.status}")
|
||
|
||
print(f"目标Bucket: {INFLUX_BUCKET}")
|
||
print(f"目标Measurement: {MEASUREMENT}")
|
||
|
||
# 选择生成数据类型
|
||
print("\n请选择要生成的数据类型:")
|
||
print("1. 单个完整实验周期 (30分钟)")
|
||
print("2. 连续监控数据 (2小时)")
|
||
print("3. 多个实验数据 (3个实验)")
|
||
print("4. 实时模拟数据 (持续生成)")
|
||
|
||
choice = input("请输入选择 (1-4): ").strip()
|
||
|
||
if choice == "1":
|
||
generate_experiment_cycle_data(client, 30)
|
||
elif choice == "2":
|
||
generate_continuous_monitoring_data(client, 2)
|
||
elif choice == "3":
|
||
generate_multiple_experiments(client, 3)
|
||
elif choice == "4":
|
||
print("开始实时模拟数据生成 (按Ctrl+C停止)...")
|
||
try:
|
||
while True:
|
||
write_api = client.write_api(write_options=SYNCHRONOUS)
|
||
|
||
# 随机状态变化
|
||
status = random.choice([0, 0, 0, 1]) # 75%概率为0,25%概率为1
|
||
|
||
# 根据状态生成相应数据
|
||
if status == 0:
|
||
temperature = random.uniform(20, 30)
|
||
pressure = random.uniform(0.95, 1.05)
|
||
speed = 0
|
||
else:
|
||
temperature = random.uniform(30, 60)
|
||
pressure = random.uniform(1.0, 1.5)
|
||
speed = random.uniform(500, 2000)
|
||
|
||
point = Point(MEASUREMENT) \
|
||
.field("status", status) \
|
||
.field("temperature", temperature) \
|
||
.field("pressure", pressure) \
|
||
.field("speed", speed) \
|
||
.time(datetime.datetime.now())
|
||
|
||
write_api.write(bucket=INFLUX_BUCKET, record=point)
|
||
|
||
print(f"写入数据: status={status}, temp={temperature:.1f}, pressure={pressure:.2f}, speed={speed:.0f}")
|
||
time.sleep(5) # 每5秒写入一次
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n实时数据生成已停止")
|
||
else:
|
||
print("无效选择")
|
||
return
|
||
|
||
print("\n数据生成完成!")
|
||
|
||
except Exception as e:
|
||
print(f"错误: {e}")
|
||
print("请检查InfluxDB连接配置和服务状态")
|
||
|
||
finally:
|
||
if 'client' in locals():
|
||
client.close()
|
||
|
||
if __name__ == "__main__":
|
||
main()
|