615 lines
23 KiB
Python
615 lines
23 KiB
Python
#!/opt/homebrew/bin/python3
|
|
# -*- coding:utf-8 -*-
|
|
|
|
from sqlalchemy import Column, Integer, String, create_engine
|
|
from sqlalchemy.orm import sessionmaker, declarative_base
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import os
|
|
import json
|
|
import uuid
|
|
from config import config
|
|
from logs import log
|
|
|
|
dataDB = os.path.join(os.path.dirname(__file__),'data.db')
|
|
engine = create_engine(f'sqlite:///{dataDB}')
|
|
|
|
SessionMaker = sessionmaker(bind=engine)()
|
|
|
|
platform_path1 = config.data["project"]["platform_path1"]
|
|
platform_path2 = config.data["project"]["platform_path2"]
|
|
Base = declarative_base()
|
|
|
|
class User(Base):
|
|
__tablename__ = 'user'
|
|
id = Column(String, primary_key=True, doc='用户ID')
|
|
name = Column(String, nullable=False, doc='用户名')
|
|
password = Column(String, nullable=False, doc='用户密码')
|
|
role_name = Column(String, nullable=True, doc='角色名')
|
|
role = Column(Integer, default=1, doc='角色 0:管理员、1:用户')
|
|
|
|
class Device(Base):
|
|
__tablename__ = 'device'
|
|
id = Column(String, primary_key=True, doc='设备id')
|
|
group_id = Column(String, nullable=False, doc='设备组id')
|
|
dev_model_id = Column(String, nullable=True, doc='设备模型id')
|
|
name = Column(String, nullable=False, doc='设备名称')
|
|
attr = Column(String, nullable=False, doc='设备属性')
|
|
interface = Column(String, nullable=False, doc='设备接口')
|
|
|
|
class DevModel(Base):
|
|
__tablename__ = 'dev_model'
|
|
id = Column(String, primary_key=True, doc='设备模型id')
|
|
group_id = Column(String, nullable=False, doc='设备模型组id')
|
|
name = Column(String, nullable=False, doc='设备模型名称')
|
|
category = Column(String, nullable=False, doc='设备模型类别')
|
|
attr = Column(String, nullable=False, doc='设备模型属性')
|
|
|
|
class Instruction(Base):
|
|
__tablename__ = 'instruction'
|
|
id = Column(String, primary_key=True, doc='指令id')
|
|
dev_model_id = Column(String, nullable=True, doc='设备模型id')
|
|
type = Column(String, nullable=True)
|
|
name = Column(String, nullable=True)
|
|
attr = Column(String, nullable=True)
|
|
|
|
class Project(Base):
|
|
__tablename__ = 'project'
|
|
id = Column(String, primary_key=True, doc='主键id')
|
|
name = Column(String, nullable=True, doc='名称')
|
|
operator = Column(Integer, nullable=False, doc='创建人id')
|
|
operator_name = Column(String, nullable=True, doc='创建人名称')
|
|
date = Column(String, nullable=True, doc='创建日期')
|
|
last_date = Column(String, nullable=True, doc='最后一次打开日期')
|
|
remark = Column(String, nullable=True, doc='备注')
|
|
test_info = Column(String, nullable=True, doc='测试信息')
|
|
history = Column(String, nullable=True, doc='历史数据')
|
|
path = Column(String, nullable=True, doc='路径')
|
|
tsdb_name = Column(String, nullable=True, doc='时序数据库')
|
|
|
|
class TaskInstruction(Base):
|
|
__tablename__ = 'task_instruction'
|
|
id = Column(String, primary_key=True, doc='id')
|
|
task_id = Column(String, nullable=False, doc='任务id')
|
|
device_id = Column(String, nullable=False, doc='设备id')
|
|
loop = Column(String, nullable=True, doc='循环次数')
|
|
delay = Column(String, nullable=True, doc='延时')
|
|
target_type = Column(String, nullable=False, doc='目标类型')
|
|
target_id = Column(String, nullable=False, doc='目标id')
|
|
target_param = Column(Integer, nullable=False, doc='目标id')
|
|
interface_index = Column(Integer, nullable=True, doc='设备接口序号')
|
|
level = Column(Integer, nullable=True, doc='层级')
|
|
|
|
class Task(Base):
|
|
__tablename__ = 'task'
|
|
id = Column(String, primary_key=True, doc='主键id')
|
|
group_id = Column(String, nullable=False, doc='任务组id')
|
|
name = Column(String, nullable=False, doc='名称')
|
|
loop = Column(String, nullable=True, doc='重复次数')
|
|
delay = Column(Integer, nullable=True, doc='间隔时间')
|
|
remark = Column(String, nullable=True, doc='备注')
|
|
script = Column(String, nullable=True, doc='脚本')
|
|
|
|
class TaskGroup(Base):
|
|
__tablename__ = 'task_group'
|
|
id = Column(String, primary_key=True, doc='主键id')
|
|
name = Column(String, nullable=False, doc='名称')
|
|
|
|
class DmGroup(Base):
|
|
__tablename__ = 'device_model_group'
|
|
id = Column(String, primary_key=True, doc='主键id')
|
|
name = Column(String, nullable=False, doc='名称')
|
|
class DevGroup(Base):
|
|
__tablename__ = 'device_group'
|
|
id = Column(String, primary_key=True, doc='主键id')
|
|
name = Column(String, nullable=False, doc='名称')
|
|
|
|
class Session():
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.executor = ThreadPoolExecutor()
|
|
|
|
def checkId(self):
|
|
ok,datas = self.queryByAll(DevModel)
|
|
for data in datas:
|
|
old_id = str(data['id'])
|
|
if len(old_id) < 10:
|
|
data['id'] = str(uuid.uuid4())
|
|
self.updateById(DevModel, old_id, data)
|
|
|
|
ok,datas = self.queryByAll(Instruction)
|
|
for data in datas:
|
|
old_id = str(data['id'])
|
|
if len(old_id) < 10:
|
|
data['id'] = str(uuid.uuid4())
|
|
self.updateById(Instruction, old_id, data)
|
|
|
|
ok,datas = self.queryByAll(Device)
|
|
for data in datas:
|
|
old_id = str(data['id'])
|
|
if len(old_id) < 10:
|
|
data['id'] = str(uuid.uuid4())
|
|
self.updateById(Device, old_id, data)
|
|
|
|
ok,datas = self.queryByAll(TaskGroup)
|
|
for i in range(0,len(datas)):
|
|
data = datas[i]
|
|
old_id = str(data['id'])
|
|
if len(old_id) < 10:
|
|
data['id'] = str(uuid.uuid4())
|
|
if i == 0:
|
|
data['id'] = '00000000-0000-0000-0000-000000000000'
|
|
self.updateById(TaskGroup, old_id, data)
|
|
|
|
ok,datas = self.queryByAll(DmGroup)
|
|
for i in range(0,len(datas)):
|
|
data = datas[i]
|
|
old_id = str(data['id'])
|
|
if len(old_id) < 10:
|
|
data['id'] = str(uuid.uuid4())
|
|
if i == 0:
|
|
data['id'] = '00000000-0000-0000-0000-000000000000'
|
|
self.updateById(DmGroup, old_id, data)
|
|
|
|
ok,datas = self.queryByAll(DevGroup)
|
|
for i in range(0,len(datas)):
|
|
data = datas[i]
|
|
old_id = str(data['id'])
|
|
if len(old_id) < 10:
|
|
data['id'] = str(uuid.uuid4())
|
|
if i == 0:
|
|
data['id'] = '00000000-0000-0000-0000-000000000000'
|
|
self.updateById(DevGroup, old_id, data)
|
|
|
|
ok,datas = self.queryByAll(Task)
|
|
for data in datas:
|
|
old_id = str(data['id'])
|
|
if len(old_id) < 10:
|
|
data['id'] = str(uuid.uuid4())
|
|
self.updateById(Task, old_id, data)
|
|
|
|
ok,datas = self.queryByAll(TaskInstruction)
|
|
for data in datas:
|
|
old_id = str(data['id'])
|
|
if len(old_id) < 10:
|
|
data['id'] = str(uuid.uuid4())
|
|
self.updateById(TaskInstruction, old_id, data)
|
|
|
|
def modId(self, data):
|
|
newdeviceId = str(uuid.uuid4())
|
|
id = str(data['id'])
|
|
if len(str(id)) < 10:
|
|
data['id'] = newdeviceId
|
|
if '_sa_instance_state' in data:
|
|
del data['_sa_instance_state']
|
|
return id,data
|
|
|
|
|
|
def writeJsonInThread(self, table_name, data):
|
|
# 使用线程池提交任务
|
|
self.executor.submit(self.writeJson, table_name, data)
|
|
|
|
def gitSyncInThread(self):
|
|
# 使用线程池提交任务
|
|
self.executor.submit(self.gitSync)
|
|
|
|
def writeJson(self, table_name, data):
|
|
try:
|
|
data = self.ensure_serializable(data)
|
|
if '_sa_instance_state' in data:
|
|
del data['_sa_instance_state']
|
|
# 拼接完整的文件路径
|
|
file_path = os.path.join(platform_path2, table_name)
|
|
if table_name == 'dev_model' or table_name == 'instruction' or table_name == 'device_model_group':
|
|
file_path = os.path.join(platform_path1, table_name)
|
|
# 检查路径是否存在,如果不存在,则创建路径
|
|
if not os.path.exists(file_path):
|
|
os.makedirs(file_path)
|
|
# 写入JSON文件
|
|
with open(os.path.join(file_path, f'{data["id"]}.json'), 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=4)
|
|
except Exception as e:
|
|
log.error(table_name,data,e)
|
|
|
|
def gitSync(self):
|
|
# 删除self.platform_path目录下的所有文件 除了 .git 目录和.git目录下的所有文件
|
|
for root, dirs, files in os.walk(platform_path1):
|
|
if ".git" not in root:
|
|
for name in files:
|
|
os.remove(os.path.join(root, name))
|
|
self.writeTableJson("dev_model")
|
|
self.writeTableJson("instruction")
|
|
# 删除self.platform_path目录下的所有文件 除了 .git 目录和.git目录下的所有文件
|
|
for root, dirs, files in os.walk(platform_path2):
|
|
if ".git" not in root:
|
|
for name in files:
|
|
os.remove(os.path.join(root, name))
|
|
self.writeTableJson("device")
|
|
self.writeTableJson("task_group")
|
|
self.writeTableJson("device_model_group")
|
|
self.writeTableJson("device_group")
|
|
self.writeTableJson("task")
|
|
self.writeTableJson("task_instruction")
|
|
|
|
def writeTableJson(self, table_name):
|
|
infos = None
|
|
if table_name == "dev_model":
|
|
ok,infos = self.queryByAll(DevModel)
|
|
elif table_name == "instruction":
|
|
ok,infos = self.queryByAll(Instruction)
|
|
elif table_name == "device":
|
|
ok,infos = self.queryByAll(Device)
|
|
elif table_name == "task_group":
|
|
ok,infos = self.queryByAll(TaskGroup)
|
|
elif table_name == "device_model_group":
|
|
ok,infos = self.queryByAll(DmGroup)
|
|
elif table_name == "device_group":
|
|
ok,infos = self.queryByAll(DevGroup)
|
|
elif table_name == "task":
|
|
ok,infos = self.queryByAll(Task)
|
|
elif table_name == "task_instruction":
|
|
ok,infos = self.queryByAll(TaskInstruction)
|
|
if infos:
|
|
for info in infos:
|
|
self.writeJsonInThread(table_name, info)
|
|
|
|
# 确保所有数据都是可序列化的
|
|
def ensure_serializable(self, obj):
|
|
try:
|
|
if 'script' in obj:
|
|
# 删掉b''
|
|
if isinstance(obj['script'], bytes):
|
|
obj['script'] = obj['script'].decode('utf-8')
|
|
#如果存在_sa_instance_state 则删除
|
|
if "attr" in obj:
|
|
obj["attr"] = obj["attr"] and json.loads(obj["attr"]) or {}
|
|
if "interface" in obj:
|
|
obj["interface"] = obj["interface"] and json.loads(obj["interface"]) or []
|
|
# if '_sa_instance_state' in obj:
|
|
# del obj['_sa_instance_state']
|
|
if isinstance(obj, dict):
|
|
return {k: self.ensure_serializable(v) for k, v in obj.items()}
|
|
elif isinstance(obj, list):
|
|
return [self.ensure_serializable(v) for v in obj]
|
|
elif isinstance(obj, bytes):
|
|
return obj.decode('utf-8') # 将bytes转换为字符串
|
|
else:
|
|
return obj
|
|
except Exception as e:
|
|
return obj
|
|
|
|
def removeJson(self, table_name, id):
|
|
try:
|
|
if table_name == 'dev_model' or table_name == 'instruction' or table_name == 'device_model_group':
|
|
#删除json文件
|
|
os.remove(os.path.join(platform_path1, table_name, f'{id}.json'))
|
|
else:
|
|
os.remove(os.path.join(platform_path2, table_name, f'{id}.json'))
|
|
except Exception as e:
|
|
log.error(e)
|
|
|
|
def addByClass(self, cls, _data):
|
|
try:
|
|
if not _data:
|
|
return False, None
|
|
data = cls()
|
|
for k, v in _data.items():
|
|
if k == '_sa_instance_state':
|
|
continue
|
|
setattr(data, k, v)
|
|
SessionMaker.add(data)
|
|
SessionMaker.commit()
|
|
data.id
|
|
self.writeJsonInThread(cls.__tablename__, _data)
|
|
return True, data.__dict__
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
finally:
|
|
SessionMaker.close()
|
|
|
|
def addByClassFromGit(self, cls, _data):
|
|
try:
|
|
|
|
if not _data:
|
|
return False, None
|
|
data = cls()
|
|
for k, v in _data.items():
|
|
if k == '_sa_instance_state':
|
|
continue
|
|
setattr(data, k, v)
|
|
SessionMaker.add(data)
|
|
SessionMaker.commit()
|
|
data.id
|
|
return True, data.__dict__
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
finally:
|
|
SessionMaker.close()
|
|
|
|
def queryTaskByTargetType(self, cls, _target_type):
|
|
try:
|
|
if not cls or not _target_type:
|
|
return False, None
|
|
dataAll = SessionMaker.query(cls).filter(cls.target_type == _target_type).all()
|
|
if not dataAll:
|
|
return False, None
|
|
dataList = list()
|
|
for data in dataAll:
|
|
# if '_sa_instance_state' in data.__dict__:
|
|
# del data.__dict__['_sa_instance_state']
|
|
dataList.append(data.__dict__)
|
|
return True, dataList
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def queryByTaskId(self, cls, _task_id):
|
|
try:
|
|
if not cls or not _task_id:
|
|
return False, None
|
|
dataAll = SessionMaker.query(cls).filter(cls.task_id == _task_id).all()
|
|
if not dataAll:
|
|
return False, None
|
|
dataList = list()
|
|
for data in dataAll:
|
|
# if '_sa_instance_state' in data.__dict__:
|
|
# del data.__dict__['_sa_instance_state']
|
|
dataList.append(data.__dict__)
|
|
return True, dataList
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def queryByGroupId(self, cls, _group_id):
|
|
try:
|
|
if not cls or not _group_id:
|
|
return False, None
|
|
dataAll = SessionMaker.query(cls).filter(cls.group_id == _group_id).all()
|
|
if not dataAll:
|
|
return False, None
|
|
dataList = list()
|
|
for data in dataAll:
|
|
# if '_sa_instance_state' in data.__dict__:
|
|
# del data.__dict__['_sa_instance_state']
|
|
dataList.append(data.__dict__)
|
|
return True, dataList
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def queryByTaskGroupId(self, cls, _group_id):
|
|
try:
|
|
if not cls or not _group_id:
|
|
return False, None
|
|
dataAll = SessionMaker.query(cls).filter(cls.group_id == _group_id).all()
|
|
if not dataAll:
|
|
return False, None
|
|
dataList = list()
|
|
for data in dataAll:
|
|
# if '_sa_instance_state' in data.__dict__:
|
|
# del data.__dict__['_sa_instance_state']
|
|
dataList.append(data.__dict__)
|
|
return True, dataList
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def queryByDevModelId(self, cls, _dev_model_id):
|
|
try:
|
|
if not cls or not _dev_model_id:
|
|
return False, None
|
|
dataAll = SessionMaker.query(cls).filter(cls.dev_model_id == _dev_model_id).all()
|
|
if not dataAll:
|
|
return False, None
|
|
dataList = list()
|
|
for data in dataAll:
|
|
# if '_sa_instance_state' in data.__dict__:
|
|
# del data.__dict__['_sa_instance_state']
|
|
dataList.append(data.__dict__)
|
|
return True, dataList
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def queryByIds(self, cls, ids):
|
|
try:
|
|
if not cls or not ids:
|
|
return False, None
|
|
data = SessionMaker.query(cls).filter(cls.id.in_(ids)).all()
|
|
if not data:
|
|
return False, None
|
|
result = []
|
|
for item in data:
|
|
# if '_sa_instance_state' in item.__dict__:
|
|
# del item.__dict__['_sa_instance_state']
|
|
result.append(item.__dict__)
|
|
return True, result
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def queryById(self, cls, _id):
|
|
try:
|
|
if not cls or not _id:
|
|
return False, None
|
|
data = SessionMaker.query(cls).get(_id)
|
|
if not data:
|
|
return False, None
|
|
# if '_sa_instance_state' in data.__dict__:
|
|
# del data.__dict__['_sa_instance_state']
|
|
return True, data.__dict__
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def queryByName(self, cls, _name):
|
|
try:
|
|
if not cls or not _name:
|
|
return False, None
|
|
data = SessionMaker.query(cls).filter_by(name=_name).first()
|
|
if not data:
|
|
return False, None
|
|
data.id
|
|
# if '_sa_instance_state' in data.__dict__:
|
|
# del data.__dict__['_sa_instance_state']
|
|
return True, data.__dict__
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def queryByTaskId(self, cls, _task_id):
|
|
try:
|
|
if not cls or not _task_id:
|
|
return False, None
|
|
dataAll = SessionMaker.query(cls).filter(cls.task_id == _task_id).order_by(cls.level)
|
|
dataList = list()
|
|
for data in dataAll:
|
|
# if '_sa_instance_state' in data.__dict__:
|
|
# del data.__dict__['_sa_instance_state']
|
|
dataList.append(data.__dict__)
|
|
return True, dataList
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def pagedQueryByTaskId(self, cls, _task_id, start_row=0, limit=50):
|
|
try:
|
|
if not cls or not _task_id:
|
|
return False, None
|
|
dataAll = SessionMaker.query(cls).filter(cls.task_id == _task_id).order_by(cls.level).limit(limit).offset(start_row)
|
|
dataList = list()
|
|
for data in dataAll:
|
|
# if '_sa_instance_state' in data.__dict__:
|
|
# del data.__dict__['_sa_instance_state']
|
|
dataList.append(data.__dict__)
|
|
return True, dataList
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def queryByAll(self, cls):
|
|
try:
|
|
if not cls:
|
|
return False, None
|
|
dataAll = SessionMaker.query(cls).all()
|
|
if dataAll is None:
|
|
return False, None
|
|
dataList = list()
|
|
for data in dataAll:
|
|
# if '_sa_instance_state' in data.__dict__:
|
|
# del data.__dict__['_sa_instance_state']
|
|
dataList.append(data.__dict__)
|
|
return True, dataList
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
|
|
def updateById(self, cls, _id, _data):
|
|
try:
|
|
|
|
if not cls or not _id or not _data:
|
|
return False, None
|
|
data = SessionMaker.query(cls).filter_by(id=_id).first()
|
|
for k, v in _data.items():
|
|
if k == '_sa_instance_state':
|
|
continue
|
|
if k not in data.__dict__:
|
|
continue
|
|
if data.__dict__[k] == v:
|
|
continue
|
|
setattr(data, k, v)
|
|
SessionMaker.commit()
|
|
if "id" in _data:
|
|
_data = SessionMaker.query(cls).get(_data["id"])
|
|
else:
|
|
_data = SessionMaker.query(cls).get(_id)
|
|
self.writeJsonInThread(cls.__tablename__, _data.__dict__)
|
|
return True, data.__dict__
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
finally:
|
|
SessionMaker.close()
|
|
|
|
def deleteById(self, cls, _id):
|
|
try:
|
|
if not _id:
|
|
return False, None
|
|
data = SessionMaker.query(cls).filter_by(id=_id).first()
|
|
if not data:
|
|
return False, None
|
|
SessionMaker.delete(data)
|
|
SessionMaker.commit()
|
|
self.removeJson(cls.__tablename__, _id)
|
|
self.gitSyncInThread()
|
|
return True, _id
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
finally:
|
|
SessionMaker.close()
|
|
|
|
def deleteByIds(self, cls, ids):
|
|
try:
|
|
if not ids:
|
|
return False, None
|
|
data = SessionMaker.query(cls).filter(cls.id.in_(ids)).all()
|
|
if not data:
|
|
return False, None
|
|
for item in data:
|
|
SessionMaker.delete(item)
|
|
self.removeJson(cls.__tablename__, item.id)
|
|
self.gitSyncInThread()
|
|
SessionMaker.commit()
|
|
return True, ids
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
finally:
|
|
SessionMaker.close()
|
|
|
|
def deleteByName(self, cls, _name):
|
|
try:
|
|
if not _name:
|
|
return False, None
|
|
data = SessionMaker.query(cls).filter_by(name=_name).first()
|
|
if not data:
|
|
return False, None
|
|
SessionMaker.delete(data)
|
|
SessionMaker.commit()
|
|
self.removeJson(cls.__tablename__, data.id)
|
|
self.gitSyncInThread()
|
|
return True, _name
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False, None
|
|
finally:
|
|
SessionMaker.close()
|
|
|
|
def deleteAllTable(self):
|
|
try:
|
|
self.delete(DmGroup)
|
|
self.delete(TaskGroup)
|
|
self.delete(Task)
|
|
self.delete(TaskInstruction)
|
|
self.delete(Device)
|
|
self.delete(DevModel)
|
|
self.delete(Instruction)
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False
|
|
|
|
def delete(self, cls):
|
|
try:
|
|
SessionMaker.query(cls).delete()
|
|
SessionMaker.commit()
|
|
return True
|
|
except Exception as e:
|
|
log.error(e)
|
|
return False
|
|
finally:
|
|
SessionMaker.close()
|
|
|
|
Session = Session()
|