Async API Reference

InfluxDBClientAsync

class influxdb_client.client.influxdb_client_async.InfluxDBClientAsync(url, token: Optional[str] = None, org: Optional[str] = None, debug=None, timeout=10000, enable_gzip=False, **kwargs)[source]

InfluxDBClientAsync is client for InfluxDB v2.

Initialize defaults.

Parameters:
  • url – InfluxDB server API url (ex. http://localhost:8086).

  • tokentoken to authenticate to the InfluxDB 2.x

  • org – organization name (used as a default in Query, Write and Delete API)

  • debug – enable verbose logging of http requests

  • timeout – The maximal number of milliseconds for the whole HTTP request including connection establishment, request sending and response reading. It can also be a ClientTimeout which is directly pass to aiohttp.

  • enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.

Key bool verify_ssl:

Set this to false to skip verifying SSL certificate when calling API from https server.

Key str ssl_ca_cert:

Set this to customize the certificate file to verify the peer.

Key str cert_file:

Path to the certificate that will be used for mTLS authentication.

Key str cert_key_file:

Path to the file contains private key for mTLS certificate.

Key str cert_key_password:

String or function which returns password for decrypting the mTLS private key.

Key ssl.SSLContext ssl_context:

Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.

Key str proxy:

Set this to configure the http proxy to be used (ex. http://localhost:3128)

Key str proxy_headers:

A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.

Key int connection_pool_maxsize:

The total number of simultaneous connections. Defaults to “multiprocessing.cpu_count() * 5”.

Key bool auth_basic:

Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (defaults to false, don’t set to true when talking to InfluxDB 2)

Key str username:

username to authenticate via username and password credentials to the InfluxDB 2.x

Key str password:

password to authenticate via username and password credentials to the InfluxDB 2.x

Key bool allow_redirects:

If set to False, do not follow HTTP redirects. True by default.

Key int max_redirects:

Maximum number of HTTP redirects to follow. 10 by default.

Key dict client_session_kwargs:

Additional configuration arguments for ClientSession

Key type client_session_type:

Type of aiohttp client to use. Useful for third party wrappers like aiohttp-retry. ClientSession by default.

Key list[str] profilers:

list of enabled Flux profilers

async build() str[source]

Return the build type of the connected InfluxDB Server.

Returns:

The type of InfluxDB build.

async close()[source]

Shutdown the client.

delete_api() DeleteApiAsync[source]

Get the asynchronous delete metrics API instance.

Returns:

delete api

classmethod from_config_file(config_file: str = 'config.ini', debug=None, enable_gzip=False, **kwargs)[source]

Configure client via configuration file. The configuration has to be under ‘influx’ section.

Parameters:
  • config_file – Path to configuration file

  • debug – Enable verbose logging of http requests

  • enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.

Key config_name:

Name of the configuration section of the configuration file

Key str proxy_headers:

A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.

Key urllib3.util.retry.Retry retries:

Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.

Key ssl.SSLContext ssl_context:

Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.

The supported formats:
Configuration options:
  • url

  • org

  • token

  • timeout,

  • verify_ssl

  • ssl_ca_cert

  • cert_file

  • cert_key_file

  • cert_key_password

  • connection_pool_maxsize

  • auth_basic

  • profilers

  • proxy

config.ini example:

[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
connection_pool_maxsize=25
auth_basic=false
profilers=query,operator
proxy=http:proxy.domain.org:8080

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}

config.toml example:

[influx2]
    url = "http://localhost:8086"
    token = "my-token"
    org = "my-org"
    timeout = 6000
    connection_pool_maxsize = 25
    auth_basic = false
    profilers="query, operator"
    proxy = "http://proxy.domain.org:8080"

[tags]
    id = "132-987-655"
    customer = "California Miner"
    data_center = "${env.data_center}"

config.json example:

{
    "url": "http://localhost:8086",
    "token": "my-token",
    "org": "my-org",
    "active": true,
    "timeout": 6000,
    "connection_pool_maxsize": 55,
    "auth_basic": false,
    "profilers": "query, operator",
    "tags": {
        "id": "132-987-655",
        "customer": "California Miner",
        "data_center": "${env.data_center}"
    }
}
classmethod from_env_properties(debug=None, enable_gzip=False, **kwargs)[source]

Configure client via environment properties.

Parameters:
  • debug – Enable verbose logging of http requests

  • enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.

Key str proxy:

Set this to configure the http proxy to be used (ex. http://localhost:3128)

Key str proxy_headers:

A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.

Key urllib3.util.retry.Retry retries:

Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.

Key ssl.SSLContext ssl_context:

Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.

Supported environment properties:
  • INFLUXDB_V2_URL

  • INFLUXDB_V2_ORG

  • INFLUXDB_V2_TOKEN

  • INFLUXDB_V2_TIMEOUT

  • INFLUXDB_V2_VERIFY_SSL

  • INFLUXDB_V2_SSL_CA_CERT

  • INFLUXDB_V2_CERT_FILE

  • INFLUXDB_V2_CERT_KEY_FILE

  • INFLUXDB_V2_CERT_KEY_PASSWORD

  • INFLUXDB_V2_CONNECTION_POOL_MAXSIZE

  • INFLUXDB_V2_AUTH_BASIC

  • INFLUXDB_V2_PROFILERS

  • INFLUXDB_V2_TAG

async ping() bool[source]

Return the status of InfluxDB instance.

Returns:

The status of InfluxDB.

query_api(query_options: ~influxdb_client.client.query_api.QueryOptions = <influxdb_client.client.query_api.QueryOptions object>) QueryApiAsync[source]

Create an asynchronous Query API instance.

Parameters:

query_options – optional query api configuration

Returns:

Query api instance

async version() str[source]

Return the version of the connected InfluxDB Server.

Returns:

The version of InfluxDB.

write_api(point_settings=<influxdb_client.client.write_api.PointSettings object>) WriteApiAsync[source]

Create an asynchronous Write API instance.

Example:
from influxdb_client_async import InfluxDBClientAsync


# Initialize async/await instance of Write API
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api()
Parameters:

point_settings – settings to store default tags

Returns:

write api instance

QueryApiAsync

class influxdb_client.client.query_api_async.QueryApiAsync(influxdb_client, query_options=<influxdb_client.client.query_api.QueryOptions object>)[source]

Asynchronous implementation for ‘/api/v2/query’ endpoint.

Initialize query client.

Parameters:

influxdb_client – influxdb client

async query(query: str, org=None, params: Optional[dict] = None) TableList[source]

Execute asynchronous Flux query and return result as a FluxTable list.

Parameters:
  • query – the Flux query

  • org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • params – bind parameters

Returns:

FluxTable list wrapped into TableList

Return type:

TableList

Serialization the query results to flattened list of values via to_values():

from influxdb_client import InfluxDBClient

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:

    # Query: using Table structure
    tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = tables.to_values(columns=['location', '_time', '_value'])
    print(output)
[
    ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
    ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
    ...
]

Serialization the query results to JSON via to_json():

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to JSON
    output = tables.to_json(indent=5)
    print(output)
[
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:00.897825+00:00",
        "region": "north",
         "_field": "usage",
        "_value": 15
    },
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:01.897825+00:00",
        "region": "west",
         "_field": "usage",
        "_value": 10
    },
    ...
]
async query_data_frame(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)[source]

Execute asynchronous Flux query and return DataFrame.

Note

If the query returns tables with differing schemas than the client generates a DataFrame for each of them.

Parameters:
  • query – the Flux query

  • org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • data_frame_index – the list of columns that are used as DataFrame index

  • params – bind parameters

Returns:

DataFrame or List[DataFrame]

Warning

For the optimal processing of the query results use the pivot() function which align results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
For more info see:
async query_data_frame_stream(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)[source]

Execute asynchronous Flux query and return stream of DataFrame as an AsyncGenerator[DataFrame].

Note

If the query returns tables with differing schemas than the client generates a DataFrame for each of them.

Parameters:
  • query – the Flux query

  • org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • data_frame_index – the list of columns that are used as DataFrame index

  • params – bind parameters

Returns:

AsyncGenerator[:class:`DataFrame]`

Warning

For the optimal processing of the query results use the pivot() function which align results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
For more info see:
async query_raw(query: str, org=None, dialect={'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None)[source]

Execute asynchronous Flux query and return result as raw unprocessed result as a str.

Parameters:
  • query – a Flux query

  • org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • dialect – csv dialect format

  • params – bind parameters

Returns:

str

async query_stream(query: str, org=None, params: Optional[dict] = None) AsyncGenerator[FluxRecord, None][source]

Execute asynchronous Flux query and return stream of FluxRecord as an AsyncGenerator[FluxRecord].

Parameters:
  • query – the Flux query

  • org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • params – bind parameters

Returns:

AsyncGenerator[FluxRecord]

WriteApiAsync

class influxdb_client.client.write_api_async.WriteApiAsync(influxdb_client, point_settings: ~influxdb_client.client.write_api.PointSettings = <influxdb_client.client.write_api.PointSettings object>)[source]

Implementation for ‘/api/v2/write’ endpoint.

Example:
from influxdb_client_async import InfluxDBClientAsync


# Initialize async/await instance of Write API
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api()

Initialize defaults.

Parameters:
  • influxdb_client – with default settings (organization)

  • point_settings – settings to store default tags.

async write(bucket: str, org: Optional[str] = None, record: Optional[Union[str, Iterable[str], Point, Iterable[Point], dict, Iterable[dict], bytes, Iterable[bytes], NamedTuple, Iterable[NamedTuple], dataclass, Iterable[dataclass]]] = None, write_precision: WritePrecision = 'ns', **kwargs) bool[source]

Write time-series data into InfluxDB.

Parameters:
  • bucket (str) – specifies the destination bucket for writes (required)

  • org (str, Organization) – specifies the destination organization for writes; take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

  • write_precision (WritePrecision) – specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point has precedes and is use for write.

  • record – Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame

Key data_frame_measurement_name:

name of measurement for writing Pandas DataFrame - DataFrame

Key data_frame_tag_columns:

list of DataFrame columns which are tags, rest columns will be fields - DataFrame

Key data_frame_timestamp_column:

name of DataFrame column which contains a timestamp. The column can be defined as a str value formatted as 2018-10-26, 2018-10-26 12:00, 2018-10-26 12:00:00-05:00 or other formats and types supported by pandas.to_datetime - DataFrame

Key data_frame_timestamp_timezone:

name of the timezone which is used for timestamp column - DataFrame

Key record_measurement_key:

key of record with specified measurement - dictionary, NamedTuple, dataclass

Key record_measurement_name:

static measurement name - dictionary, NamedTuple, dataclass

Key record_time_key:

key of record with specified timestamp - dictionary, NamedTuple, dataclass

Key record_tag_keys:

list of record keys to use as a tag - dictionary, NamedTuple, dataclass

Key record_field_keys:

list of record keys to use as a field - dictionary, NamedTuple, dataclass

Returns:

True for successfully accepted data, otherwise raise an exception

Example:
# Record as Line Protocol
await write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1")

# Record as Dictionary
dictionary = {
    "measurement": "h2o_feet",
    "tags": {"location": "us-west"},
    "fields": {"level": 125},
    "time": 1
}
await write_api.write("my-bucket", "my-org", dictionary)

# Record as Point
from influxdb_client import Point
point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1)
await write_api.write("my-bucket", "my-org", point)
DataFrame:

If the data_frame_timestamp_column is not specified the index of Pandas DataFrame is used as a timestamp for written data. The index can be PeriodIndex or its must be transformable to datetime by pandas.to_datetime.

If you would like to transform a column to PeriodIndex, you can use something like:

import pandas as pd

# DataFrame
data_frame = ...
# Set column as Index
data_frame.set_index('column_name', inplace=True)
# Transform index to PeriodIndex
data_frame.index = pd.to_datetime(data_frame.index, unit='s')

DeleteApiAsync

class influxdb_client.client.delete_api_async.DeleteApiAsync(influxdb_client)[source]

Async implementation for ‘/api/v2/delete’ endpoint.

Initialize defaults.

async delete(start: Union[str, datetime], stop: Union[str, datetime], predicate: str, bucket: str, org: Optional[Union[str, Organization]] = None) bool[source]

Delete Time series data from InfluxDB.

Parameters:
  • start (str, datetime.datetime) – start time

  • stop (str, datetime.datetime) – stop time

  • predicate (str) – predicate

  • bucket (str) – bucket id or name from which data will be deleted

  • org (str, Organization) – specifies the organization to delete data from. Take the ID, Name or Organization. If not specified the default value from InfluxDBClientAsync.org is used.

Returns:

True for successfully deleted data, otherwise raise an exception