Source code for influxdb_client.client.query_api_async

"""
Querying InfluxDB by FluxLang.

Flux is InfluxData’s functional data scripting language designed for querying, analyzing, and acting on data.
"""
from typing import List, AsyncGenerator

from influxdb_client.client._base import _BaseQueryApi
from influxdb_client.client.flux_table import FluxRecord, TableList
from influxdb_client.client.query_api import QueryOptions
from influxdb_client.rest import _UTF_8_encoding, ApiException
from .._async.rest import RESTResponseAsync


[docs]class QueryApiAsync(_BaseQueryApi): """Asynchronous implementation for '/api/v2/query' endpoint.""" def __init__(self, influxdb_client, query_options=QueryOptions()): """ Initialize query client. :param influxdb_client: influxdb client """ super().__init__(influxdb_client=influxdb_client, query_options=query_options)
[docs] async def query(self, query: str, org=None, params: dict = None) -> TableList: """ Execute asynchronous Flux query and return result as a :class:`~influxdb_client.client.flux_table.FluxTable` list. :param query: the Flux query :param str, Organization org: specifies the organization for executing the query; Take the ``ID``, ``Name`` or ``Organization``. If not specified the default value from ``InfluxDBClientAsync.org`` is used. :param params: bind parameters :return: :class:`~influxdb_client.client.flux_table.FluxTable` list wrapped into :class:`~influxdb_client.client.flux_table.TableList` :rtype: TableList Serialization the query results to flattened list of values via :func:`~influxdb_client.client.flux_table.TableList.to_values`: .. code-block:: python from influxdb_client import InfluxDBClient async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using Table structure tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to values output = tables.to_values(columns=['location', '_time', '_value']) print(output) .. code-block:: python [ ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3], ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3], ... ] Serialization the query results to JSON via :func:`~influxdb_client.client.flux_table.TableList.to_json`: .. code-block:: python from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using Table structure tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to JSON output = tables.to_json(indent=5) print(output) .. code-block:: javascript [ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ] """ # noqa: E501 org = self._org_param(org) response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params)) return await self._to_tables_async(response, query_options=self._get_query_options())
[docs] async def query_stream(self, query: str, org=None, params: dict = None) -> AsyncGenerator['FluxRecord', None]: """ Execute asynchronous Flux query and return stream of :class:`~influxdb_client.client.flux_table.FluxRecord` as an AsyncGenerator[:class:`~influxdb_client.client.flux_table.FluxRecord`]. :param query: the Flux query :param str, Organization org: specifies the organization for executing the query; Take the ``ID``, ``Name`` or ``Organization``. If not specified the default value from ``InfluxDBClientAsync.org`` is used. :param params: bind parameters :return: AsyncGenerator[:class:`~influxdb_client.client.flux_table.FluxRecord`] """ # noqa: E501 org = self._org_param(org) response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params)) return await self._to_flux_record_stream_async(response, query_options=self._get_query_options())
[docs] async def query_data_frame(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None, use_extension_dtypes: bool = False): """ Execute asynchronous Flux query and return :class:`~pandas.core.frame.DataFrame`. .. note:: If the ``query`` returns tables with differing schemas than the client generates a :class:`~DataFrame` for each of them. :param query: the Flux query :param str, Organization org: specifies the organization for executing the query; Take the ``ID``, ``Name`` or ``Organization``. If not specified the default value from ``InfluxDBClientAsync.org`` is used. :param data_frame_index: the list of columns that are used as DataFrame index :param params: bind parameters :param use_extension_dtypes: set to ``True`` to use panda's extension data types. Useful for queries with ``pivot`` function. When data has missing values, column data type may change (to ``object`` or ``float64``). Nullable extension types (``Int64``, ``Float64``, ``boolean``) support ``panda.NA`` value. For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html. :return: :class:`~DataFrame` or :class:`~List[DataFrame]` .. warning:: For the optimal processing of the query results use the ``pivot() function`` which align results as a table. .. code-block:: text from(bucket:"my-bucket") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r._measurement == "mem") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") For more info see: - https://docs.influxdata.com/resources/videos/pivots-in-flux/ - https://docs.influxdata.com/flux/latest/stdlib/universe/pivot/ - https://docs.influxdata.com/flux/latest/stdlib/influxdata/influxdb/schema/fieldsascols/ """ # noqa: E501 _generator = await self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index, params=params, use_extension_dtypes=use_extension_dtypes) dataframes = [] async for dataframe in _generator: dataframes.append(dataframe) return self._to_data_frames(dataframes)
[docs] async def query_data_frame_stream(self, query: str, org=None, data_frame_index: List[str] = None, params: dict = None, use_extension_dtypes: bool = False): """ Execute asynchronous Flux query and return stream of :class:`~pandas.core.frame.DataFrame` as an AsyncGenerator[:class:`~pandas.core.frame.DataFrame`]. .. note:: If the ``query`` returns tables with differing schemas than the client generates a :class:`~DataFrame` for each of them. :param query: the Flux query :param str, Organization org: specifies the organization for executing the query; Take the ``ID``, ``Name`` or ``Organization``. If not specified the default value from ``InfluxDBClientAsync.org`` is used. :param data_frame_index: the list of columns that are used as DataFrame index :param params: bind parameters :param use_extension_dtypes: set to ``True`` to use panda's extension data types. Useful for queries with ``pivot`` function. When data has missing values, column data type may change (to ``object`` or ``float64``). Nullable extension types (``Int64``, ``Float64``, ``boolean``) support ``panda.NA`` value. For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html. :return: :class:`AsyncGenerator[:class:`DataFrame`]` .. warning:: For the optimal processing of the query results use the ``pivot() function`` which align results as a table. .. code-block:: text from(bucket:"my-bucket") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r._measurement == "mem") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") For more info see: - https://docs.influxdata.com/resources/videos/pivots-in-flux/ - https://docs.influxdata.com/flux/latest/stdlib/universe/pivot/ - https://docs.influxdata.com/flux/latest/stdlib/influxdata/influxdb/schema/fieldsascols/ """ # noqa: E501 org = self._org_param(org) response = await self._post_query(org=org, query=self._create_query(query, self.default_dialect, params, dataframe_query=True)) return await self._to_data_frame_stream_async(data_frame_index=data_frame_index, response=response, query_options=self._get_query_options(), use_extension_dtypes=use_extension_dtypes)
[docs] async def query_raw(self, query: str, org=None, dialect=_BaseQueryApi.default_dialect, params: dict = None): """ Execute asynchronous Flux query and return result as raw unprocessed result as a str. :param query: a Flux query :param str, Organization org: specifies the organization for executing the query; Take the ``ID``, ``Name`` or ``Organization``. If not specified the default value from ``InfluxDBClientAsync.org`` is used. :param dialect: csv dialect format :param params: bind parameters :return: :class:`~str` """ org = self._org_param(org) result = await self._post_query(org=org, query=self._create_query(query, dialect, params)) raw_bytes = await result.read() return raw_bytes.decode(_UTF_8_encoding)
async def _post_query(self, org, query): response = await self._query_api.post_query_async(org=org, query=query, async_req=False, _preload_content=False, _return_http_data_only=True) if not 200 <= response.status <= 299: data = await response.read() raise ApiException(http_resp=RESTResponseAsync(response, data)) return response