#!/opt/homebrew/bin/python3 # -*- coding:utf-8 -*- import sys import json import time from PyQt6.QtNetwork import * from PyQt6 import * from PyQt6.QtCore import * from interfaceSession.sessionAbstract import SessionAbstract from logs import log import typing class SessionHttp(SessionAbstract): newDataArrive = pyqtSignal(bytes) sendData = pyqtSignal(bytes) def __init__(self, id, name, attrs): super().__init__(id, name, "http", attrs) self.id = id self.name = name self.attrs = attrs self.reply = None # manager self.manager = None self.sendData.connect(self.writeData) def setAttrs(self, attrs): self._PROTECTED__attrs.update(attrs) self.attrs = attrs def writeData(self, received_data: bytes): try: if self.manager is None: self.manager = QNetworkAccessManager() # 解析收到的数据 decoded_str = received_data.decode('utf-8') data = json.loads(decoded_str) url = QUrl(data['url']) params_str = data['rsp_params'] params = json.loads(params_str) headers_str = data['rsp_headers'] headers = json.loads(headers_str) bodys_str = data['rsp_bodys'] request_type = data['type'] if request_type == 'GET': url_with_params = self.appendParamsToUrl(url, params) request = QNetworkRequest(url_with_params) for key, value in headers.items(): request.setRawHeader(QByteArray(key.encode()), QByteArray(value.encode())) self.reply = self.manager.get(request) self.reply.finished.connect(self.on_ready_read) elif request_type == 'POST': # Assuming you have the necessary logic to construct the post data url_with_params = self.appendParamsToUrl(url, params) request = QNetworkRequest(url_with_params) for key, value in headers.items(): request.setRawHeader(QByteArray(key.encode()), QByteArray(value.encode())) post_data = self.constructPostData(bodys_str) self.reply = self.manager.post(request, post_data) self.reply.finished.connect(self.on_ready_read) else: print("Unsupported request type:", request_type) except Exception as e: log.error(e) @pyqtSlot(bytes) def send(self, data): self.sendData.emit(bytes(data)) print(f"{self.name} send data", data) def appendParamsToUrl(self, url, params): query = QUrlQuery() for key, value in params.items(): print(key, value) query.addQueryItem(key, value) url.setQuery(query) print(url) return url def constructPostData(self, params): # Assuming you have the necessary logic to construct the post data # post_data = json.dumps(params) return QByteArray(params.encode('utf-8')) def on_ready_read(self, reply: typing.Optional[QNetworkReply] = None): if self.reply is not None: data = self.reply.readAll() self.newDataArrive.emit(bytes(data)) else: print("Received a null reply")