231 lines
8.0 KiB
Python
231 lines
8.0 KiB
Python
#!/opt/homebrew/bin/python3
|
|
# -*- coding:utf-8 -*-
|
|
|
|
import os, subprocess, psutil
|
|
import threading
|
|
# from grafana_api.grafana_face import GrafanaFace
|
|
import json
|
|
from logs import log
|
|
import requests
|
|
class Garfana():
|
|
def __init__(self):
|
|
self.cur_bucket = 'default'
|
|
self.reset()
|
|
|
|
def setConfig(self, conf):
|
|
self.username = conf['username']
|
|
self.password = conf['password']
|
|
self.host = conf['host']
|
|
self.port = conf['port']
|
|
|
|
|
|
def reset(self):
|
|
self.grafana_api = None
|
|
self.grafanaServerName = 'grafana-server.exe'
|
|
self.grafanaServerDir = os.path.join(os.path.dirname(__file__), 'grafana','bin')
|
|
self.grafanaServer = os.path.join(self.grafanaServerDir, self.grafanaServerName)
|
|
|
|
def checkGarfana(self, name):
|
|
for p in psutil.process_iter(attrs=['name']):
|
|
if p.name() == name:
|
|
return p.pid
|
|
return None
|
|
|
|
def wait_for_process(self, process):
|
|
stdout, stderr = process.communicate()
|
|
# 处理进程的输出
|
|
|
|
def openService(self):
|
|
try:
|
|
if self.checkGarfana(self.grafanaServerName):
|
|
return
|
|
|
|
if not os.path.exists(self.grafanaServer):
|
|
return
|
|
process = subprocess.Popen(
|
|
self.grafanaServer,
|
|
cwd=self.grafanaServerDir,
|
|
creationflags=subprocess.CREATE_NO_WINDOW,
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL
|
|
)
|
|
# 创建线程来等待进程完成
|
|
thread = threading.Thread(target=self.wait_for_process, args=(process,))
|
|
thread.start()
|
|
except Exception as e:
|
|
log.error(e)
|
|
|
|
def openClient(self):
|
|
try:
|
|
pass
|
|
# self.grafana_api =GrafanaFace(
|
|
# auth=(self.username,self.password),
|
|
# host=self.host,
|
|
# port=self.port,
|
|
# # verify=False,
|
|
|
|
# # protocol='http',
|
|
# # path_prefix='',
|
|
# # scheme='http',
|
|
# # name='grafana',
|
|
# # base_url='http://127.0.0.1:3000',
|
|
# # basic_auth_user=self.username,
|
|
# # basic_auth_password=self.password,
|
|
# # headers={'Authorization': token, 'content-type': "application/json"},
|
|
# # proxies=None,
|
|
# timeout=120,
|
|
# )
|
|
except Exception as e:
|
|
log.error(e)
|
|
|
|
def setBucket(self, bucket):
|
|
self.cur_bucket = bucket
|
|
|
|
def closeService(self):
|
|
try:
|
|
pid = self.checkGarfana('gpx_sqlite-datasource_windows_amd64.exe')
|
|
if pid:
|
|
psutil.Process(pid).terminate()
|
|
|
|
pid = self.checkGarfana(self.grafanaServerName)
|
|
if pid:
|
|
psutil.Process(pid).terminate()
|
|
except Exception as e:
|
|
log.error(e)
|
|
|
|
def closeClient(self):
|
|
try:
|
|
if not self.grafana_api:
|
|
return
|
|
self.grafana_api = None
|
|
return
|
|
except Exception as e:
|
|
log.error(e)
|
|
|
|
def open(self):
|
|
try:
|
|
self.reset()
|
|
if self.host == '127.0.0.1' or self.host == 'localhost':
|
|
self.openService()
|
|
self.openClient()
|
|
except Exception as e:
|
|
log.error(e)
|
|
self.reset()
|
|
|
|
def close(self):
|
|
try:
|
|
if self.host == '127.0.0.1' or self.host == 'localhost':
|
|
self.closeService()
|
|
self.closeClient()
|
|
except Exception as e:
|
|
log.error(e)
|
|
self.reset()
|
|
|
|
|
|
# def write(self, measurement, field = None, value = 0, unit = None):
|
|
# try:
|
|
# if not field:
|
|
# return False
|
|
|
|
# if not self.grafana_api or not self.write_api or not self.grafana_api.ping():
|
|
# self.open()
|
|
|
|
# if unit:
|
|
# _field = f'{field} {unit}'
|
|
# else:
|
|
# _field = field
|
|
|
|
# point = Point(measurement).field(_field, value)
|
|
|
|
# self.write_api.write(bucket=self.bucketName, org=self.org, record=point)
|
|
# return True
|
|
# except Exception as error:
|
|
# log.error(error)
|
|
# return False
|
|
|
|
# def create(self, _bucketName):
|
|
# try:
|
|
# if not self.grafana_api or not self.buckets_api or not self.grafana_api.ping():
|
|
# self.open()
|
|
|
|
# self.setBucket(_bucketName)
|
|
|
|
# buckets = self.buckets_api.find_buckets().buckets
|
|
|
|
# for bucket in buckets:
|
|
# if bucket.name == _bucketName:
|
|
# return False, _bucketName
|
|
|
|
# self.buckets_api.create_bucket(bucket_name=_bucketName, org=self.org)
|
|
|
|
# return True, _bucketName
|
|
# except Exception as error:
|
|
# log.error(error)
|
|
# return False, None
|
|
|
|
|
|
def getUid(self, base_url, token, name):
|
|
try:
|
|
headers = {'Authorization': token, 'content-type': "application/json"}
|
|
api_url = f'{base_url}/api/search?query={name}'
|
|
response = requests.get(api_url, headers=headers, verify=False, timeout=120)
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
# result = [{'id': 2, 'uid': 'jMZkKuiSk', 'title': 'dataanalysis', 'uri': 'db/dataanalysis', 'url': '/d/jMZkKuiSk/dataanalysis', 'slug': '', 'type': 'dash-db', 'tags': [], 'isStarred': False, 'sortMeta': 0}]
|
|
if result:
|
|
return True, result
|
|
return False, None
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
def updateTime(self, uid, base_url, token , starttime, stoptime):
|
|
try:
|
|
api_url = f'{base_url}/api/dashboards/uid/{uid}'
|
|
headers = {'Authorization': token, 'content-type': "application/json"}
|
|
response = requests.get(api_url, headers=headers, verify=False, timeout=120)
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
if result:
|
|
api_url = f'{base_url}/api/dashboards/db/'
|
|
result['dashboard']['time']['from'] = starttime
|
|
result['dashboard']['time']['to'] = stoptime
|
|
requests.post(api_url, data=json.dumps(result), headers=headers)
|
|
return True, result
|
|
return False, None
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def update(self, uid, base_url, token):
|
|
try:
|
|
api_url = f'{base_url}/api/dashboards/uid/{uid}'
|
|
headers = {'Authorization': token, 'content-type': "application/json"}
|
|
response = requests.get(api_url, headers=headers, verify=False, timeout=120)
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
if result:
|
|
api_url = f'{base_url}/api/dashboards/db/'
|
|
if len(result['dashboard']['panels']) > 0:
|
|
for panel in result['dashboard']['panels']:
|
|
for target in panel['targets']:
|
|
query = target['query']
|
|
start = query.find('from(bucket: \"') + len('from(bucket: \"')
|
|
end = query.find('\"', start)
|
|
# 替换找到的内容
|
|
new_query = query[:start] + self.cur_bucket + query[end:]
|
|
target['query'] = new_query
|
|
requests.post(api_url, data=json.dumps(result), headers=headers)
|
|
return True, result
|
|
return False, None
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def run(self):
|
|
try:
|
|
self.open()
|
|
except Exception as e:
|
|
log.error( e)
|
|
self.reset()
|
|
|
|
grafana = Garfana() |