Async API Reference¶
InfluxDBClientAsync¶
class influxdb_client.client.influxdb_client_async.InfluxDBClientAsync(url, token: str = None, org: str = None, debug=None, timeout=10000, enable_gzip=False, **kwargs)[source]¶

InfluxDBClientAsync is a client for InfluxDB v2.

Initialize defaults.

Parameters:
- url – InfluxDB server API url (ex. http://localhost:8086).
- token – token to authenticate to the InfluxDB 2.x
- org – organization name (used as a default in Query, Write and Delete API)
- debug – enable verbose logging of http requests
- timeout – the maximal number of milliseconds for the whole HTTP request, including connection establishment, request sending and response reading. It can also be a ClientTimeout, which is passed directly to aiohttp.
- enable_gzip – enable Gzip compression for http requests. Currently, only the "Write" and "Query" endpoints support Gzip compression.
Key bool verify_ssl: Set this to false to skip verifying the SSL certificate when calling the API from an https server.
Key str ssl_ca_cert: Set this to customize the certificate file used to verify the peer.
Key str proxy: Set this to configure the http proxy to be used (ex. http://localhost:3128)
Key str proxy_headers: A dictionary containing headers that will be sent to the proxy. Can be used for proxy authentication.
Key int connection_pool_maxsize: The total number of simultaneous connections. Defaults to multiprocessing.cpu_count() * 5.
Key bool auth_basic: Set this to true to enable basic authentication when talking to an InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (defaults to false; don't set to true when talking to InfluxDB 2)
Key str username: username to authenticate via username and password credentials to the InfluxDB 2.x
Key str password: password to authenticate via username and password credentials to the InfluxDB 2.x
Key bool allow_redirects: If set to False, do not follow HTTP redirects. True by default.
Key int max_redirects: Maximum number of HTTP redirects to follow. 10 by default.
Key dict client_session_kwargs: Additional configuration arguments for ClientSession
Key type client_session_type: Type of aiohttp client to use. Useful for third party wrappers like aiohttp-retry. ClientSession by default.
Key list[str] profilers: list of enabled Flux profilers
delete_api() → influxdb_client.client.delete_api_async.DeleteApiAsync[source]¶
Get the asynchronous delete metrics API instance.

Returns: delete api
classmethod from_config_file(config_file: str = 'config.ini', debug=None, enable_gzip=False)[source]¶
Configure client via configuration file. The configuration has to be under the 'influx' section.

Supported formats: config.ini, config.toml, config.json

Configuration options:
- url
- org
- token
- timeout
- verify_ssl
- ssl_ca_cert
- connection_pool_maxsize
- auth_basic
- profilers
- proxy
config.ini example:
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
connection_pool_maxsize=25
auth_basic=false
profilers=query,operator
proxy=http://proxy.domain.org:8080

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}
config.toml example:
[influx2]
url = "http://localhost:8086"
token = "my-token"
org = "my-org"
timeout = 6000
connection_pool_maxsize = 25
auth_basic = false
profilers="query, operator"
proxy = "http://proxy.domain.org:8080"

[tags]
id = "132-987-655"
customer = "California Miner"
data_center = "${env.data_center}"
config.json example:
{
    "url": "http://localhost:8086",
    "token": "my-token",
    "org": "my-org",
    "active": true,
    "timeout": 6000,
    "connection_pool_maxsize": 55,
    "auth_basic": false,
    "profilers": "query, operator",
    "tags": {
        "id": "132-987-655",
        "customer": "California Miner",
        "data_center": "${env.data_center}"
    }
}
classmethod from_env_properties(debug=None, enable_gzip=False)[source]¶
Configure client via environment properties.

Supported environment properties:
- INFLUXDB_V2_URL
- INFLUXDB_V2_ORG
- INFLUXDB_V2_TOKEN
- INFLUXDB_V2_TIMEOUT
- INFLUXDB_V2_VERIFY_SSL
- INFLUXDB_V2_SSL_CA_CERT
- INFLUXDB_V2_CONNECTION_POOL_MAXSIZE
- INFLUXDB_V2_AUTH_BASIC
- INFLUXDB_V2_PROFILERS
- INFLUXDB_V2_TAG
query_api(query_options: influxdb_client.client.query_api.QueryOptions = <influxdb_client.client.query_api.QueryOptions object>) → influxdb_client.client.query_api_async.QueryApiAsync[source]¶
Create an asynchronous Query API instance.

Parameters: query_options – optional query api configuration
Returns: Query api instance
version() → str[source]¶
Return the version of the connected InfluxDB Server.

Returns: The version of InfluxDB.
write_api(point_settings=<influxdb_client.client.write_api.PointSettings object>) → influxdb_client.client.write_api_async.WriteApiAsync[source]¶
Create an asynchronous Write API instance.

Example:

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

# Initialize async/await instance of Write API
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api()

Parameters: point_settings – settings to store default tags
Returns: write api instance
QueryApiAsync¶
class influxdb_client.client.query_api_async.QueryApiAsync(influxdb_client, query_options=<influxdb_client.client.query_api.QueryOptions object>)[source]¶
Asynchronous implementation for the '/api/v2/query' endpoint.

Initialize query client.

Parameters: influxdb_client – influxdb client
query(query: str, org=None, params: dict = None) → influxdb_client.client.flux_table.TableList[source]¶
Execute an asynchronous Flux query and return the result as a FluxTable list.

Parameters:
- query – the Flux query
- org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
- params – bind parameters

Returns: FluxTable list wrapped into TableList
Return type: influxdb_client.client.flux_table.TableList

Serialize the query results to a flattened list of values via to_values():

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = tables.to_values(columns=['location', '_time', '_value'])
    print(output)
[
    ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
    ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
    ...
]
Serialize the query results to JSON via to_json():

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to JSON
    output = tables.to_json(indent=5)
    print(output)
[
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:00.897825+00:00",
        "region": "north",
        "_field": "usage",
        "_value": 15
    },
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:01.897825+00:00",
        "region": "west",
        "_field": "usage",
        "_value": 10
    },
    ...
]
query_data_frame(query: str, org=None, data_frame_index: List[str] = None, params: dict = None)[source]¶
Execute an asynchronous Flux query and return a Pandas DataFrame.

Note: If the query returns tables with differing schemas, the client generates a DataFrame for each of them.

Parameters:
- query – the Flux query
- org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
- data_frame_index – the list of columns that are used as DataFrame index
- params – bind parameters

Returns: DataFrame or List[DataFrame]

Warning: For optimal processing of the query results use the pivot() function, which aligns results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
query_data_frame_stream(query: str, org=None, data_frame_index: List[str] = None, params: dict = None)[source]¶
Execute an asynchronous Flux query and return a stream of DataFrame as an AsyncGenerator[DataFrame].

Note: If the query returns tables with differing schemas, the client generates a DataFrame for each of them.

Parameters:
- query – the Flux query
- org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
- data_frame_index – the list of columns that are used as DataFrame index
- params – bind parameters

Returns: AsyncGenerator[DataFrame]

Warning: For optimal processing of the query results use the pivot() function, which aligns results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
query_raw(query: str, org=None, dialect={'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: dict = None)[source]¶
Execute an asynchronous Flux query and return the raw, unprocessed result as a str.

Parameters:
- query – a Flux query
- org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
- dialect – csv dialect format
- params – bind parameters

Returns: str
query_stream(query: str, org=None, params: dict = None) → AsyncGenerator[influxdb_client.client.flux_table.FluxRecord, None][source]¶
Execute an asynchronous Flux query and return a stream of FluxRecord as an AsyncGenerator[FluxRecord].

Parameters:
- query – the Flux query
- org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
- params – bind parameters

Returns: AsyncGenerator[FluxRecord]
WriteApiAsync¶
class influxdb_client.client.write_api_async.WriteApiAsync(influxdb_client, point_settings: influxdb_client.client.write_api.PointSettings = <influxdb_client.client.write_api.PointSettings object>)[source]¶
Implementation for the '/api/v2/write' endpoint.

Example:

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

# Initialize async/await instance of Write API
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api()

Initialize defaults.

Parameters:
- influxdb_client – with default settings (organization)
- point_settings – settings to store default tags.
write(bucket: str, org: str = None, record: Union[str, Iterable[str], influxdb_client.client.write.point.Point, Iterable[Point], dict, Iterable[dict], bytes, Iterable[bytes], NamedTuple, Iterable[NamedTuple], dataclass, Iterable[dataclass]] = None, write_precision: influxdb_client.domain.write_precision.WritePrecision = 'ns', **kwargs) → bool[source]¶
Write time-series data into InfluxDB.

Parameters:
- bucket (str) – specifies the destination bucket for writes (required)
- org (str, Organization) – specifies the destination organization for writes; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
- write_precision (WritePrecision) – specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point takes precedence and is used for the write.
- record – Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame
Key data_frame_measurement_name: name of measurement for writing Pandas DataFrame - DataFrame
Key data_frame_tag_columns: list of DataFrame columns which are tags; the rest of the columns will be fields - DataFrame
Key data_frame_timestamp_column: name of DataFrame column which contains a timestamp. The column can be defined as a str value formatted as 2018-10-26, 2018-10-26 12:00, 2018-10-26 12:00:00-05:00 or other formats and types supported by pandas.to_datetime - DataFrame
Key data_frame_timestamp_timezone: name of the timezone which is used for the timestamp column - DataFrame
Key record_measurement_key: key of record with specified measurement - dictionary, NamedTuple, dataclass
Key record_measurement_name: static measurement name - dictionary, NamedTuple, dataclass
Key record_time_key: key of record with specified timestamp - dictionary, NamedTuple, dataclass
Key record_tag_keys: list of record keys to use as a tag - dictionary, NamedTuple, dataclass
Key record_field_keys: list of record keys to use as a field - dictionary, NamedTuple, dataclass

Returns: True for successfully accepted data, otherwise raises an exception

Example:
# Record as Line Protocol
await write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1")

# Record as Dictionary
dictionary = {
    "measurement": "h2o_feet",
    "tags": {"location": "us-west"},
    "fields": {"level": 125},
    "time": 1
}
await write_api.write("my-bucket", "my-org", dictionary)

# Record as Point
from influxdb_client import Point
point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1)
await write_api.write("my-bucket", "my-org", point)
DataFrame:
If the data_frame_timestamp_column is not specified, the index of the Pandas DataFrame is used as a timestamp for the written data. The index can be a PeriodIndex or it must be transformable to datetime by pandas.to_datetime.

If you would like to transform a column to a PeriodIndex, you can use something like:

import pandas as pd

# DataFrame
data_frame = ...
# Set column as Index
data_frame.set_index('column_name', inplace=True)
# Transform index to PeriodIndex
data_frame.index = pd.to_datetime(data_frame.index, unit='s')
DeleteApiAsync¶
class influxdb_client.client.delete_api_async.DeleteApiAsync(influxdb_client)[source]¶
Async implementation for the '/api/v2/delete' endpoint.

Initialize defaults.
delete(start: Union[str, datetime.datetime], stop: Union[str, datetime.datetime], predicate: str, bucket: str, org: Union[str, influxdb_client.domain.organization.Organization, None] = None) → bool[source]¶
Delete time-series data from InfluxDB.

Parameters:
- start (str, datetime.datetime) – start time
- stop (str, datetime.datetime) – stop time
- predicate (str) – predicate
- bucket (str) – bucket id or name from which data will be deleted
- org (str, Organization) – specifies the organization to delete data from; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.

Returns: True for successfully deleted data, otherwise raises an exception