246 lines
8.1 KiB
Python
246 lines
8.1 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
InfluxDB测试工具
|
||
包含连接测试、权限验证和数据写入功能
|
||
"""
|
||
|
||
import datetime
|
||
import json
|
||
import os
|
||
from influxdb_client import InfluxDBClient, Point
|
||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||
|
||
def load_config():
|
||
"""从default.json加载配置"""
|
||
config_path = os.path.join(os.path.dirname(__file__), "default.json")
|
||
try:
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
config = json.load(f)
|
||
return config.get('influx', {})
|
||
except Exception as e:
|
||
print(f"❌ 加载配置文件失败: {e}")
|
||
return {}
|
||
|
||
def test_connection(config):
|
||
"""测试InfluxDB连接"""
|
||
print("🔍 测试InfluxDB连接...")
|
||
|
||
url = config.get('url', '')
|
||
org = config.get('org', '')
|
||
token = config.get('token', '')
|
||
|
||
if not all([url, org, token]):
|
||
print("❌ 配置不完整:")
|
||
print(f" URL: {'✓' if url else '❌'}")
|
||
print(f" ORG: {'✓' if org else '❌'}")
|
||
print(f" TOKEN: {'✓' if token else '❌'}")
|
||
return False
|
||
|
||
try:
|
||
client = InfluxDBClient(url=url, token=token, org=org)
|
||
health = client.health()
|
||
print(f"✅ 连接成功! 状态: {health.status}")
|
||
|
||
# 测试组织访问权限
|
||
try:
|
||
orgs = client.organizations_api().find_organizations()
|
||
org_names = [o.name for o in orgs]
|
||
print(f"✅ 可访问的组织: {org_names}")
|
||
|
||
if org not in org_names:
|
||
print(f"⚠️ 警告: 配置的组织 '{org}' 不在可访问列表中")
|
||
except Exception as e:
|
||
print(f"⚠️ 无法获取组织列表: {e}")
|
||
|
||
# 测试bucket访问权限
|
||
bucket = config.get('bucket', 'default')
|
||
try:
|
||
buckets = client.buckets_api().find_buckets()
|
||
bucket_names = [b.name for b in buckets.buckets] if buckets.buckets else []
|
||
print(f"✅ 可访问的buckets: {bucket_names}")
|
||
|
||
if bucket not in bucket_names:
|
||
print(f"⚠️ 警告: 配置的bucket '{bucket}' 不存在")
|
||
print(f" 建议使用现有bucket或创建新bucket")
|
||
except Exception as e:
|
||
print(f"⚠️ 无法获取bucket列表: {e}")
|
||
|
||
client.close()
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 连接失败: {e}")
|
||
return False
|
||
|
||
def create_bucket_if_not_exists(config):
|
||
"""如果bucket不存在则创建"""
|
||
url = config.get('url', '')
|
||
org = config.get('org', '')
|
||
token = config.get('token', '')
|
||
bucket = config.get('bucket', 'test_bucket')
|
||
|
||
try:
|
||
client = InfluxDBClient(url=url, token=token, org=org)
|
||
buckets_api = client.buckets_api()
|
||
|
||
# 检查bucket是否存在
|
||
buckets = buckets_api.find_buckets()
|
||
bucket_names = [b.name for b in buckets.buckets] if buckets.buckets else []
|
||
|
||
if bucket not in bucket_names:
|
||
print(f"📦 创建bucket: {bucket}")
|
||
try:
|
||
# 获取组织ID
|
||
orgs_api = client.organizations_api()
|
||
org_obj = orgs_api.find_organizations(org=org)
|
||
if not org_obj:
|
||
print(f"❌ 找不到组织: {org}")
|
||
return False
|
||
|
||
org_id = org_obj[0].id
|
||
|
||
# 创建bucket
|
||
from influxdb_client.domain.bucket import Bucket
|
||
bucket_obj = Bucket(name=bucket, org_id=org_id)
|
||
created_bucket = buckets_api.create_bucket(bucket=bucket_obj)
|
||
print(f"✅ Bucket '{bucket}' 创建成功!")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 创建bucket失败: {e}")
|
||
print(" 请手动在InfluxDB中创建bucket或使用现有bucket")
|
||
return False
|
||
else:
|
||
print(f"✅ Bucket '{bucket}' 已存在")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 检查bucket失败: {e}")
|
||
return False
|
||
finally:
|
||
if 'client' in locals():
|
||
client.close()
|
||
|
||
def write_test_data(config):
|
||
"""写入测试数据"""
|
||
url = config.get('url', '')
|
||
org = config.get('org', '')
|
||
token = config.get('token', '')
|
||
bucket = config.get('bucket', 'test_bucket')
|
||
measurement = config.get('measurement', 'experiment_status')
|
||
|
||
print(f"📝 写入测试数据到 {bucket}/{measurement}...")
|
||
|
||
try:
|
||
client = InfluxDBClient(url=url, token=token, org=org)
|
||
write_api = client.write_api(write_options=SYNCHRONOUS)
|
||
|
||
now = datetime.datetime.now()
|
||
|
||
# 写入状态变化序列
|
||
points = [
|
||
# 5分钟前: 状态0
|
||
Point(measurement)
|
||
.field("status", "0")
|
||
.field("temperature", 25.0)
|
||
.field("pressure", 1.0)
|
||
.time(now - datetime.timedelta(minutes=5)),
|
||
|
||
# 2分钟前: 状态1 (实验开始)
|
||
Point(measurement)
|
||
.field("status", "1")
|
||
.field("temperature", 30.0)
|
||
.field("pressure", 1.2)
|
||
.time(now - datetime.timedelta(minutes=2)),
|
||
|
||
# 现在: 状态0 (实验结束)
|
||
Point(measurement)
|
||
.field("status", "0")
|
||
.field("temperature", 28.0)
|
||
.field("pressure", 1.1)
|
||
.time(now)
|
||
]
|
||
|
||
for i, point in enumerate(points, 1):
|
||
write_api.write(bucket=bucket, record=point)
|
||
print(f" ✅ 写入数据点 {i}/3")
|
||
|
||
print("✅ 测试数据写入成功!")
|
||
|
||
# 验证数据
|
||
query_api = client.query_api()
|
||
query = f'''
|
||
from(bucket: "{bucket}")
|
||
|> range(start: -10m)
|
||
|> filter(fn: (r) => r._measurement == "{measurement}")
|
||
|> filter(fn: (r) => r._field == "status")
|
||
'''
|
||
|
||
result = query_api.query(query)
|
||
|
||
data_points = []
|
||
for table in result:
|
||
for record in table.records:
|
||
data_points.append({
|
||
'time': record.get_time(),
|
||
'value': record.get_value()
|
||
})
|
||
|
||
if data_points:
|
||
print(f"✅ 验证成功! 找到 {len(data_points)} 个数据点:")
|
||
for point in data_points[-3:]: # 显示最近3个点
|
||
print(f" {point['time']}: status = {point['value']}")
|
||
else:
|
||
print("⚠️ 未找到写入的数据,可能需要等待几秒钟")
|
||
|
||
client.close()
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"❌ 写入数据失败: {e}")
|
||
return False
|
||
|
||
def main():
|
||
print("InfluxDB测试工具")
|
||
print("=" * 50)
|
||
|
||
# 加载配置
|
||
config = load_config()
|
||
if not config:
|
||
print("❌ 无法加载配置,请检查default.json文件")
|
||
return
|
||
|
||
print("📋 当前配置:")
|
||
print(f" URL: {config.get('url', 'N/A')}")
|
||
print(f" 组织: {config.get('org', 'N/A')}")
|
||
print(f" Token: {config.get('token', 'N/A')[:20]}..." if config.get('token') else " Token: N/A")
|
||
print(f" Bucket: {config.get('bucket', 'N/A')}")
|
||
print(f" Measurement: {config.get('measurement', 'N/A')}")
|
||
print()
|
||
|
||
# 测试连接
|
||
if not test_connection(config):
|
||
print("\n❌ 连接测试失败,请检查配置和InfluxDB服务状态")
|
||
return
|
||
|
||
print()
|
||
|
||
# 创建bucket(如果需要)
|
||
if not create_bucket_if_not_exists(config):
|
||
print("\n⚠️ Bucket检查失败,但继续尝试写入数据...")
|
||
|
||
print()
|
||
|
||
# 写入测试数据
|
||
if write_test_data(config):
|
||
print("\n🎉 所有测试完成! InfluxDB配置正常")
|
||
print("\n📌 下一步:")
|
||
print(" 1. 在程序中点击'开始工单'")
|
||
print(" 2. 观察程序是否检测到状态变化")
|
||
print(" 3. 查看程序日志输出")
|
||
else:
|
||
print("\n❌ 测试失败,请检查权限和配置")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|