PCM_Report/influx_test_tool.py

246 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
InfluxDB测试工具
包含连接测试、权限验证和数据写入功能
"""
import datetime
import json
import os
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
def load_config():
"""从default.json加载配置"""
config_path = os.path.join(os.path.dirname(__file__), "default.json")
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config.get('influx', {})
except Exception as e:
print(f"❌ 加载配置文件失败: {e}")
return {}
def test_connection(config):
"""测试InfluxDB连接"""
print("🔍 测试InfluxDB连接...")
url = config.get('url', '')
org = config.get('org', '')
token = config.get('token', '')
if not all([url, org, token]):
print("❌ 配置不完整:")
print(f" URL: {'' if url else ''}")
print(f" ORG: {'' if org else ''}")
print(f" TOKEN: {'' if token else ''}")
return False
try:
client = InfluxDBClient(url=url, token=token, org=org)
health = client.health()
print(f"✅ 连接成功! 状态: {health.status}")
# 测试组织访问权限
try:
orgs = client.organizations_api().find_organizations()
org_names = [o.name for o in orgs]
print(f"✅ 可访问的组织: {org_names}")
if org not in org_names:
print(f"⚠️ 警告: 配置的组织 '{org}' 不在可访问列表中")
except Exception as e:
print(f"⚠️ 无法获取组织列表: {e}")
# 测试bucket访问权限
bucket = config.get('bucket', 'default')
try:
buckets = client.buckets_api().find_buckets()
bucket_names = [b.name for b in buckets.buckets] if buckets.buckets else []
print(f"✅ 可访问的buckets: {bucket_names}")
if bucket not in bucket_names:
print(f"⚠️ 警告: 配置的bucket '{bucket}' 不存在")
print(f" 建议使用现有bucket或创建新bucket")
except Exception as e:
print(f"⚠️ 无法获取bucket列表: {e}")
client.close()
return True
except Exception as e:
print(f"❌ 连接失败: {e}")
return False
def create_bucket_if_not_exists(config):
"""如果bucket不存在则创建"""
url = config.get('url', '')
org = config.get('org', '')
token = config.get('token', '')
bucket = config.get('bucket', 'test_bucket')
try:
client = InfluxDBClient(url=url, token=token, org=org)
buckets_api = client.buckets_api()
# 检查bucket是否存在
buckets = buckets_api.find_buckets()
bucket_names = [b.name for b in buckets.buckets] if buckets.buckets else []
if bucket not in bucket_names:
print(f"📦 创建bucket: {bucket}")
try:
# 获取组织ID
orgs_api = client.organizations_api()
org_obj = orgs_api.find_organizations(org=org)
if not org_obj:
print(f"❌ 找不到组织: {org}")
return False
org_id = org_obj[0].id
# 创建bucket
from influxdb_client.domain.bucket import Bucket
bucket_obj = Bucket(name=bucket, org_id=org_id)
created_bucket = buckets_api.create_bucket(bucket=bucket_obj)
print(f"✅ Bucket '{bucket}' 创建成功!")
return True
except Exception as e:
print(f"❌ 创建bucket失败: {e}")
print(" 请手动在InfluxDB中创建bucket或使用现有bucket")
return False
else:
print(f"✅ Bucket '{bucket}' 已存在")
return True
except Exception as e:
print(f"❌ 检查bucket失败: {e}")
return False
finally:
if 'client' in locals():
client.close()
def write_test_data(config):
"""写入测试数据"""
url = config.get('url', '')
org = config.get('org', '')
token = config.get('token', '')
bucket = config.get('bucket', 'test_bucket')
measurement = config.get('measurement', 'experiment_status')
print(f"📝 写入测试数据到 {bucket}/{measurement}...")
try:
client = InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
now = datetime.datetime.now()
# 写入状态变化序列
points = [
# 5分钟前: 状态0
Point(measurement)
.field("status", "0")
.field("temperature", 25.0)
.field("pressure", 1.0)
.time(now - datetime.timedelta(minutes=5)),
# 2分钟前: 状态1 (实验开始)
Point(measurement)
.field("status", "1")
.field("temperature", 30.0)
.field("pressure", 1.2)
.time(now - datetime.timedelta(minutes=2)),
# 现在: 状态0 (实验结束)
Point(measurement)
.field("status", "0")
.field("temperature", 28.0)
.field("pressure", 1.1)
.time(now)
]
for i, point in enumerate(points, 1):
write_api.write(bucket=bucket, record=point)
print(f" ✅ 写入数据点 {i}/3")
print("✅ 测试数据写入成功!")
# 验证数据
query_api = client.query_api()
query = f'''
from(bucket: "{bucket}")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "{measurement}")
|> filter(fn: (r) => r._field == "status")
'''
result = query_api.query(query)
data_points = []
for table in result:
for record in table.records:
data_points.append({
'time': record.get_time(),
'value': record.get_value()
})
if data_points:
print(f"✅ 验证成功! 找到 {len(data_points)} 个数据点:")
for point in data_points[-3:]: # 显示最近3个点
print(f" {point['time']}: status = {point['value']}")
else:
print("⚠️ 未找到写入的数据,可能需要等待几秒钟")
client.close()
return True
except Exception as e:
print(f"❌ 写入数据失败: {e}")
return False
def main():
print("InfluxDB测试工具")
print("=" * 50)
# 加载配置
config = load_config()
if not config:
print("❌ 无法加载配置请检查default.json文件")
return
print("📋 当前配置:")
print(f" URL: {config.get('url', 'N/A')}")
print(f" 组织: {config.get('org', 'N/A')}")
print(f" Token: {config.get('token', 'N/A')[:20]}..." if config.get('token') else " Token: N/A")
print(f" Bucket: {config.get('bucket', 'N/A')}")
print(f" Measurement: {config.get('measurement', 'N/A')}")
print()
# 测试连接
if not test_connection(config):
print("\n❌ 连接测试失败请检查配置和InfluxDB服务状态")
return
print()
# 创建bucket如果需要
if not create_bucket_if_not_exists(config):
print("\n⚠️ Bucket检查失败但继续尝试写入数据...")
print()
# 写入测试数据
if write_test_data(config):
print("\n🎉 所有测试完成! InfluxDB配置正常")
print("\n📌 下一步:")
print(" 1. 在程序中点击'开始工单'")
print(" 2. 观察程序是否检测到状态变化")
print(" 3. 查看程序日志输出")
else:
print("\n❌ 测试失败,请检查权限和配置")
if __name__ == "__main__":
main()