Querying the InfluxDB v1 API¶
In the EFD Client section, we discussed how to access EFD data using the EFD Python client from a notebook in the RSP.
For services or applications that need to access EFD data, the recommended method is to query the InfluxDB v1 API directly.
For example, to query the InfluxDB v1 API at USDF, you need the following information:
The
INFLUXDB_URL
is the URL of the InfluxDB server. For the USDF environment, the URL ishttps://usdf-rsp.slac.stanford.edu/influxdb-enterprise-data
.The
INFLUXDB_DATABASE
is the name of the database you want to query. For the EFD database, the name isefd
.The
INFLUXDB_USER
andINFLUXDB_PASSWORD
are the credentials for accessing the database and can be obtained from the LSSTIT SQuare Vault in 1Password under “EFD reader at USDF”.
INFLUXDB_URL = https://usdf-rsp.slac.stanford.edu/influxdb-enterprise-data
INFLUXDB_DATABASE = "efd"
INFLUXDB_USER = ""
INFLUXDB_PASSWORD = ""
You can use the InfluxDB API /query
endpoint to query data using InfluxQL.
Here is an example on how to query the InfluxDB v1 API using the Python requests module.
import os
import requests
class InfluxDBClient:
"""A simple InfluxDB client.
Parameters
----------
url : str
The URL of the InfluxDB API.
database_name : str
The name of the database to query.
username : str, optional
The username to authenticate with.
password : str, optional
The password to authenticate with.
"""
def __init__(
self,
url: str,
database_name: str,
username: str | None = None,
password: str | None = None,
) -> None:
self.url = url
self.database_name = database_name
self.auth = (username, password) if username and password else None
def query(self, query: str) -> dict:
"""Send a query to the InfluxDB API."""
params = {"db": self.database_name, "q": query}
try:
response = requests.get(f"{self.url}/query", params=params, auth=self.auth)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as exc:
raise InfluxDBError(f"An error occurred: {exc}") from exc
client = InfluxDBClient(
url=INFLUXDB_URL,
database_name=INFLUXDB_DATABASE,
username=INFLUXDB_USER,
password=INFLUXDB_PASSWORD,
)
response = client.query(
"""SELECT vacuum FROM "lsst.sal.ATCamera.vacuum" WHERE time > now() - 1d"""
)
The InfluxDB v1 API response format¶
The InfluxDB v1 API response is a JSON object with the following structure:
{
"results": [
{
"statement_id": 0,
"series": [
{
"columns": ["time", "vacuum"],
"name": "lsst.sal.ATCamera.vacuum",
"values": [
["2024-05-30T16:49:40.119558Z", 3.08e-7],
["2024-05-30T16:49:50.120548Z", 3.17e-7],
["2024-05-30T16:50:00.12093Z", 3.52e-7]
]
}
]
}
]
}
The /query
endpoint supports sending multiple queries in a single request by separating them with a semicolon.
If you send a single query like above the result will have a single statement_id
.
You can query multiple topics in a single request as well.
If you query a single topic like above the result will have a single series
.
Converting the InfluxDB v1 API response to a Pandas DataFrame¶
To convert the InfluxDB v1 API response to a Pandas DataFrame, you can use the following code, assuming you are sending a single query and querying a single topic at a time like in the example above. The result is equivalent to the Pandas DataFrame you would get from the EFD Python client.
import pandas as pd
def _to_dataframe(self, response: dict) -> pd.DataFrame:
"""Convert an InfluxDB response to a Pandas dataframe.
Parameters
----------
response : dict
The JSON response from the InfluxDB API.
"""
# One query submitted at a time
statement = response["results"][0]
# One topic queried at a time
series = statement["series"][0]
result = pd.DataFrame(series.get("values", []), columns=series["columns"])
if "time" not in result.columns:
return result
result = result.set_index(pd.to_datetime(result["time"])).drop("time", axis=1)
if result.index.tzinfo is None:
result.index = result.index.tz_localize("UTC")
if "tags" in series:
for k, v in series["tags"].items():
result[k] = v
if "name" in series:
result.name = series["name"]
return result