Async API Reference

InfluxDBClientAsync

class influxdb_client.client.influxdb_client_async.InfluxDBClientAsync(url, token: Optional[str] = None, org: Optional[str] = None, debug=None, timeout=10000, enable_gzip=False, **kwargs)[source]

InfluxDBClientAsync is client for InfluxDB v2.

Initialize defaults.

Parameters:
  • url – InfluxDB server API url (ex. http://localhost:8086).

  • tokentoken to authenticate to the InfluxDB 2.x

  • org – organization name (used as a default in Query, Write and Delete API)

  • debug – enable verbose logging of http requests

  • timeout – The maximal number of milliseconds for the whole HTTP request including connection establishment, request sending and response reading. It can also be a ClientTimeout which is directly pass to aiohttp.

  • enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.

Key bool verify_ssl:

Set this to false to skip verifying SSL certificate when calling API from https server.

Key str ssl_ca_cert:

Set this to customize the certificate file to verify the peer.

Key str cert_file:

Path to the certificate that will be used for mTLS authentication.

Key str cert_key_file:

Path to the file contains private key for mTLS certificate.

Key str cert_key_password:

String or function which returns password for decrypting the mTLS private key.

Key ssl.SSLContext ssl_context:

Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.

Key str proxy:

Set this to configure the http proxy to be used (ex. http://localhost:3128)

Key str proxy_headers:

A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.

Key int connection_pool_maxsize:

The total number of simultaneous connections. Defaults to “multiprocessing.cpu_count() * 5”.

Key bool auth_basic:

Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (defaults to false, don’t set to true when talking to InfluxDB 2)

Key str username:

username to authenticate via username and password credentials to the InfluxDB 2.x

Key str password:

password to authenticate via username and password credentials to the InfluxDB 2.x

Key bool allow_redirects:

If set to False, do not follow HTTP redirects. True by default.

Key int max_redirects:

Maximum number of HTTP redirects to follow. 10 by default.

Key dict client_session_kwargs:

Additional configuration arguments for ClientSession

Key type client_session_type:

Type of aiohttp client to use. Useful for third party wrappers like aiohttp-retry. ClientSession by default.

Key list[str] profilers:

list of enabled Flux profilers

async build() str[source]

Return the build type of the connected InfluxDB Server.

Returns:

The type of InfluxDB build.

async close()[source]

Shutdown the client.

delete_api() DeleteApiAsync[source]

Get the asynchronous delete metrics API instance.

Returns:

delete api

classmethod from_config_file(config_file: str = 'config.ini', debug=None, enable_gzip=False, **kwargs)[source]

Configure client via configuration file. The configuration has to be under ‘influx’ section.

Parameters:
  • config_file – Path to configuration file

  • debug – Enable verbose logging of http requests

  • enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.

Key config_name:

Name of the configuration section of the configuration file

Key str proxy_headers:

A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.

Key urllib3.util.retry.Retry retries:

Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.

Key ssl.SSLContext ssl_context:

Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.

The supported formats:
Configuration options:
  • url

  • org

  • token

  • timeout,

  • verify_ssl

  • ssl_ca_cert

  • cert_file

  • cert_key_file

  • cert_key_password

  • connection_pool_maxsize

  • auth_basic

  • profilers

  • proxy

config.ini example:

[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
connection_pool_maxsize=25
auth_basic=false
profilers=query,operator
proxy=http:proxy.domain.org:8080

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}

config.toml example:

[influx2]
    url = "http://localhost:8086"
    token = "my-token"
    org = "my-org"
    timeout = 6000
    connection_pool_maxsize = 25
    auth_basic = false
    profilers="query, operator"
    proxy = "http://proxy.domain.org:8080"

[tags]
    id = "132-987-655"
    customer = "California Miner"
    data_center = "${env.data_center}"

config.json example:

{
    "url": "http://localhost:8086",
    "token": "my-token",
    "org": "my-org",
    "active": true,
    "timeout": 6000,
    "connection_pool_maxsize": 55,
    "auth_basic": false,
    "profilers": "query, operator",
    "tags": {
        "id": "132-987-655",
        "customer": "California Miner",
        "data_center": "${env.data_center}"
    }
}
classmethod from_env_properties(debug=None, enable_gzip=False, **kwargs)[source]

Configure client via environment properties.

Parameters:
  • debug – Enable verbose logging of http requests

  • enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.

Key str proxy:

Set this to configure the http proxy to be used (ex. http://localhost:3128)

Key str proxy_headers:

A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.

Key urllib3.util.retry.Retry retries:

Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.

Key ssl.SSLContext ssl_context:

Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.

Supported environment properties:
  • INFLUXDB_V2_URL

  • INFLUXDB_V2_ORG

  • INFLUXDB_V2_TOKEN

  • INFLUXDB_V2_TIMEOUT

  • INFLUXDB_V2_VERIFY_SSL

  • INFLUXDB_V2_SSL_CA_CERT

  • INFLUXDB_V2_CERT_FILE

  • INFLUXDB_V2_CERT_KEY_FILE

  • INFLUXDB_V2_CERT_KEY_PASSWORD

  • INFLUXDB_V2_CONNECTION_POOL_MAXSIZE

  • INFLUXDB_V2_AUTH_BASIC

  • INFLUXDB_V2_PROFILERS

  • INFLUXDB_V2_TAG

async ping() bool[source]

Return the status of InfluxDB instance.

Returns:

The status of InfluxDB.

query_api(query_options: ~influxdb_client.client.query_api.QueryOptions = <influxdb_client.client.query_api.QueryOptions object>) QueryApiAsync[source]

Create an asynchronous Query API instance.

Parameters:

query_options – optional query api configuration

Returns:

Query api instance

async version() str[source]

Return the version of the connected InfluxDB Server.

Returns:

The version of InfluxDB.

write_api(point_settings=<influxdb_client.client.write_api.PointSettings object>) WriteApiAsync[source]

Create an asynchronous Write API instance.

Example:
from influxdb_client_async import InfluxDBClientAsync


# Initialize async/await instance of Write API
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api()
Parameters:

point_settings – settings to store default tags

Returns:

write api instance

QueryApiAsync

class influxdb_client.client.query_api_async.QueryApiAsync(influxdb_client, query_options=<influxdb_client.client.query_api.QueryOptions object>)[source]

Asynchronous implementation for ‘/api/v2/query’ endpoint.

Initialize query client.

Parameters:

influxdb_client – influxdb client

async query(query: str, org=None, params: Optional[dict] = None) TableList[source]

Execute asynchronous Flux query and return result as a FluxTable list.

Parameters:
  • query – the Flux query

  • org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • params – bind parameters

Returns:

FluxTable list wrapped into TableList

Return type:

TableList

Serialization the query results to flattened list of values via to_values():

from influxdb_client import InfluxDBClient

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:

    # Query: using Table structure
    tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = tables.to_values(columns=['location', '_time', '_value'])
    print(output)
[
    ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
    ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
    ...
]

Serialization the query results to JSON via to_json():

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to JSON
    output = tables.to_json(indent=5)
    print(output)
[
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:00.897825+00:00",
        "region": "north",
         "_field": "usage",
        "_value": 15
    },
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:01.897825+00:00",
        "region": "west",
         "_field": "usage",
        "_value": 10
    },
    ...
]
async query_data_frame(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None, use_extension_dtypes: bool = False)[source]

Execute asynchronous Flux query and return DataFrame.

Note

If the query returns tables with differing schemas than the client generates a DataFrame for each of them.

Parameters:
  • query – the Flux query

  • org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • data_frame_index – the list of columns that are used as DataFrame index

  • params – bind parameters

  • use_extension_dtypes – set to True to use panda’s extension data types. Useful for queries with pivot function. When data has missing values, column data type may change (to object or float64). Nullable extension types (Int64, Float64, boolean) support panda.NA value. For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.

Returns:

DataFrame or List[DataFrame]

Warning

For the optimal processing of the query results use the pivot() function which align results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
For more info see:
async query_data_frame_stream(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None, use_extension_dtypes: bool = False)[source]

Execute asynchronous Flux query and return stream of DataFrame as an AsyncGenerator[DataFrame].

Note

If the query returns tables with differing schemas than the client generates a DataFrame for each of them.

Parameters:
  • query – the Flux query

  • org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • data_frame_index – the list of columns that are used as DataFrame index

  • params – bind parameters

  • use_extension_dtypes – set to True to use panda’s extension data types. Useful for queries with pivot function. When data has missing values, column data type may change (to object or float64). Nullable extension types (Int64, Float64, boolean) support panda.NA value. For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.

Returns:

AsyncGenerator[:class:`DataFrame]`

Warning

For the optimal processing of the query results use the pivot() function which align results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
For more info see:
async query_raw(query: str, org=None, dialect={'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None)[source]

Execute asynchronous Flux query and return result as raw unprocessed result as a str.

Parameters:
  • query – a Flux query

  • org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • dialect – csv dialect format

  • params – bind parameters

Returns:

str

async query_stream(query: str, org=None, params: Optional[dict] = None) AsyncGenerator[FluxRecord, None][source]

Execute asynchronous Flux query and return stream of FluxRecord as an AsyncGenerator[FluxRecord].

Parameters:
  • query – the Flux query

  • org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • params – bind parameters

Returns:

AsyncGenerator[FluxRecord]

WriteApiAsync

class influxdb_client.client.write_api_async.WriteApiAsync(influxdb_client, point_settings: ~influxdb_client.client.write_api.PointSettings = <influxdb_client.client.write_api.PointSettings object>)[source]

Implementation for ‘/api/v2/write’ endpoint.

Example:
from influxdb_client_async import InfluxDBClientAsync


# Initialize async/await instance of Write API
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api()

Initialize defaults.

Parameters:
  • influxdb_client – with default settings (organization)

  • point_settings – settings to store default tags.

async write(bucket: str, org: Optional[str] = None, record: Optional[Union[str, Iterable[str], Point, Iterable[Point], dict, Iterable[dict], bytes, Iterable[bytes], NamedTuple, Iterable[NamedTuple], dataclass, Iterable[dataclass]]] = None, write_precision: WritePrecision = 'ns', **kwargs) bool[source]

Write time-series data into InfluxDB.

Parameters:
  • bucket (str) – specifies the destination bucket for writes (required)

  • org (str, Organization) – specifies the destination organization for writes; take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • write_precision (WritePrecision) – specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point has precedes and is use for write.

  • record – Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame

Key data_frame_measurement_name:

name of measurement for writing Pandas DataFrame - DataFrame

Key data_frame_tag_columns:

list of DataFrame columns which are tags, rest columns will be fields - DataFrame

Key data_frame_timestamp_column:

name of DataFrame column which contains a timestamp. The column can be defined as a str value formatted as 2018-10-26, 2018-10-26 12:00, 2018-10-26 12:00:00-05:00 or other formats and types supported by pandas.to_datetime - DataFrame

Key data_frame_timestamp_timezone:

name of the timezone which is used for timestamp column - DataFrame

Key record_measurement_key:

key of record with specified measurement - dictionary, NamedTuple, dataclass

Key record_measurement_name:

static measurement name - dictionary, NamedTuple, dataclass

Key record_time_key:

key of record with specified timestamp - dictionary, NamedTuple, dataclass

Key record_tag_keys:

list of record keys to use as a tag - dictionary, NamedTuple, dataclass

Key record_field_keys:

list of record keys to use as a field - dictionary, NamedTuple, dataclass

Returns:

True for successfully accepted data, otherwise raise an exception

Example:
# Record as Line Protocol
await write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1")

# Record as Dictionary
dictionary = {
    "measurement": "h2o_feet",
    "tags": {"location": "us-west"},
    "fields": {"level": 125},
    "time": 1
}
await write_api.write("my-bucket", "my-org", dictionary)

# Record as Point
from influxdb_client import Point
point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1)
await write_api.write("my-bucket", "my-org", point)
DataFrame:

If the data_frame_timestamp_column is not specified the index of Pandas DataFrame is used as a timestamp for written data. The index can be PeriodIndex or its must be transformable to datetime by pandas.to_datetime.

If you would like to transform a column to PeriodIndex, you can use something like:

import pandas as pd

# DataFrame
data_frame = ...
# Set column as Index
data_frame.set_index('column_name', inplace=True)
# Transform index to PeriodIndex
data_frame.index = pd.to_datetime(data_frame.index, unit='s')

DeleteApiAsync

class influxdb_client.client.delete_api_async.DeleteApiAsync(influxdb_client)[source]

Async implementation for ‘/api/v2/delete’ endpoint.

Initialize defaults.

async delete(start: Union[str, datetime], stop: Union[str, datetime], predicate: str, bucket: str, org: Optional[Union[str, Organization]] = None) bool[source]

Delete Time series data from InfluxDB.

Parameters:
  • start (str, datetime.datetime) – start time

  • stop (str, datetime.datetime) – stop time

  • predicate (str) – predicate

  • bucket (str) – bucket id or name from which data will be deleted

  • org (str, Organization) – specifies the organization to delete data from. Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

Returns:

True for successfully deleted data, otherwise raise an exception