""" 向本地 InfluxDB 写入测试数据 url: http://127.0.0.1:8086 bucket: PCM measurement: PCM_Measurement token: 用户提供 org: 这里假设也是 PCM,如果你的组织名不是 PCM,请改成真实 org 名字 """ from influxdb_client import InfluxDBClient, Point, WriteOptions import random import datetime import time URL = "http://127.0.0.1:8086" TOKEN = "gDWhyQlxdW3VN3VZIY_D13yA0ZUp7hV7QeXrxRbWqVNfakL73Ltz18lZ9rLS0lp1UcD_5t2u6_Io9YrdSiVODA==" ORG = "MEASCON" # 如果你的 org 不是 PCM,这里要改 BUCKET = "PCM" MEASUREMENT = "PCM_Measurement" DATA_TYPE = "LSDAQ" # 对应你图里的 data_type 列 # 一些示例字段名,按你截图里大致的含义写的 NUMERIC_FIELDS = [ "减速箱小轴承1", "十字头#1", "环境温度", "减速箱小轴承2", "减速箱小轴承4", "减速箱大轴承#3", "主轴承#3", "主轴承#4", ] WARNING_FIELDS = [ "主轴承#3.warning", "主轴承#4.warning", "主轴承#5.warning", "减速箱小轴承1.warning", "减速箱小轴承2.warning", ] def build_point() -> Point: """构造一条 PCM_Measurement 测试数据""" now = datetime.datetime.utcnow() p = ( Point(MEASUREMENT) .tag("data_type", DATA_TYPE) .time(now) ) # 数值类字段:写入随机浮点 for f in NUMERIC_FIELDS: p = p.field(f, round(random.uniform(-50.0, 300.0), 3)) # 告警字段:写入 0/1 for f in WARNING_FIELDS: p = p.field(f, random.randint(0, 1)) return p def main_once(): """只写入一批数据""" with InfluxDBClient(url=URL, token=TOKEN, org=ORG) as client: write_api = client.write_api(write_options=WriteOptions(batch_size=1)) point = build_point() write_api.write(bucket=BUCKET, record=point) print("wrote one point to bucket=PCM, measurement=PCM_Measurement") def main_loop(interval_sec: float = 1.0): """每 interval_sec 秒写入一批数据,方便做实时测试""" with InfluxDBClient(url=URL, token=TOKEN, org=ORG) as client: write_api = client.write_api(write_options=WriteOptions(batch_size=1)) while True: point = build_point() write_api.write(bucket=BUCKET, record=point) print("wrote point at", datetime.datetime.now().isoformat()) time.sleep(interval_sec) if __name__ == "__main__": # 只写一组就用 main_once() main_once() # 如果想持续写入做实时测试,改成: # main_loop(1.0)