PCM_Viewer/write_test_data.py

91 lines
2.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
向本地 InfluxDB 写入测试数据
url: http://127.0.0.1:8086
bucket: PCM
measurement: PCM_Measurement
token: 用户提供
org: 这里假设也是 PCM如果你的组织名不是 PCM请改成真实 org 名字
"""
from influxdb_client import InfluxDBClient, Point, WriteOptions
import random
import datetime
import time
URL = "http://127.0.0.1:8086"
TOKEN = "gDWhyQlxdW3VN3VZIY_D13yA0ZUp7hV7QeXrxRbWqVNfakL73Ltz18lZ9rLS0lp1UcD_5t2u6_Io9YrdSiVODA=="
ORG = "MEASCON" # 如果你的 org 不是 PCM这里要改
BUCKET = "PCM"
MEASUREMENT = "PCM_Measurement"
DATA_TYPE = "LSDAQ" # 对应你图里的 data_type 列
# 一些示例字段名,按你截图里大致的含义写的
NUMERIC_FIELDS = [
"减速箱小轴承1",
"十字头#1",
"环境温度",
"减速箱小轴承2",
"减速箱小轴承4",
"减速箱大轴承#3",
"主轴承#3",
"主轴承#4",
]
WARNING_FIELDS = [
"主轴承#3.warning",
"主轴承#4.warning",
"主轴承#5.warning",
"减速箱小轴承1.warning",
"减速箱小轴承2.warning",
]
def build_point() -> Point:
"""构造一条 PCM_Measurement 测试数据"""
now = datetime.datetime.utcnow()
p = (
Point(MEASUREMENT)
.tag("data_type", DATA_TYPE)
.time(now)
)
# 数值类字段:写入随机浮点
for f in NUMERIC_FIELDS:
p = p.field(f, round(random.uniform(-50.0, 300.0), 3))
# 告警字段:写入 0/1
for f in WARNING_FIELDS:
p = p.field(f, random.randint(0, 1))
return p
def main_once():
"""只写入一批数据"""
with InfluxDBClient(url=URL, token=TOKEN, org=ORG) as client:
write_api = client.write_api(write_options=WriteOptions(batch_size=1))
point = build_point()
write_api.write(bucket=BUCKET, record=point)
print("wrote one point to bucket=PCM, measurement=PCM_Measurement")
def main_loop(interval_sec: float = 1.0):
"""每 interval_sec 秒写入一批数据,方便做实时测试"""
with InfluxDBClient(url=URL, token=TOKEN, org=ORG) as client:
write_api = client.write_api(write_options=WriteOptions(batch_size=1))
while True:
point = build_point()
write_api.write(bucket=BUCKET, record=point)
print("wrote point at", datetime.datetime.now().isoformat())
time.sleep(interval_sec)
if __name__ == "__main__":
# 只写一组就用 main_once()
main_once()
# 如果想持续写入做实时测试,改成:
# main_loop(1.0)