Async API Reference
InfluxDBClientAsync
- class influxdb_client.client.influxdb_client_async.InfluxDBClientAsync(url, token: Optional[str] = None, org: Optional[str] = None, debug=None, timeout=10000, enable_gzip=False, **kwargs)
InfluxDBClientAsync is a client for InfluxDB v2.
Initialize defaults.
- Parameters:
url – InfluxDB server API url (ex. http://localhost:8086).
token – token to authenticate to the InfluxDB 2.x
org – organization name (used as a default in Query, Write and Delete API)
debug – enable verbose logging of http requests
timeout – The maximal number of milliseconds for the whole HTTP request including connection establishment, request sending and response reading. It can also be a ClientTimeout which is directly passed to aiohttp.
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints support the Gzip compression.
- Key bool verify_ssl:
Set this to false to skip verifying SSL certificate when calling API from https server.
- Key str ssl_ca_cert:
Set this to customize the certificate file to verify the peer.
- Key str cert_file:
Path to the certificate that will be used for mTLS authentication.
- Key str cert_key_file:
Path to the file that contains the private key for the mTLS certificate.
- Key str cert_key_password:
String or function which returns password for decrypting the mTLS private key.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/mTLS handshake. Be aware that you can supply either certificate/key files or an SSL Context, not both.
- Key str proxy:
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key int connection_pool_maxsize:
The total number of simultaneous connections. Defaults to “multiprocessing.cpu_count() * 5”.
- Key bool auth_basic:
Set this to true to enable basic authentication when talking to an InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (defaults to false, don’t set to true when talking to InfluxDB 2)
- Key str username:
username to authenticate via username and password credentials to the InfluxDB 2.x
- Key str password:
password to authenticate via username and password credentials to the InfluxDB 2.x
- Key bool allow_redirects:
If set to False, do not follow HTTP redirects. True by default.
- Key int max_redirects:
Maximum number of HTTP redirects to follow. 10 by default.
- Key dict client_session_kwargs:
Additional configuration arguments for ClientSession
- Key type client_session_type:
Type of aiohttp client to use. Useful for third party wrappers like aiohttp-retry. ClientSession by default.
- Key list[str] profilers:
list of enabled Flux profilers
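A minimal sketch of creating the client and checking connectivity. It assumes an InfluxDB 2.x server at http://localhost:8086 and the placeholder credentials my-token / my-org. The helper client_settings is hypothetical and only collects the constructor arguments documented above; the library import is deferred into the coroutine so the helper can be used on its own.

```python
import asyncio


def client_settings(timeout_ms: int = 10_000) -> dict:
    """Hypothetical helper: keyword arguments for InfluxDBClientAsync."""
    return {
        "url": "http://localhost:8086",  # assumed local server
        "token": "my-token",             # placeholder credentials
        "org": "my-org",
        "timeout": timeout_ms,           # milliseconds for the whole HTTP request
        "enable_gzip": True,             # applies to the Write and Query endpoints
    }


async def main() -> None:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    # The client must be created inside a coroutine; it is used here as an
    # async context manager, as in the other examples of this reference.
    async with InfluxDBClientAsync(**client_settings(timeout_ms=6_000)) as client:
        print("ready:", await client.ping())
        print("version:", await client.version())
        print("build:", await client.build())


# asyncio.run(main())  # uncomment to run against a reachable server
```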
- async build() -> str
Return the build type of the connected InfluxDB Server.
- Returns:
The type of InfluxDB build.
- delete_api() -> DeleteApiAsync
Get the asynchronous delete metrics API instance.
- Returns:
delete api
- classmethod from_config_file(config_file: str = 'config.ini', debug=None, enable_gzip=False, **kwargs)
Configure client via configuration file. By default the configuration is read from the ‘influx2’ section, as in the examples below; use config_name to select another section.
- Parameters:
config_file – Path to configuration file
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints support the Gzip compression.
- Key config_name:
Name of the configuration section of the configuration file
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. By default, no retry strategy is set.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/mTLS handshake. Be aware that you can supply either certificate/key files or an SSL Context, not both.
- The supported formats:
ini, toml, json
- Configuration options:
url
org
token
timeout
verify_ssl
ssl_ca_cert
cert_file
cert_key_file
cert_key_password
connection_pool_maxsize
auth_basic
profilers
proxy
config.ini example:
    [influx2]
    url=http://localhost:8086
    org=my-org
    token=my-token
    timeout=6000
    connection_pool_maxsize=25
    auth_basic=false
    profilers=query,operator
    proxy=http://proxy.domain.org:8080

    [tags]
    id = 132-987-655
    customer = California Miner
    data_center = ${env.data_center}
config.toml example:
    [influx2]
    url = "http://localhost:8086"
    token = "my-token"
    org = "my-org"
    timeout = 6000
    connection_pool_maxsize = 25
    auth_basic = false
    profilers="query, operator"
    proxy = "http://proxy.domain.org:8080"

    [tags]
    id = "132-987-655"
    customer = "California Miner"
    data_center = "${env.data_center}"
config.json example:
    {
        "url": "http://localhost:8086",
        "token": "my-token",
        "org": "my-org",
        "active": true,
        "timeout": 6000,
        "connection_pool_maxsize": 55,
        "auth_basic": false,
        "profilers": "query, operator",
        "tags": {
            "id": "132-987-655",
            "customer": "California Miner",
            "data_center": "${env.data_center}"
        }
    }
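As a sketch of how from_config_file might be used, the following writes a small ini file with an [influx2] section (mirroring the config.ini example above) and loads the client from it. The helper write_example_config and all values are placeholders for illustration, not part of the library.

```python
import asyncio
import configparser
import tempfile
from pathlib import Path


def write_example_config(directory: str) -> Path:
    """Hypothetical helper: write a config.ini with placeholder values."""
    config = configparser.ConfigParser()
    config["influx2"] = {
        "url": "http://localhost:8086",
        "org": "my-org",
        "token": "my-token",
        "timeout": "6000",
    }
    path = Path(directory) / "config.ini"
    with path.open("w") as handle:
        config.write(handle)
    return path


async def main() -> None:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    with tempfile.TemporaryDirectory() as directory:
        path = write_example_config(directory)
        # from_config_file is a classmethod that returns a configured client.
        async with InfluxDBClientAsync.from_config_file(str(path)) as client:
            print("ready:", await client.ping())


# asyncio.run(main())  # uncomment to run against a reachable server
```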
- classmethod from_env_properties(debug=None, enable_gzip=False, **kwargs)
Configure client via environment properties.
- Parameters:
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints support the Gzip compression.
- Key str proxy:
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. By default, no retry strategy is set.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/mTLS handshake. Be aware that you can supply either certificate/key files or an SSL Context, not both.
- Supported environment properties:
INFLUXDB_V2_URL
INFLUXDB_V2_ORG
INFLUXDB_V2_TOKEN
INFLUXDB_V2_TIMEOUT
INFLUXDB_V2_VERIFY_SSL
INFLUXDB_V2_SSL_CA_CERT
INFLUXDB_V2_CERT_FILE
INFLUXDB_V2_CERT_KEY_FILE
INFLUXDB_V2_CERT_KEY_PASSWORD
INFLUXDB_V2_CONNECTION_POOL_MAXSIZE
INFLUXDB_V2_AUTH_BASIC
INFLUXDB_V2_PROFILERS
INFLUXDB_V2_TAG
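A short sketch of configuring the client from the environment. It sets a few of the variables listed above to placeholder values inside the process (in practice they would usually be exported by the shell or the deployment) and then calls from_env_properties. The helper export_example_environment is hypothetical.

```python
import asyncio
import os


def export_example_environment() -> None:
    """Hypothetical helper: set INFLUXDB_V2_* variables to placeholder values."""
    os.environ["INFLUXDB_V2_URL"] = "http://localhost:8086"
    os.environ["INFLUXDB_V2_ORG"] = "my-org"
    os.environ["INFLUXDB_V2_TOKEN"] = "my-token"
    os.environ["INFLUXDB_V2_TIMEOUT"] = "6000"


async def main() -> None:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    export_example_environment()
    # from_env_properties is a classmethod that reads the variables set above.
    async with InfluxDBClientAsync.from_env_properties() as client:
        print("ready:", await client.ping())


# asyncio.run(main())  # uncomment to run against a reachable server
```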
- async ping() -> bool
Return the status of InfluxDB instance.
- Returns:
The status of InfluxDB.
- query_api(query_options: influxdb_client.client.query_api.QueryOptions = <influxdb_client.client.query_api.QueryOptions object>) -> QueryApiAsync
Create an asynchronous Query API instance.
- Parameters:
query_options – optional query api configuration
- Returns:
Query api instance
- async version() -> str
Return the version of the connected InfluxDB Server.
- Returns:
The version of InfluxDB.
- write_api(point_settings=<influxdb_client.client.write_api.PointSettings object>) -> WriteApiAsync
Create an asynchronous Write API instance.
- Example:
    import asyncio

    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


    async def main():
        # Initialize async/await instance of Write API
        async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
            write_api = client.write_api()


    asyncio.run(main())
- Parameters:
point_settings – settings to store default tags
- Returns:
write api instance
QueryApiAsync
- class influxdb_client.client.query_api_async.QueryApiAsync(influxdb_client, query_options=<influxdb_client.client.query_api.QueryOptions object>)
Asynchronous implementation for ‘/api/v2/query’ endpoint.
Initialize query client.
- Parameters:
influxdb_client – influxdb client
- async query(query: str, org=None, params: Optional[dict] = None) -> TableList
Execute asynchronous Flux query and return result as a FluxTable list.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
params – bind parameters
- Returns:
FluxTable list wrapped in a TableList
- Return type:
TableList
Serialize the query results to a flattened list of values via to_values():
    import asyncio

    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


    async def main():
        async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
            # Query: using Table structure
            tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

            # Serialize to values
            output = tables.to_values(columns=['location', '_time', '_value'])
            print(output)


    asyncio.run(main())

    [
        ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
        ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
        ...
    ]
Serialize the query results to JSON via to_json():
    import asyncio

    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


    async def main():
        async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
            # Query: using Table structure
            tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

            # Serialize to JSON
            output = tables.to_json(indent=5)
            print(output)


    asyncio.run(main())

    [
        {
            "_measurement": "mem",
            "_start": "2021-06-23T06:50:11.897825+00:00",
            "_stop": "2021-06-25T06:50:11.897825+00:00",
            "_time": "2020-02-27T16:20:00.897825+00:00",
            "region": "north",
            "_field": "usage",
            "_value": 15
        },
        {
            "_measurement": "mem",
            "_start": "2021-06-23T06:50:11.897825+00:00",
            "_stop": "2021-06-25T06:50:11.897825+00:00",
            "_time": "2020-02-27T16:20:01.897825+00:00",
            "region": "west",
            "_field": "usage",
            "_value": 10
        },
        ...
    ]
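The params argument is only described as "bind parameters". The sketch below shows one way it might be used, under the assumption that each key of the params dictionary becomes a Flux variable of the same name that the query text can reference. The helper recent_location_query, the bucket name and the parameter names are illustrative.

```python
import asyncio
import datetime


def recent_location_query(bucket: str = "my-bucket"):
    """Hypothetical helper: return a Flux query and its bind parameters."""
    flux = (
        f'from(bucket: "{bucket}") '
        "|> range(start: _start) "
        '|> filter(fn: (r) => r["location"] == _location)'
    )
    # Assumption: keys are exposed to the query as variables with the same name.
    params = {"_start": datetime.timedelta(minutes=-10), "_location": "Prague"}
    return flux, params


async def main() -> None:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    flux, params = recent_location_query()
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        tables = await client.query_api().query(flux, params=params)
        for table in tables:
            for record in table.records:
                print(record.get_time(), record.get_value())


# asyncio.run(main())  # uncomment to run against a reachable server
```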
- async query_data_frame(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)
Execute asynchronous Flux query and return DataFrame.
Note
If the query returns tables with differing schemas, then the client generates a DataFrame for each of them.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
data_frame_index – the list of columns that are used as DataFrame index
params – bind parameters
- Returns:
DataFrame or List[DataFrame]
Warning
For the optimal processing of the query results use the pivot() function, which aligns results as a table.
    from(bucket:"my-bucket")
        |> range(start: -5m, stop: now())
        |> filter(fn: (r) => r._measurement == "mem")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
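A sketch of calling query_data_frame with the pivoted query from the warning above, handling both possible return shapes. It assumes pandas is installed next to the client; the helper pivot_query is hypothetical and interpolates its arguments directly, so it should only receive trusted values.

```python
import asyncio


def pivot_query(bucket: str, measurement: str, start: str = "-5m") -> str:
    """Hypothetical helper: build the pivoted Flux query for one measurement."""
    return (
        f'from(bucket: "{bucket}") '
        f"|> range(start: {start}, stop: now()) "
        f'|> filter(fn: (r) => r._measurement == "{measurement}") '
        '|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")'
    )


async def main() -> None:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        result = await client.query_api().query_data_frame(
            pivot_query("my-bucket", "mem"), data_frame_index=["_time"]
        )
        # A list is returned when the result tables have differing schemas.
        frames = result if isinstance(result, list) else [result]
        for frame in frames:
            print(frame.head())


# asyncio.run(main())  # uncomment to run against a reachable server
```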
- async query_data_frame_stream(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)
Execute asynchronous Flux query and return stream of DataFrame as an AsyncGenerator[DataFrame].
Note
If the query returns tables with differing schemas, then the client generates a DataFrame for each of them.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
data_frame_index – the list of columns that are used as DataFrame index
params – bind parameters
- Returns:
AsyncGenerator[DataFrame]
Warning
For the optimal processing of the query results use the pivot() function, which aligns results as a table.
    from(bucket:"my-bucket")
        |> range(start: -5m, stop: now())
        |> filter(fn: (r) => r._measurement == "mem")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
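A sketch of consuming the DataFrame stream one frame at a time. It assumes that awaiting query_data_frame_stream yields the async generator, which is then iterated with async for; count_rows is a hypothetical helper that works on any async stream of sized objects.

```python
import asyncio
from typing import AsyncIterator


async def count_rows(frames: AsyncIterator) -> int:
    """Hypothetical helper: sum len(frame) over an async stream of frames."""
    total = 0
    async for frame in frames:
        total += len(frame)
    return total


async def main() -> None:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # Assumption: the awaited call returns the generator to iterate over.
        frames = await client.query_api().query_data_frame_stream(
            'from(bucket:"my-bucket") |> range(start: -10m)'
        )
        print("rows:", await count_rows(frames))


# asyncio.run(main())  # uncomment to run against a reachable server
```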
- async query_raw(query: str, org=None, dialect={'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None)
Execute asynchronous Flux query and return the raw, unprocessed result as a str.
- Parameters:
query – a Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
dialect – csv dialect format
params – bind parameters
- Returns:
str
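Since query_raw returns the response as one string, a caller usually has to split it. The sketch below assumes the default dialect shown in the signature (comma delimiter, annotation lines prefixed with '#', header row enabled) and uses the standard csv module; data_rows is a hypothetical helper.

```python
import asyncio
import csv
import io


def data_rows(raw_csv: str) -> list:
    """Hypothetical helper: parse rows, skipping blank and '#' annotation lines."""
    lines = [line for line in raw_csv.splitlines() if line and not line.startswith("#")]
    return list(csv.reader(io.StringIO("\n".join(lines))))


async def main() -> None:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        raw = await client.query_api().query_raw('from(bucket:"my-bucket") |> range(start: -10m)')
        # With the default dialect the first remaining row of each table is its header.
        for row in data_rows(raw):
            print(row)


# asyncio.run(main())  # uncomment to run against a reachable server
```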
- async query_stream(query: str, org=None, params: Optional[dict] = None) -> AsyncGenerator[FluxRecord, None]
Execute asynchronous Flux query and return stream of FluxRecord as an AsyncGenerator[FluxRecord].
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
params – bind parameters
- Returns:
AsyncGenerator[FluxRecord]
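A sketch of reading only the first few records from the stream. It assumes that awaiting query_stream yields the async generator; take is a hypothetical helper that stops early and closes the generator afterwards.

```python
import asyncio
from typing import AsyncGenerator


async def take(stream: AsyncGenerator, limit: int) -> list:
    """Hypothetical helper: collect at most `limit` items, then close the stream."""
    items = []
    try:
        if limit > 0:
            async for item in stream:
                items.append(item)
                if len(items) >= limit:
                    break
    finally:
        # Close explicitly so the generator's cleanup runs even when stopping early.
        await stream.aclose()
    return items


async def main() -> None:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        records = await client.query_api().query_stream('from(bucket:"my-bucket") |> range(start: -10m)')
        for record in await take(records, 5):
            print(record.get_time(), record.get_value())


# asyncio.run(main())  # uncomment to run against a reachable server
```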
WriteApiAsync
- class influxdb_client.client.write_api_async.WriteApiAsync(influxdb_client, point_settings: influxdb_client.client.write_api.PointSettings = <influxdb_client.client.write_api.PointSettings object>)
Implementation for ‘/api/v2/write’ endpoint.
- Example:
    import asyncio

    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


    async def main():
        # Initialize async/await instance of Write API
        async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
            write_api = client.write_api()


    asyncio.run(main())
Initialize defaults.
- Parameters:
influxdb_client – with default settings (organization)
point_settings – settings to store default tags.
- async write(bucket: str, org: Optional[str] = None, record: Optional[Union[str, Iterable[str], Point, Iterable[Point], dict, Iterable[dict], bytes, Iterable[bytes], NamedTuple, Iterable[NamedTuple], dataclass, Iterable[dataclass]]] = None, write_precision: WritePrecision = 'ns', **kwargs) -> bool
Write time-series data into InfluxDB.
- Parameters:
bucket (str) – specifies the destination bucket for writes (required)
org (str, Organization) – specifies the destination organization for writes; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
write_precision (WritePrecision) – specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point takes precedence and is used for the write.
record – Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame
- Key data_frame_measurement_name:
name of measurement for writing Pandas DataFrame - DataFrame
- Key data_frame_tag_columns:
list of DataFrame columns which are tags, rest columns will be fields - DataFrame
- Key data_frame_timestamp_column:
name of DataFrame column which contains a timestamp. The column can be defined as a str value formatted as 2018-10-26, 2018-10-26 12:00, 2018-10-26 12:00:00-05:00 or other formats and types supported by pandas.to_datetime - DataFrame
- Key data_frame_timestamp_timezone:
name of the timezone which is used for timestamp column - DataFrame
- Key record_measurement_key:
key of record with specified measurement - dictionary, NamedTuple, dataclass
- Key record_measurement_name:
static measurement name - dictionary, NamedTuple, dataclass
- Key record_time_key:
key of record with specified timestamp - dictionary, NamedTuple, dataclass
- Key record_tag_keys:
list of record keys to use as a tag - dictionary, NamedTuple, dataclass
- Key record_field_keys:
list of record keys to use as a field - dictionary, NamedTuple, dataclass
- Returns:
True for successfully accepted data; otherwise an exception is raised
- Example:
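    import asyncio

    from influxdb_client import Point
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


    async def main():
        async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
            write_api = client.write_api()

            # Record as Line Protocol
            await write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1")

            # Record as Dictionary
            dictionary = {
                "measurement": "h2o_feet",
                "tags": {"location": "us-west"},
                "fields": {"level": 125},
                "time": 1
            }
            await write_api.write("my-bucket", "my-org", dictionary)

            # Record as Point
            point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1)
            await write_api.write("my-bucket", "my-org", point)


    asyncio.run(main())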
- DataFrame:
If the data_frame_timestamp_column is not specified, the index of the Pandas DataFrame is used as a timestamp for written data. The index can be a PeriodIndex or it must be transformable to datetime by pandas.to_datetime.
If you would like to use a column as a datetime index, you can use something like:
    import pandas as pd

    # DataFrame
    data_frame = ...

    # Set column as Index
    data_frame.set_index('column_name', inplace=True)

    # Transform index to a DatetimeIndex (values are interpreted as seconds since the epoch)
    data_frame.index = pd.to_datetime(data_frame.index, unit='s')
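The record_* keys above describe how dictionaries, NamedTuples and dataclasses are mapped onto measurement, tags, fields and time. A sketch with a dataclass follows; the Reading type, its field names and the write_options helper are assumptions made for illustration only.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Reading:
    """Illustrative record type; the field names are assumptions of this sketch."""
    sensor: str
    location: str
    temperature: float
    timestamp: int


def write_options() -> dict:
    """Hypothetical helper: map Reading attributes onto the record_* keys."""
    return {
        "record_measurement_key": "sensor",      # attribute holding the measurement
        "record_tag_keys": ["location"],         # attributes written as tags
        "record_field_keys": ["temperature"],    # attributes written as fields
        "record_time_key": "timestamp",          # attribute holding the timestamp
    }


async def main() -> None:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    readings = [Reading("thermo", "us-west", 21.5, 1), Reading("thermo", "us-east", 19.0, 2)]
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        accepted = await client.write_api().write("my-bucket", record=readings, **write_options())
        print("accepted:", accepted)


# asyncio.run(main())  # uncomment to run against a reachable server
```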
DeleteApiAsync
- class influxdb_client.client.delete_api_async.DeleteApiAsync(influxdb_client)
Async implementation for ‘/api/v2/delete’ endpoint.
Initialize defaults.
- async delete(start: Union[str, datetime], stop: Union[str, datetime], predicate: str, bucket: str, org: Optional[Union[str, Organization]] = None) -> bool
Delete time series data from InfluxDB.
- Parameters:
start (str, datetime.datetime) – start time
stop (str, datetime.datetime) – stop time
predicate (str) – predicate
bucket (str) – bucket id or name from which data will be deleted
org (str, Organization) – specifies the organization to delete data from; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
- Returns:
True for successfully deleted data; otherwise an exception is raised
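A sketch of deleting the last hour of one measurement. The bucket and measurement names are placeholders, the predicate string is an assumed example of the server's delete predicate syntax, and last_hours is a hypothetical helper that builds the start/stop pair as timezone-aware datetimes.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def last_hours(hours: int, now: Optional[datetime] = None) -> Tuple[datetime, datetime]:
    """Hypothetical helper: (start, stop) covering the last `hours` hours in UTC."""
    stop = now or datetime.now(timezone.utc)
    return stop - timedelta(hours=hours), stop


async def main() -> None:
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

    start, stop = last_hours(1)
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # org is omitted, so the client's default organization is used.
        deleted = await client.delete_api().delete(
            start, stop, '_measurement="h2o_feet"', bucket="my-bucket"
        )
        print("deleted:", deleted)


# asyncio.run(main())  # uncomment to run against a reachable server
```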