TG-PlatformPlus/grafana.py

231 lines
8.0 KiB
Python

#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import os, subprocess, psutil
import threading
# from grafana_api.grafana_face import GrafanaFace
import json
from logs import log
import requests
class Garfana():
def __init__(self):
self.cur_bucket = 'default'
self.reset()
def setConfig(self, conf):
self.username = conf['username']
self.password = conf['password']
self.host = conf['host']
self.port = conf['port']
def reset(self):
self.grafana_api = None
self.grafanaServerName = 'grafana-server.exe'
self.grafanaServerDir = os.path.join(os.path.dirname(__file__), 'grafana','bin')
self.grafanaServer = os.path.join(self.grafanaServerDir, self.grafanaServerName)
def checkGarfana(self, name):
for p in psutil.process_iter(attrs=['name']):
if p.name() == name:
return p.pid
return None
def wait_for_process(self, process):
stdout, stderr = process.communicate()
# 处理进程的输出
def openService(self):
try:
if self.checkGarfana(self.grafanaServerName):
return
if not os.path.exists(self.grafanaServer):
return
process = subprocess.Popen(
self.grafanaServer,
cwd=self.grafanaServerDir,
creationflags=subprocess.CREATE_NO_WINDOW,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# 创建线程来等待进程完成
thread = threading.Thread(target=self.wait_for_process, args=(process,))
thread.start()
except Exception as e:
log.error(e)
def openClient(self):
try:
pass
# self.grafana_api =GrafanaFace(
# auth=(self.username,self.password),
# host=self.host,
# port=self.port,
# # verify=False,
# # protocol='http',
# # path_prefix='',
# # scheme='http',
# # name='grafana',
# # base_url='http://127.0.0.1:3000',
# # basic_auth_user=self.username,
# # basic_auth_password=self.password,
# # headers={'Authorization': token, 'content-type': "application/json"},
# # proxies=None,
# timeout=120,
# )
except Exception as e:
log.error(e)
def setBucket(self, bucket):
self.cur_bucket = bucket
def closeService(self):
try:
pid = self.checkGarfana('gpx_sqlite-datasource_windows_amd64.exe')
if pid:
psutil.Process(pid).terminate()
pid = self.checkGarfana(self.grafanaServerName)
if pid:
psutil.Process(pid).terminate()
except Exception as e:
log.error(e)
def closeClient(self):
try:
if not self.grafana_api:
return
self.grafana_api = None
return
except Exception as e:
log.error(e)
def open(self):
try:
self.reset()
if self.host == '127.0.0.1' or self.host == 'localhost':
self.openService()
self.openClient()
except Exception as e:
log.error(e)
self.reset()
def close(self):
try:
if self.host == '127.0.0.1' or self.host == 'localhost':
self.closeService()
self.closeClient()
except Exception as e:
log.error(e)
self.reset()
# def write(self, measurement, field = None, value = 0, unit = None):
# try:
# if not field:
# return False
# if not self.grafana_api or not self.write_api or not self.grafana_api.ping():
# self.open()
# if unit:
# _field = f'{field} {unit}'
# else:
# _field = field
# point = Point(measurement).field(_field, value)
# self.write_api.write(bucket=self.bucketName, org=self.org, record=point)
# return True
# except Exception as error:
# log.error(error)
# return False
# def create(self, _bucketName):
# try:
# if not self.grafana_api or not self.buckets_api or not self.grafana_api.ping():
# self.open()
# self.setBucket(_bucketName)
# buckets = self.buckets_api.find_buckets().buckets
# for bucket in buckets:
# if bucket.name == _bucketName:
# return False, _bucketName
# self.buckets_api.create_bucket(bucket_name=_bucketName, org=self.org)
# return True, _bucketName
# except Exception as error:
# log.error(error)
# return False, None
def getUid(self, base_url, token, name):
try:
headers = {'Authorization': token, 'content-type': "application/json"}
api_url = f'{base_url}/api/search?query={name}'
response = requests.get(api_url, headers=headers, verify=False, timeout=120)
if response.status_code == 200:
result = response.json()
# result = [{'id': 2, 'uid': 'jMZkKuiSk', 'title': 'dataanalysis', 'uri': 'db/dataanalysis', 'url': '/d/jMZkKuiSk/dataanalysis', 'slug': '', 'type': 'dash-db', 'tags': [], 'isStarred': False, 'sortMeta': 0}]
if result:
return True, result
return False, None
except Exception as e:
log.error(e)
return False, None
def updateTime(self, uid, base_url, token , starttime, stoptime):
try:
api_url = f'{base_url}/api/dashboards/uid/{uid}'
headers = {'Authorization': token, 'content-type': "application/json"}
response = requests.get(api_url, headers=headers, verify=False, timeout=120)
if response.status_code == 200:
result = response.json()
if result:
api_url = f'{base_url}/api/dashboards/db/'
result['dashboard']['time']['from'] = starttime
result['dashboard']['time']['to'] = stoptime
requests.post(api_url, data=json.dumps(result), headers=headers)
return True, result
return False, None
except Exception as e:
log.error(e)
return False, None
def update(self, uid, base_url, token):
try:
api_url = f'{base_url}/api/dashboards/uid/{uid}'
headers = {'Authorization': token, 'content-type': "application/json"}
response = requests.get(api_url, headers=headers, verify=False, timeout=120)
if response.status_code == 200:
result = response.json()
if result:
api_url = f'{base_url}/api/dashboards/db/'
if len(result['dashboard']['panels']) > 0:
for panel in result['dashboard']['panels']:
for target in panel['targets']:
query = target['query']
start = query.find('from(bucket: \"') + len('from(bucket: \"')
end = query.find('\"', start)
# 替换找到的内容
new_query = query[:start] + self.cur_bucket + query[end:]
target['query'] = new_query
requests.post(api_url, data=json.dumps(result), headers=headers)
return True, result
return False, None
except Exception as e:
log.error(e)
return False, None
def run(self):
try:
self.open()
except Exception as e:
log.error( e)
self.reset()
grafana = Garfana()