Async API Reference
InfluxDBClientAsync
- class influxdb_client.client.influxdb_client_async.InfluxDBClientAsync(url, token: Optional[str] = None, org: Optional[str] = None, debug=None, timeout=10000, enable_gzip=False, **kwargs)[source]
InfluxDBClientAsync is a client for InfluxDB v2.
Initialize defaults.
- Parameters:
url – InfluxDB server API url (ex. http://localhost:8086).
token – token to authenticate to the InfluxDB 2.x
org – organization name (used as a default in Query, Write and Delete API)
debug – enable verbose logging of http requests
timeout – The maximal number of milliseconds for the whole HTTP request including connection establishment, request sending and response reading. It can also be a ClientTimeout which is directly passed to aiohttp.
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints support Gzip compression.
- Key bool verify_ssl:
Set this to false to skip verifying the SSL certificate when calling the API from an https server.
- Key str ssl_ca_cert:
Set this to customize the certificate file to verify the peer.
- Key str cert_file:
Path to the certificate that will be used for mTLS authentication.
- Key str cert_key_file:
Path to the file contains private key for mTLS certificate.
- Key str cert_key_password:
String or function which returns password for decrypting the mTLS private key.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL context for the TLS/mTLS handshake. Note that you can supply either certificate/key files or an SSL context, not both.
- Key str proxy:
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key int connection_pool_maxsize:
The total number of simultaneous connections. Defaults to “multiprocessing.cpu_count() * 5”.
- Key bool auth_basic:
Set this to true to enable basic authentication when talking to an InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (defaults to false, don’t set to true when talking to InfluxDB 2)
- Key str username:
username to authenticate via username and password credentials to the InfluxDB 2.x
- Key str password:
password to authenticate via username and password credentials to the InfluxDB 2.x
- Key bool allow_redirects:
If set to False, do not follow HTTP redirects. True by default.
- Key int max_redirects:
Maximum number of HTTP redirects to follow. 10 by default.
- Key dict client_session_kwargs:
Additional configuration arguments for ClientSession
- Key type client_session_type:
Type of aiohttp client to use. Useful for third-party wrappers like aiohttp-retry. ClientSession by default.
- Key list[str] profilers:
list of enabled Flux profilers
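Putting the constructor options together, a minimal connection sketch might look like the following. The URL, token and org values are placeholders; the entry point is commented out because it needs a live InfluxDB server, and the client import is deferred into the coroutine so the sketch itself has no hard dependency on an installed influxdb_client package:

```python
import asyncio


async def main():
    # Deferred import: only needed when the sketch is actually run.
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    # The async client is designed to be used as an async context manager,
    # which closes the underlying aiohttp session on exit.
    async with InfluxDBClientAsync(url="http://localhost:8086",
                                   token="my-token",
                                   org="my-org",
                                   timeout=10_000,
                                   enable_gzip=True) as client:
        # Verify connectivity before doing any work.
        print("reachable:", await client.ping())

# Requires a running InfluxDB instance:
# asyncio.run(main())
```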
- async build() str [source]
Return the build type of the connected InfluxDB Server.
- Returns:
The type of InfluxDB build.
- delete_api() DeleteApiAsync [source]
Get the asynchronous delete metrics API instance.
- Returns:
delete api
- classmethod from_config_file(config_file: str = 'config.ini', debug=None, enable_gzip=False, **kwargs)[source]
Configure client via configuration file. The configuration has to be under the ‘influx2’ section (the section name can be changed via the config_name key).
- Parameters:
config_file – Path to configuration file
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints support Gzip compression.
- Key config_name:
Name of the configuration section of the configuration file
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. By default, no retry strategy is applied.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL context for the TLS/mTLS handshake. Note that you can supply either certificate/key files or an SSL context, not both.
- The supported formats: ini, toml, json
- Configuration options:
url
org
token
timeout
verify_ssl
ssl_ca_cert
cert_file
cert_key_file
cert_key_password
connection_pool_maxsize
auth_basic
profilers
proxy
config.ini example:
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
connection_pool_maxsize=25
auth_basic=false
profilers=query,operator
proxy=http://proxy.domain.org:8080

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}
config.toml example:
[influx2]
url = "http://localhost:8086"
token = "my-token"
org = "my-org"
timeout = 6000
connection_pool_maxsize = 25
auth_basic = false
profilers = "query, operator"
proxy = "http://proxy.domain.org:8080"

[tags]
id = "132-987-655"
customer = "California Miner"
data_center = "${env.data_center}"
config.json example:
{
    "url": "http://localhost:8086",
    "token": "my-token",
    "org": "my-org",
    "active": true,
    "timeout": 6000,
    "connection_pool_maxsize": 55,
    "auth_basic": false,
    "profilers": "query, operator",
    "tags": {
        "id": "132-987-655",
        "customer": "California Miner",
        "data_center": "${env.data_center}"
    }
}
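As a quick sanity check of the ini layout above, the file can be parsed with the standard library’s configparser before handing it to from_config_file. The file name and values below are taken from the example; the client call is commented out since it requires the influxdb_client package and a server:

```python
import configparser
import os
import tempfile

CONFIG = """\
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000

[tags]
customer = California Miner
"""

# Write the example configuration to a temporary file.
path = os.path.join(tempfile.mkdtemp(), "config.ini")
with open(path, "w") as f:
    f.write(CONFIG)

# Parse it back and inspect the section the client reads by default.
parser = configparser.ConfigParser()
parser.read(path)
print(parser["influx2"]["url"])      # http://localhost:8086
print(parser["influx2"]["timeout"])  # 6000

# With influxdb_client installed, the same file can be loaded via:
# client = InfluxDBClientAsync.from_config_file(config_file=path)
```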
- classmethod from_env_properties(debug=None, enable_gzip=False, **kwargs)[source]
Configure client via environment properties.
- Parameters:
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints support Gzip compression.
- Key str proxy:
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. By default, no retry strategy is applied.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL context for the TLS/mTLS handshake. Note that you can supply either certificate/key files or an SSL context, not both.
- Supported environment properties:
INFLUXDB_V2_URL
INFLUXDB_V2_ORG
INFLUXDB_V2_TOKEN
INFLUXDB_V2_TIMEOUT
INFLUXDB_V2_VERIFY_SSL
INFLUXDB_V2_SSL_CA_CERT
INFLUXDB_V2_CERT_FILE
INFLUXDB_V2_CERT_KEY_FILE
INFLUXDB_V2_CERT_KEY_PASSWORD
INFLUXDB_V2_CONNECTION_POOL_MAXSIZE
INFLUXDB_V2_AUTH_BASIC
INFLUXDB_V2_PROFILERS
INFLUXDB_V2_TAG
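The environment-driven setup can be sketched with the standard library alone. The values below are placeholders, and the client call is commented out because it requires the influxdb_client package and a server:

```python
import os

# Point the client at a server via the documented environment properties.
os.environ["INFLUXDB_V2_URL"] = "http://localhost:8086"
os.environ["INFLUXDB_V2_ORG"] = "my-org"
os.environ["INFLUXDB_V2_TOKEN"] = "my-token"
os.environ["INFLUXDB_V2_TIMEOUT"] = "6000"

# Default tags are supplied via the INFLUXDB_V2_TAG property, one
# variable per tag, e.g.:
os.environ["INFLUXDB_V2_TAG_CUSTOMER"] = "California Miner"

# With influxdb_client installed:
# client = InfluxDBClientAsync.from_env_properties()
```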
- async ping() bool [source]
Return the status of the InfluxDB instance.
- Returns:
The status of InfluxDB.
- query_api(query_options: ~influxdb_client.client.query_api.QueryOptions = <influxdb_client.client.query_api.QueryOptions object>) QueryApiAsync [source]
Create an asynchronous Query API instance.
- Parameters:
query_options – optional query api configuration
- Returns:
Query api instance
- async version() str [source]
Return the version of the connected InfluxDB Server.
- Returns:
The version of InfluxDB.
- write_api(point_settings=<influxdb_client.client.write_api.PointSettings object>) WriteApiAsync [source]
Create an asynchronous Write API instance.
- Example:
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

# Initialize async/await instance of Write API
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api()
- Parameters:
point_settings – settings to store default tags
- Returns:
write api instance
QueryApiAsync
- class influxdb_client.client.query_api_async.QueryApiAsync(influxdb_client, query_options=<influxdb_client.client.query_api.QueryOptions object>)[source]
Asynchronous implementation for ‘/api/v2/query’ endpoint.
Initialize query client.
- Parameters:
influxdb_client – influxdb client
- async query(query: str, org=None, params: Optional[dict] = None) TableList [source]
Execute asynchronous Flux query and return result as a FluxTable list.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
params – bind parameters
- Returns:
- Return type:
TableList

Serialize the query results to a flattened list of values via to_values():

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = tables.to_values(columns=['location', '_time', '_value'])
    print(output)
[
    ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
    ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
    ...
]
Serialize the query results to JSON via to_json():

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to JSON
    output = tables.to_json(indent=5)
    print(output)
[
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:00.897825+00:00",
        "region": "north",
        "_field": "usage",
        "_value": 15
    },
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:01.897825+00:00",
        "region": "west",
        "_field": "usage",
        "_value": 10
    },
    ...
]
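The params argument lets a Flux query read its inputs from an injected params record instead of string interpolation. The sketch below only builds the query and the parameter dict; the bucket name, measurement and window are placeholders, and the query call is commented out because it needs a live server:

```python
# Flux query that reads its inputs from the injected `params` record.
flux_query = '''
    from(bucket: params.bucket)
        |> range(start: duration(v: params.start))
        |> filter(fn: (r) => r._measurement == params.measurement)
'''

# Values bound server-side as the Flux `params` record.
params = {
    "bucket": "my-bucket",
    "start": "-10m",
    "measurement": "mem",
}

# With a live server and client:
# tables = await client.query_api().query(flux_query, params=params)
print(sorted(params))
```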
- async query_data_frame(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None, use_extension_dtypes: bool = False)[source]
Execute asynchronous Flux query and return DataFrame.
Note
If the query returns tables with differing schemas, the client generates a DataFrame for each of them.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
data_frame_index – the list of columns that are used as DataFrame index
params – bind parameters
use_extension_dtypes – set to True to use pandas’ extension data types. Useful for queries with the pivot function. When data has missing values, the column data type may change (to object or float64). Nullable extension types (Int64, Float64, boolean) support the pandas.NA value. For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.
- Returns:
DataFrame or List[DataFrame]
Warning
For optimal processing of the query results, use the pivot() function, which aligns results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- async query_data_frame_stream(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None, use_extension_dtypes: bool = False)[source]
Execute asynchronous Flux query and return stream of DataFrame as an AsyncGenerator[DataFrame].
Note
If the query returns tables with differing schemas, the client generates a DataFrame for each of them.
- Parameters:
for each of them.- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
data_frame_index – the list of columns that are used as DataFrame index
params – bind parameters
use_extension_dtypes – set to True to use pandas’ extension data types. Useful for queries with the pivot function. When data has missing values, the column data type may change (to object or float64). Nullable extension types (Int64, Float64, boolean) support the pandas.NA value. For more info, see https://pandas.pydata.org/docs/user_guide/missing_data.html.
- Returns:
AsyncGenerator[DataFrame]
Warning
For optimal processing of the query results, use the pivot() function, which aligns results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- async query_raw(query: str, org=None, dialect={'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None)[source]
Execute asynchronous Flux query and return the result as a raw, unprocessed str.
- Parameters:
query – a Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
dialect – csv dialect format
params – bind parameters
- Returns:
The raw query response as a str.
- async query_stream(query: str, org=None, params: Optional[dict] = None) AsyncGenerator[FluxRecord, None] [source]
Execute asynchronous Flux query and return stream of FluxRecord as an AsyncGenerator[FluxRecord].
- Parameters:
].- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
params – bind parameters
- Returns:
AsyncGenerator[FluxRecord]
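The consumption pattern for query_stream can be sketched with a stub async generator standing in for the real call. FluxRecord and the server are replaced by plain dicts so the sketch runs standalone; the commented line shows the real API:

```python
import asyncio


async def fake_query_stream():
    # Stand-in for client.query_api().query_stream(...): yields records
    # one at a time instead of materializing the whole result set.
    for value in (24, 25, 26):
        yield {"_measurement": "mem", "_value": value}


async def main():
    total = 0
    # The real API is consumed the same way:
    # async for record in query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)'):
    async for record in fake_query_stream():
        total += record["_value"]
    return total


print(asyncio.run(main()))  # 75
```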
WriteApiAsync
- class influxdb_client.client.write_api_async.WriteApiAsync(influxdb_client, point_settings: ~influxdb_client.client.write_api.PointSettings = <influxdb_client.client.write_api.PointSettings object>)[source]
Implementation for ‘/api/v2/write’ endpoint.
- Example:
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

# Initialize async/await instance of Write API
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api()
Initialize defaults.
- Parameters:
influxdb_client – with default settings (organization)
point_settings – settings to store default tags.
- async write(bucket: str, org: Optional[str] = None, record: Optional[Union[str, Iterable[str], Point, Iterable[Point], dict, Iterable[dict], bytes, Iterable[bytes], NamedTuple, Iterable[NamedTuple], dataclass, Iterable[dataclass]]] = None, write_precision: WritePrecision = 'ns', **kwargs) bool [source]
Write time-series data into InfluxDB.
- Parameters:
bucket (str) – specifies the destination bucket for writes (required)
org (str, Organization) – specifies the destination organization for writes; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
write_precision (WritePrecision) – specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point takes precedence and is used for the write.
record – Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame
- Key data_frame_measurement_name:
name of measurement for writing Pandas DataFrame - DataFrame
- Key data_frame_tag_columns:
list of DataFrame columns which are tags, rest columns will be fields - DataFrame
- Key data_frame_timestamp_column:
name of DataFrame column which contains a timestamp. The column can be defined as a str value formatted as 2018-10-26, 2018-10-26 12:00, 2018-10-26 12:00:00-05:00 or other formats and types supported by pandas.to_datetime - DataFrame
- Key data_frame_timestamp_timezone:
name of the timezone which is used for timestamp column - DataFrame
- Key record_measurement_key:
key of record with specified measurement - dictionary, NamedTuple, dataclass
- Key record_measurement_name:
static measurement name - dictionary, NamedTuple, dataclass
- Key record_time_key:
key of record with specified timestamp - dictionary, NamedTuple, dataclass
- Key record_tag_keys:
list of record keys to use as a tag - dictionary, NamedTuple, dataclass
- Key record_field_keys:
list of record keys to use as a field - dictionary, NamedTuple, dataclass
- Returns:
True for successfully accepted data, otherwise an exception is raised
- Example:
# Record as Line Protocol
await write_api.write("my-bucket", "my-org",
                      "h2o_feet,location=us-west level=125i 1")

# Record as Dictionary
dictionary = {
    "measurement": "h2o_feet",
    "tags": {"location": "us-west"},
    "fields": {"level": 125},
    "time": 1
}
await write_api.write("my-bucket", "my-org", dictionary)

# Record as Point
from influxdb_client import Point

point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1)
await write_api.write("my-bucket", "my-org", point)
- DataFrame:
If the data_frame_timestamp_column is not specified, the index of the Pandas DataFrame is used as a timestamp for the written data. The index can be a PeriodIndex, or it must be transformable to datetime by pandas.to_datetime.
If you would like to transform a column to a DatetimeIndex, you can use something like:

import pandas as pd

# DataFrame
data_frame = ...

# Set column as Index
data_frame.set_index('column_name', inplace=True)

# Transform index to DatetimeIndex
data_frame.index = pd.to_datetime(data_frame.index, unit='s')
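A sketch of the DataFrame write path using the keys described above. The measurement and column names are placeholders, and the write call itself is commented out since it needs a running server and client; the runnable part only prepares the index:

```python
import pandas as pd

# Two rows of sample data with an epoch-seconds column.
data_frame = pd.DataFrame({
    "epoch": [1655722800, 1655722860],
    "level": [125, 130],
    "location": ["us-west", "us-east"],
})

# Promote the epoch column to a DatetimeIndex, as described above.
data_frame.set_index("epoch", inplace=True)
data_frame.index = pd.to_datetime(data_frame.index, unit="s")

print(type(data_frame.index).__name__)  # DatetimeIndex

# With a live server and client:
# await client.write_api().write(
#     bucket="my-bucket",
#     record=data_frame,
#     data_frame_measurement_name="h2o_feet",
#     data_frame_tag_columns=["location"],
# )
```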
DeleteApiAsync
- class influxdb_client.client.delete_api_async.DeleteApiAsync(influxdb_client)[source]
Async implementation for ‘/api/v2/delete’ endpoint.
Initialize defaults.
- async delete(start: Union[str, datetime], stop: Union[str, datetime], predicate: str, bucket: str, org: Optional[Union[str, Organization]] = None) bool [source]
Delete time-series data from InfluxDB.
- Parameters:
start (str, datetime.datetime) – start time
stop (str, datetime.datetime) – stop time
predicate (str) – predicate
bucket (str) – bucket id or name from which data will be deleted
org (str, Organization) – specifies the organization to delete data from; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
- Returns:
True for successfully deleted data, otherwise an exception is raised
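The start and stop arguments accept datetime objects (or RFC 3339 strings). A sketch of building a one-hour deletion window with the standard library; the bucket, measurement and predicate are placeholders, and the delete call is commented out because it needs a running server and client:

```python
from datetime import datetime, timedelta, timezone

# Delete everything written in the last hour for one location.
stop = datetime.now(tz=timezone.utc)
start = stop - timedelta(hours=1)
predicate = '_measurement="h2o_feet" AND location="us-west"'

print(start < stop)  # True

# With a live server and client:
# await client.delete_api().delete(
#     start=start, stop=stop, predicate=predicate, bucket="my-bucket"
# )
```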