Source code for influxdb_client.client.query_api

import codecs
import csv
from typing import List, Generator, Any

from influxdb_client import Dialect
from influxdb_client import Query, QueryService
from influxdb_client.client.flux_csv_parser import FluxCsvParser
from influxdb_client.client.flux_table import FluxTable, FluxRecord


[docs]class QueryApi(object): default_dialect = Dialect(header=True, delimiter=",", comment_prefix="#", annotations=["datatype", "group", "default"], date_time_format="RFC3339") def __init__(self, influxdb_client): """ Initializes query client. :param influxdb_client: influxdb client """ self._influxdb_client = influxdb_client self._query_api = QueryService(influxdb_client.api_client)
[docs] def query_csv(self, query: str, org=None, dialect: Dialect = default_dialect): """ Executes the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file. :param query: a Flux query :param org: organization name (optional if already specified in InfluxDBClient) :param dialect: csv dialect format :return: The returned object is an iterator. Each iteration returns a row of the CSV file (which can span multiple input lines). """ if org is None: org = self._influxdb_client.org response = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False, _preload_content=False) return csv.reader(codecs.iterdecode(response, 'utf-8'))
[docs] def query_raw(self, query: str, org=None, dialect=default_dialect): """ Synchronously executes the Flux query and return result as raw unprocessed result as a str :param query: a Flux query :param org: organization name (optional if already specified in InfluxDBClient) :param dialect: csv dialect format :return: str """ if org is None: org = self._influxdb_client.org result = self._query_api.post_query(org=org, query=self._create_query(query, dialect), async_req=False, _preload_content=False) return result
[docs] def query(self, query: str, org=None) -> List['FluxTable']: """ Synchronously executes the Flux query and return result as a List['FluxTable'] :param query: the Flux query :param org: organization name (optional if already specified in InfluxDBClient) :return: """ if org is None: org = self._influxdb_client.org response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect), async_req=False, _preload_content=False, _return_http_data_only=False) _parser = FluxCsvParser(response=response, stream=False) list(_parser.generator()) return _parser.tables
[docs] def query_stream(self, query: str, org=None) -> Generator['FluxRecord', Any, None]: """ Synchronously executes the Flux query and return stream of FluxRecord as a Generator['FluxRecord'] :param query: the Flux query :param org: organization name (optional if already specified in InfluxDBClient) :return: """ if org is None: org = self._influxdb_client.org response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect), async_req=False, _preload_content=False, _return_http_data_only=False) _parser = FluxCsvParser(response=response, stream=True) return _parser.generator()
# private helper for c @staticmethod def _create_query(query, dialect=default_dialect): created = Query(query=query, dialect=dialect) return created def __del__(self): pass