PCM_Viewer/write_test_data.py

91 lines
2.5 KiB
Python
Raw Normal View History

2026-02-06 22:49:52 +08:00
"""
向本地 InfluxDB 写入测试数据
url: http://127.0.0.1:8086
bucket: PCM
measurement: PCM_Measurement
token: 用户提供
org: 这里假设也是 PCM如果你的组织名不是 PCM请改成真实 org 名字
"""
from influxdb_client import InfluxDBClient, Point, WriteOptions
import random
import datetime
import time
URL = "http://127.0.0.1:8086"
TOKEN = "gDWhyQlxdW3VN3VZIY_D13yA0ZUp7hV7QeXrxRbWqVNfakL73Ltz18lZ9rLS0lp1UcD_5t2u6_Io9YrdSiVODA=="
ORG = "MEASCON" # 如果你的 org 不是 PCM这里要改
BUCKET = "PCM"
MEASUREMENT = "PCM_Measurement"
DATA_TYPE = "LSDAQ" # 对应你图里的 data_type 列
# 一些示例字段名,按你截图里大致的含义写的
NUMERIC_FIELDS = [
"减速箱小轴承1",
"十字头#1",
"环境温度",
"减速箱小轴承2",
"减速箱小轴承4",
"减速箱大轴承#3",
"主轴承#3",
"主轴承#4",
]
WARNING_FIELDS = [
"主轴承#3.warning",
"主轴承#4.warning",
"主轴承#5.warning",
"减速箱小轴承1.warning",
"减速箱小轴承2.warning",
]
def build_point() -> Point:
"""构造一条 PCM_Measurement 测试数据"""
now = datetime.datetime.utcnow()
p = (
Point(MEASUREMENT)
.tag("data_type", DATA_TYPE)
.time(now)
)
# 数值类字段:写入随机浮点
for f in NUMERIC_FIELDS:
p = p.field(f, round(random.uniform(-50.0, 300.0), 3))
# 告警字段:写入 0/1
for f in WARNING_FIELDS:
p = p.field(f, random.randint(0, 1))
return p
def main_once():
"""只写入一批数据"""
with InfluxDBClient(url=URL, token=TOKEN, org=ORG) as client:
write_api = client.write_api(write_options=WriteOptions(batch_size=1))
point = build_point()
write_api.write(bucket=BUCKET, record=point)
print("wrote one point to bucket=PCM, measurement=PCM_Measurement")
def main_loop(interval_sec: float = 1.0):
"""每 interval_sec 秒写入一批数据,方便做实时测试"""
with InfluxDBClient(url=URL, token=TOKEN, org=ORG) as client:
write_api = client.write_api(write_options=WriteOptions(batch_size=1))
while True:
point = build_point()
write_api.write(bucket=BUCKET, record=point)
print("wrote point at", datetime.datetime.now().isoformat())
time.sleep(interval_sec)
if __name__ == "__main__":
# 只写一组就用 main_once()
main_once()
# 如果想持续写入做实时测试,改成:
# main_loop(1.0)