TG-PlatformPlus/models.py

615 lines
23 KiB
Python

#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from concurrent.futures import ThreadPoolExecutor
import os
import json
import uuid
from config import config
from logs import log
# Path of the SQLite database file, located next to this module.
dataDB = os.path.join(os.path.dirname(__file__),'data.db')
# Engine bound to that SQLite file.
engine = create_engine(f'sqlite:///{dataDB}')
# NOTE: despite the name this is not a factory. The sessionmaker is called
# immediately, so this is one session instance shared by the whole module.
SessionMaker = sessionmaker(bind=engine)()
# Base directories for the per-record JSON files, read from the project config.
# platform_path1 receives dev_model / instruction / device_model_group records,
# platform_path2 receives every other table (see Session.writeJson).
platform_path1 = config.data["project"]["platform_path1"]
platform_path2 = config.data["project"]["platform_path2"]
# Declarative base class that every ORM model below inherits from.
Base = declarative_base()
class User(Base):
    """ORM model for the ``user`` table."""
    __tablename__ = 'user'
    # User ID (primary key).
    id = Column(String, primary_key=True, doc='用户ID')
    # User name.
    name = Column(String, nullable=False, doc='用户名')
    # User password.
    password = Column(String, nullable=False, doc='用户密码')
    # Role name.
    role_name = Column(String, nullable=True, doc='角色名')
    # Role: 0 = administrator, 1 = user (defaults to 1).
    role = Column(Integer, default=1, doc='角色 0:管理员、1:用户')
class Device(Base):
    """ORM model for the ``device`` table."""
    __tablename__ = 'device'
    # Device ID (primary key).
    id = Column(String, primary_key=True, doc='设备id')
    # ID of the device group this device belongs to.
    group_id = Column(String, nullable=False, doc='设备组id')
    # ID of the device model (optional).
    dev_model_id = Column(String, nullable=True, doc='设备模型id')
    # Device name.
    name = Column(String, nullable=False, doc='设备名称')
    # Device attributes; Session.ensure_serializable parses this text with json.loads.
    attr = Column(String, nullable=False, doc='设备属性')
    # Device interface; Session.ensure_serializable parses this text with json.loads.
    interface = Column(String, nullable=False, doc='设备接口')
class DevModel(Base):
    """ORM model for the ``dev_model`` table."""
    __tablename__ = 'dev_model'
    # Device model ID (primary key).
    id = Column(String, primary_key=True, doc='设备模型id')
    # ID of the device model group.
    group_id = Column(String, nullable=False, doc='设备模型组id')
    # Device model name.
    name = Column(String, nullable=False, doc='设备模型名称')
    # Device model category.
    category = Column(String, nullable=False, doc='设备模型类别')
    # Device model attributes; Session.ensure_serializable parses this text with json.loads.
    attr = Column(String, nullable=False, doc='设备模型属性')
class Instruction(Base):
    """ORM model for the ``instruction`` table."""
    __tablename__ = 'instruction'
    # Instruction ID (primary key).
    id = Column(String, primary_key=True, doc='指令id')
    # ID of the device model (optional).
    dev_model_id = Column(String, nullable=True, doc='设备模型id')
    # The three columns below carry no doc text in the schema; all are nullable strings.
    type = Column(String, nullable=True)
    name = Column(String, nullable=True)
    # Session.ensure_serializable parses this text with json.loads.
    attr = Column(String, nullable=True)
class Project(Base):
    """ORM model for the ``project`` table."""
    __tablename__ = 'project'
    # Primary key ID.
    id = Column(String, primary_key=True, doc='主键id')
    # Name.
    name = Column(String, nullable=True, doc='名称')
    # Creator ID.
    # NOTE(review): declared Integer here while User.id is a String column - confirm.
    operator = Column(Integer, nullable=False, doc='创建人id')
    # Creator name.
    operator_name = Column(String, nullable=True, doc='创建人名称')
    # Creation date (stored as a string).
    date = Column(String, nullable=True, doc='创建日期')
    # Date the project was last opened (stored as a string).
    last_date = Column(String, nullable=True, doc='最后一次打开日期')
    # Remark.
    remark = Column(String, nullable=True, doc='备注')
    # Test information.
    test_info = Column(String, nullable=True, doc='测试信息')
    # History data.
    history = Column(String, nullable=True, doc='历史数据')
    # Path.
    path = Column(String, nullable=True, doc='路径')
    # Time-series database.
    tsdb_name = Column(String, nullable=True, doc='时序数据库')
class TaskInstruction(Base):
    """ORM model for the ``task_instruction`` table."""
    __tablename__ = 'task_instruction'
    # ID (primary key).
    id = Column(String, primary_key=True, doc='id')
    # ID of the owning task.
    task_id = Column(String, nullable=False, doc='任务id')
    # Device ID.
    device_id = Column(String, nullable=False, doc='设备id')
    # Loop count (stored as a string).
    loop = Column(String, nullable=True, doc='循环次数')
    # Delay (stored as a string).
    delay = Column(String, nullable=True, doc='延时')
    # Target type.
    target_type = Column(String, nullable=False, doc='目标类型')
    # Target ID.
    target_id = Column(String, nullable=False, doc='目标id')
    # NOTE(review): the doc text repeats "target id" from the column above;
    # presumably this is the target parameter - confirm and fix the doc text.
    target_param = Column(Integer, nullable=False, doc='目标id')
    # Device interface index.
    interface_index = Column(Integer, nullable=True, doc='设备接口序号')
    # Level; Session.queryByTaskId orders its results by this column.
    level = Column(Integer, nullable=True, doc='层级')
class Task(Base):
    """ORM model for the ``task`` table."""
    __tablename__ = 'task'
    # Primary key ID.
    id = Column(String, primary_key=True, doc='主键id')
    # ID of the task group.
    group_id = Column(String, nullable=False, doc='任务组id')
    # Name.
    name = Column(String, nullable=False, doc='名称')
    # Repeat count (stored as a string).
    loop = Column(String, nullable=True, doc='重复次数')
    # Interval time.
    # NOTE(review): Integer here, whereas TaskInstruction.delay is a String - confirm.
    delay = Column(Integer, nullable=True, doc='间隔时间')
    # Remark.
    remark = Column(String, nullable=True, doc='备注')
    # Script; Session.ensure_serializable decodes it as UTF-8 when it is bytes.
    script = Column(String, nullable=True, doc='脚本')
class TaskGroup(Base):
    """ORM model for the ``task_group`` table."""
    __tablename__ = 'task_group'
    # Primary key ID.
    id = Column(String, primary_key=True, doc='主键id')
    # Name.
    name = Column(String, nullable=False, doc='名称')
class DmGroup(Base):
    """ORM model for the ``device_model_group`` table."""
    __tablename__ = 'device_model_group'
    # Primary key ID.
    id = Column(String, primary_key=True, doc='主键id')
    # Name.
    name = Column(String, nullable=False, doc='名称')
class DevGroup(Base):
    """ORM model for the ``device_group`` table."""
    __tablename__ = 'device_group'
    # Primary key ID.
    id = Column(String, primary_key=True, doc='主键id')
    # Name.
    name = Column(String, nullable=False, doc='名称')
class Session():
    """Data-access layer over the module-level SQLAlchemy session.

    Besides the database operations, every insert/update/delete is mirrored
    to per-record JSON files under ``platform_path1`` / ``platform_path2``.
    The mirror writes run on a thread pool.

    Unless stated otherwise, methods return an ``(ok, payload)`` tuple and
    never raise: failures are logged and reported as ``(False, None)``.
    Row payloads are plain ``dict`` snapshots of the ORM instance state
    (they still contain the ``_sa_instance_state`` key).

    NOTE: ``gitSync`` is executed on the thread pool and reads through the
    same module-level session object that the calling thread uses.
    SQLAlchemy documents a session as not safe for concurrent use, so this
    deserves a follow-up; it is left as-is to keep the interface unchanged.
    """

    # ID that the first migrated row of each group table is pinned to.
    _DEFAULT_GROUP_ID = '00000000-0000-0000-0000-000000000000'
    # Tables whose JSON mirror lives under platform_path1 (all others: platform_path2).
    _PATH1_TABLES = ('dev_model', 'instruction', 'device_model_group')
    # Tables rewritten by gitSync, in write order.
    _SYNC_TABLES = (
        'dev_model',
        'instruction',
        'device',
        'task_group',
        'device_model_group',
        'device_group',
        'task',
        'task_instruction',
    )

    def __init__(self):
        super().__init__()
        # Pool used for the background JSON writes and full re-syncs.
        self.executor = ThreadPoolExecutor()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _asDict(row):
        """Return a snapshot copy of an ORM instance's ``__dict__``.

        A copy is returned so that later session activity, or the
        background JSON writer, cannot change what the caller received.
        """
        return dict(row.__dict__)

    @staticmethod
    def _parseJsonField(raw, default):
        """Decode a JSON text field; fall back to *default* when it is empty.

        Values that are not text (already decoded) or that are not valid
        JSON are returned unchanged.
        """
        if not raw:
            return default
        if not isinstance(raw, (str, bytes, bytearray)):
            return raw
        try:
            # A falsy decoded value (null, 0, "", [], {}) maps to the default,
            # as it always has.
            return json.loads(raw) or default
        except ValueError:
            return raw

    def _tableDir(self, table_name):
        """Return the directory holding the JSON files of *table_name*."""
        base = platform_path1 if table_name in self._PATH1_TABLES else platform_path2
        return os.path.join(base, table_name)

    def _tableModel(self, table_name):
        """Map a table name handled by gitSync to its ORM class (or None)."""
        models = {
            'dev_model': DevModel,
            'instruction': Instruction,
            'device': Device,
            'task_group': TaskGroup,
            'device_model_group': DmGroup,
            'device_group': DevGroup,
            'task': Task,
            'task_instruction': TaskInstruction,
        }
        return models.get(table_name)

    def _purgeWorkTree(self, base):
        """Delete every file below *base* except the contents of ``.git``."""
        for root, dirs, files in os.walk(base):
            # Prune in place so os.walk never descends into the repository
            # metadata. An exact name match avoids skipping ".github" or
            # everything under a base path that merely contains ".git".
            dirs[:] = [d for d in dirs if d != '.git']
            for name in files:
                os.remove(os.path.join(root, name))

    def _migrateShortIds(self, cls, pin_first=False):
        """Replace IDs shorter than 10 characters in table *cls* with UUIDs.

        With *pin_first* set, a short ID on the first row returned by the
        query is replaced by the fixed default-group ID instead.
        """
        ok, rows = self.queryByAll(cls)
        if not ok or not rows:
            return
        for index, row in enumerate(rows):
            old_id = str(row['id'])
            if len(old_id) >= 10:
                continue
            if pin_first and index == 0:
                row['id'] = self._DEFAULT_GROUP_ID
            else:
                row['id'] = str(uuid.uuid4())
            self.updateById(cls, old_id, row)

    def _runListQuery(self, make_query, empty_ok):
        """Run the query built by *make_query* and return ``(ok, [dict, ...])``.

        The query is built inside the ``try`` so that errors raised while
        building it (e.g. a missing column attribute) are logged too.
        With ``empty_ok=False`` an empty result is reported as
        ``(False, None)``.
        """
        try:
            rows = [self._asDict(row) for row in make_query()]
        except Exception as e:
            log.error(e)
            return False, None
        if not rows and not empty_ok:
            return False, None
        return True, rows

    def _insert(self, cls, _data, mirror):
        """Insert *_data* as a new *cls* row; optionally mirror it to JSON."""
        try:
            if not _data:
                return False, None
            row = cls()
            for k, v in _data.items():
                if k == '_sa_instance_state':
                    continue
                setattr(row, k, v)
            SessionMaker.add(row)
            SessionMaker.commit()
            # commit() expires the instance; reload it so the snapshot is populated.
            SessionMaker.refresh(row)
            if mirror:
                # Hand the worker its own copy of the caller's dict.
                self.writeJsonInThread(cls.__tablename__, dict(_data))
            return True, self._asDict(row)
        except Exception as e:
            log.error(e)
            return False, None
        finally:
            SessionMaker.close()

    # ------------------------------------------------------------------
    # ID migration
    # ------------------------------------------------------------------
    def checkId(self):
        """Migrate legacy short IDs (fewer than 10 characters) to UUID strings.

        For the three group tables, a short ID on the first row is pinned
        to the all-zero default ID.
        """
        for cls, pin_first in (
            (DevModel, False),
            (Instruction, False),
            (Device, False),
            (TaskGroup, True),
            (DmGroup, True),
            (DevGroup, True),
            (Task, False),
            (TaskInstruction, False),
        ):
            self._migrateShortIds(cls, pin_first)

    def modId(self, data):
        """Give *data* a fresh UUID when its ID is shorter than 10 characters.

        *data* is modified in place; its ``_sa_instance_state`` entry is
        removed when present.

        Returns:
            ``(old_id, data)`` where ``old_id`` is the previous ID as a string.
        """
        old_id = str(data['id'])
        if len(old_id) < 10:
            data['id'] = str(uuid.uuid4())
        data.pop('_sa_instance_state', None)
        return old_id, data

    # ------------------------------------------------------------------
    # JSON mirror
    # ------------------------------------------------------------------
    def writeJsonInThread(self, table_name, data):
        """Submit :meth:`writeJson` to the thread pool."""
        self.executor.submit(self.writeJson, table_name, data)

    def gitSyncInThread(self):
        """Submit :meth:`gitSync` to the thread pool."""
        self.executor.submit(self.gitSync)

    def writeJson(self, table_name, data):
        """Write *data* to ``<table dir>/<id>.json``; errors are logged, not raised."""
        try:
            data = self.ensure_serializable(data)
            data.pop('_sa_instance_state', None)
            file_path = self._tableDir(table_name)
            # exist_ok avoids the check-then-create race between pool threads.
            os.makedirs(file_path, exist_ok=True)
            with open(os.path.join(file_path, f'{data["id"]}.json'), 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
        except Exception as e:
            # Single-message form, consistent with every other log.error call here.
            log.error(f'writeJson failed: table={table_name}, data={data}, error={e}')

    def gitSync(self):
        """Rebuild the JSON mirror from the database.

        Every file below both platform directories (except ``.git``
        contents) is deleted, then all mirrored tables are written again.
        Both directories are purged before any write is queued, so a purge
        can never remove a file that was just rewritten.
        """
        for base in (platform_path1, platform_path2):
            self._purgeWorkTree(base)
        for table_name in self._SYNC_TABLES:
            self.writeTableJson(table_name)

    def writeTableJson(self, table_name):
        """Queue a JSON write for every row of *table_name* (unknown names are ignored)."""
        cls = self._tableModel(table_name)
        if cls is None:
            return
        ok, infos = self.queryByAll(cls)
        if infos:
            for info in infos:
                self.writeJsonInThread(table_name, info)

    def ensure_serializable(self, obj):
        """Return a JSON-serializable version of *obj* without modifying it.

        * ``bytes`` are decoded as UTF-8 (left unchanged when not valid UTF-8);
        * lists and dicts are rebuilt recursively;
        * in a dict, the ``attr`` / ``interface`` entries holding JSON text are
          decoded, with ``{}`` / ``[]`` substituted for empty values;
        * anything else is returned unchanged.
        """
        if isinstance(obj, bytes):
            try:
                return obj.decode('utf-8')
            except UnicodeDecodeError:
                return obj
        if isinstance(obj, list):
            return [self.ensure_serializable(v) for v in obj]
        if not isinstance(obj, dict):
            return obj
        # Work on a copy: the input may be a dict the caller is still using.
        record = dict(obj)
        for key, default in (('attr', {}), ('interface', [])):
            if key in record:
                record[key] = self._parseJsonField(record[key], default)
        return {k: self.ensure_serializable(v) for k, v in record.items()}

    def removeJson(self, table_name, id):
        """Delete the JSON file of record *id*; errors are logged, not raised."""
        try:
            os.remove(os.path.join(self._tableDir(table_name), f'{id}.json'))
        except Exception as e:
            log.error(e)

    # ------------------------------------------------------------------
    # inserts
    # ------------------------------------------------------------------
    def addByClass(self, cls, _data):
        """Insert *_data* as a *cls* row and mirror it to JSON.

        Returns:
            ``(True, row_dict)`` on success, otherwise ``(False, None)``.
        """
        return self._insert(cls, _data, mirror=True)

    def addByClassFromGit(self, cls, _data):
        """Insert *_data* as a *cls* row without writing the JSON mirror.

        Returns:
            ``(True, row_dict)`` on success, otherwise ``(False, None)``.
        """
        return self._insert(cls, _data, mirror=False)

    # ------------------------------------------------------------------
    # queries
    # ------------------------------------------------------------------
    def queryTaskByTargetType(self, cls, _target_type):
        """Return the rows whose ``target_type`` equals *_target_type*.

        An empty result is reported as ``(False, None)``.
        """
        if not cls or not _target_type:
            return False, None
        return self._runListQuery(
            lambda: SessionMaker.query(cls).filter(cls.target_type == _target_type),
            empty_ok=False,
        )

    def queryByGroupId(self, cls, _group_id):
        """Return the rows whose ``group_id`` equals *_group_id*.

        An empty result is reported as ``(False, None)``.
        """
        if not cls or not _group_id:
            return False, None
        return self._runListQuery(
            lambda: SessionMaker.query(cls).filter(cls.group_id == _group_id),
            empty_ok=False,
        )

    def queryByTaskGroupId(self, cls, _group_id):
        """Same lookup as :meth:`queryByGroupId`, kept as a separate entry point."""
        return self.queryByGroupId(cls, _group_id)

    def queryByDevModelId(self, cls, _dev_model_id):
        """Return the rows whose ``dev_model_id`` equals *_dev_model_id*.

        An empty result is reported as ``(False, None)``.
        """
        if not cls or not _dev_model_id:
            return False, None
        return self._runListQuery(
            lambda: SessionMaker.query(cls).filter(cls.dev_model_id == _dev_model_id),
            empty_ok=False,
        )

    def queryByIds(self, cls, ids):
        """Return the rows whose ``id`` is in *ids*.

        An empty result is reported as ``(False, None)``.
        """
        if not cls or not ids:
            return False, None
        return self._runListQuery(
            lambda: SessionMaker.query(cls).filter(cls.id.in_(ids)),
            empty_ok=False,
        )

    def queryById(self, cls, _id):
        """Return the row with primary key *_id*, or ``(False, None)``."""
        try:
            if not cls or not _id:
                return False, None
            row = SessionMaker.get(cls, _id)
            if row is None:
                return False, None
            return True, self._asDict(row)
        except Exception as e:
            log.error(e)
            return False, None

    def queryByName(self, cls, _name):
        """Return the first row whose ``name`` equals *_name*, or ``(False, None)``."""
        try:
            if not cls or not _name:
                return False, None
            row = SessionMaker.query(cls).filter_by(name=_name).first()
            if row is None:
                return False, None
            # Attribute access forces a reload should the instance be expired.
            _ = row.id
            return True, self._asDict(row)
        except Exception as e:
            log.error(e)
            return False, None

    def queryByTaskId(self, cls, _task_id):
        """Return the rows of task *_task_id*, ordered by ``level``.

        An empty result is a success: ``(True, [])``.

        This method used to be defined twice in the class. Only the later
        definition (this behaviour) was ever effective, so the shadowed
        one was removed.
        """
        if not cls or not _task_id:
            return False, None
        return self._runListQuery(
            lambda: SessionMaker.query(cls).filter(cls.task_id == _task_id).order_by(cls.level),
            empty_ok=True,
        )

    def pagedQueryByTaskId(self, cls, _task_id, start_row=0, limit=50):
        """Return up to *limit* rows of task *_task_id* starting at *start_row*.

        Rows are ordered by ``level``; an empty page is ``(True, [])``.
        """
        if not cls or not _task_id:
            return False, None
        return self._runListQuery(
            lambda: SessionMaker.query(cls)
            .filter(cls.task_id == _task_id)
            .order_by(cls.level)
            .limit(limit)
            .offset(start_row),
            empty_ok=True,
        )

    def queryByAll(self, cls):
        """Return every row of *cls*; an empty table is ``(True, [])``."""
        if not cls:
            return False, None
        return self._runListQuery(lambda: SessionMaker.query(cls), empty_ok=True)

    # ------------------------------------------------------------------
    # updates / deletes
    # ------------------------------------------------------------------
    def updateById(self, cls, _id, _data):
        """Apply the changed values of *_data* to the row with ID *_id*.

        Keys that are not loaded attributes of the row are ignored. The
        updated row is mirrored to JSON.

        Returns:
            ``(True, row_dict)`` on success; ``(False, None)`` when the row
            does not exist or the update fails.
        """
        try:
            if not cls or not _id or not _data:
                return False, None
            row = SessionMaker.query(cls).filter_by(id=_id).first()
            if row is None:
                return False, None
            current = row.__dict__
            for k, v in _data.items():
                if k == '_sa_instance_state' or k not in current or current[k] == v:
                    continue
                setattr(row, k, v)
            SessionMaker.commit()
            # Reload after commit; this also covers a changed primary key.
            SessionMaker.refresh(row)
            snapshot = self._asDict(row)
            # The writer gets its own copy so the returned dict stays untouched.
            self.writeJsonInThread(cls.__tablename__, dict(snapshot))
            return True, snapshot
        except Exception as e:
            log.error(e)
            return False, None
        finally:
            SessionMaker.close()

    def deleteById(self, cls, _id):
        """Delete the row with ID *_id*, then refresh the JSON mirror.

        Returns:
            ``(True, _id)`` on success, otherwise ``(False, None)``.
        """
        try:
            if not _id:
                return False, None
            row = SessionMaker.query(cls).filter_by(id=_id).first()
            if not row:
                return False, None
            SessionMaker.delete(row)
            SessionMaker.commit()
            self.removeJson(cls.__tablename__, _id)
            self.gitSyncInThread()
            return True, _id
        except Exception as e:
            log.error(e)
            return False, None
        finally:
            SessionMaker.close()

    def deleteByIds(self, cls, ids):
        """Delete every row whose ID is in *ids*, then refresh the JSON mirror.

        The deletion is committed before any JSON file is removed, and a
        single sync is queued afterwards. Previously a sync was queued per
        row, before the commit.

        Returns:
            ``(True, ids)`` on success, otherwise ``(False, None)``.
        """
        try:
            if not ids:
                return False, None
            rows = SessionMaker.query(cls).filter(cls.id.in_(ids)).all()
            if not rows:
                return False, None
            removed_ids = [row.id for row in rows]
            for row in rows:
                SessionMaker.delete(row)
            SessionMaker.commit()
            for removed_id in removed_ids:
                self.removeJson(cls.__tablename__, removed_id)
            self.gitSyncInThread()
            return True, ids
        except Exception as e:
            log.error(e)
            return False, None
        finally:
            SessionMaker.close()

    def deleteByName(self, cls, _name):
        """Delete the first row named *_name*, then refresh the JSON mirror.

        Returns:
            ``(True, _name)`` on success, otherwise ``(False, None)``.
        """
        try:
            if not _name:
                return False, None
            row = SessionMaker.query(cls).filter_by(name=_name).first()
            if not row:
                return False, None
            # Read the ID while the instance is still attached.
            removed_id = row.id
            SessionMaker.delete(row)
            SessionMaker.commit()
            self.removeJson(cls.__tablename__, removed_id)
            self.gitSyncInThread()
            return True, _name
        except Exception as e:
            log.error(e)
            return False, None
        finally:
            SessionMaker.close()

    def deleteAllTable(self):
        """Delete all rows from the group, task, device and instruction tables.

        Every table is attempted even if an earlier one fails.

        Returns:
            ``True`` when every table was cleared, ``False`` otherwise.
        """
        # NOTE(review): DevGroup (device_group) is not cleared here - confirm intentional.
        all_ok = True
        for cls in (DmGroup, TaskGroup, Task, TaskInstruction, Device, DevModel, Instruction):
            all_ok = self.delete(cls) and all_ok
        return all_ok

    def delete(self, cls):
        """Delete every row of *cls*; returns ``True`` on success, ``False`` on error."""
        try:
            SessionMaker.query(cls).delete()
            SessionMaker.commit()
            return True
        except Exception as e:
            log.error(e)
            return False
        finally:
            SessionMaker.close()
# Rebind the module-level name `Session` to a single instance of the class
# defined above; after this line the class itself is no longer reachable
# under that name.
Session = Session()