PCM_Report/influx_service.py

68 lines
2.4 KiB
Python
Raw Permalink Normal View History

2025-12-11 14:32:31 +08:00
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List, Optional
import pandas as pd
from influxdb_client import InfluxDBClient
import warnings
import re
from influxdb_client.client.warnings import MissingPivotFunction
@dataclass
class InfluxConnectionParams:
url: str
org: str
token: str
class InfluxService:
def __init__(self, params: InfluxConnectionParams) -> None:
self._params = params
self._client = InfluxDBClient(url=params.url, org=params.org, token=params.token)
def build_flux(self,
bucket: str,
measurement: str,
fields: List[str],
filters: Optional[Dict[str, str]] = None,
time_range: str = "-1h",
aggregate: str = "",
window_period: str = "") -> str:
filters = filters or {}
# sanitize aggregate: must be a valid Flux function name
if aggregate and not re.match(r'^[A-Za-z_][A-Za-z0-9_]*$', aggregate):
aggregate = ""
field_filter = " or ".join([f'r["_field"] == "{f}"' for f in fields]) if fields else "true"
tag_filters = " and ".join([f'r["{k}"] == "{v}"' for k, v in filters.items()]) if filters else "true"
wp = window_period.strip() or "1m"
agg_line = f"\n |> aggregateWindow(every: {wp}, fn: {aggregate}, createEmpty: false)" if aggregate else ""
flux = f"""
from(bucket: "{bucket}")
|> range(start: {time_range})
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> filter(fn: (r) => {field_filter})
|> filter(fn: (r) => {tag_filters}){agg_line}
|> yield(name: "result")
""".strip()
return flux
def query(self,
bucket: str,
measurement: str,
fields: List[str],
filters: Optional[Dict[str, str]] = None,
time_range: str = "-1h",
aggregate: str = "",
window_period: str = "") -> pd.DataFrame:
flux = self.build_flux(bucket, measurement, fields, filters, time_range, aggregate, window_period)
with warnings.catch_warnings():
warnings.simplefilter("ignore", MissingPivotFunction)
frames = self._client.query_api().query_data_frame(flux)
if isinstance(frames, list):
df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
else:
df = frames
return df