InfluxDB 2.0 python client
This repository contains the Python client library for use with InfluxDB 2.x and Flux. InfluxDB 3.x users should instead use the lightweight v3 client library. InfluxDB 1.x users should use the v1 client library.
For ease of migration and a consistent query and write experience, v2 users should consider using InfluxQL and the v1 client library.
The API of influxdb-client-python is not backwards-compatible with the old client, influxdb-python.
Documentation
This section contains links to the client library documentation.
InfluxDB 2.0 client features
- Querying data
  - using the Flux language
  - into csv, raw data, flux_table structure, Pandas DataFrame
- Writing data using
  - RxPY Observable
- InfluxDB 2.0 API client for management
  - the client is generated from the swagger by using the openapi-generator
  - organizations & users management
  - buckets management
  - tasks management
  - authorizations
  - health check
  - …
Examples
Installation
The InfluxDB Python library uses RxPY, the Reactive Extensions for Python.
Python 3.7 or later is required.
It is recommended to use ciso8601 with the client for parsing dates. ciso8601 is much faster than the built-in Python datetime. Since it is written as a C module, the best way to install it is to build it from sources:
Windows: You have to install Visual C++ Build Tools 2015 to build ciso8601 with pip.
conda: Install from sources: conda install -c conda-forge/label/cf202003 ciso8601
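The optional-dependency pattern behind this recommendation can be sketched as follows: prefer ciso8601 when it is importable and fall back to the standard library otherwise. The helper name parse_timestamp and the sample timestamp are assumptions made for this illustration; this is not the client's internal code.

```python
from datetime import datetime

try:
    import ciso8601  # optional C extension, may not be installed

    def parse_timestamp(value: str) -> datetime:
        """Parse an ISO 8601 timestamp with the fast C parser."""
        return ciso8601.parse_datetime(value)

except ImportError:

    def parse_timestamp(value: str) -> datetime:
        """Pure-Python fallback used when ciso8601 is unavailable."""
        return datetime.fromisoformat(value)


# Hypothetical sample value; either branch yields a timezone-aware datetime.
stamp = parse_timestamp("2020-03-01T12:30:00+00:00")
print(stamp.isoformat())
```

Either branch exposes the same function, so calling code does not need to know which parser is in use.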
pip install
The Python package is hosted on PyPI; you can install the latest version directly:
pip install 'influxdb-client[ciso]'
Then import the package:
import influxdb_client
If your application uses async/await, you can install the client with the async extra:
pip install 'influxdb-client[async]'
For more info see How to use Asyncio.
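As a small sketch of what the async extra enables, the helper below pings a server through InfluxDBClientAsync. The function name check_connection is hypothetical, and the import guard is only there so the snippet still loads when the extra is not installed.

```python
try:
    # Importable only when the client was installed with the async extra.
    from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
except ImportError:
    InfluxDBClientAsync = None


async def check_connection(url: str, token: str, org: str) -> bool:
    """Return True when the server answers a ping (hypothetical helper)."""
    if InfluxDBClientAsync is None:
        raise RuntimeError("install the client with: pip install 'influxdb-client[async]'")
    # The async context manager closes the underlying HTTP session on exit.
    async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
        return await client.ping()
```

With a server running, it could be invoked as asyncio.run(check_connection("http://localhost:8086", "my-token", "my-org")), using the same placeholder values as the rest of this page.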
Setuptools
Install via Setuptools.
python setup.py install --user
(or sudo python setup.py install to install the package for all users)
Getting Started
Follow the Installation steps, then run the following:
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "my-bucket"
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
write_api.write(bucket=bucket, record=p)
## using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
for table in tables:
    print(table)
    for row in table.records:
        print(row.values)
## using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
    for cell in row:
        val_count += 1
Client configuration
Via File
A client can be configured via a *.ini file, in the influx2 section.
The following options are supported:
- url - the URL to connect to InfluxDB
- org - default destination organization for writes and queries
- token - the token to use for authorization
- timeout - socket timeout in ms (default value is 10000)
- verify_ssl - set this to false to skip verifying the SSL certificate when calling the API from an https server
- ssl_ca_cert - set this to customize the certificate file used to verify the peer
- cert_file - path to the certificate that will be used for mTLS authentication
- cert_key_file - path to the file that contains the private key for the mTLS certificate
- cert_key_password - string or function which returns the password for decrypting the mTLS private key
- connection_pool_maxsize - set the number of connections to save that can be reused by urllib3
- auth_basic - enable HTTP basic authentication when talking to an InfluxDB 1.8.x instance that has authentication disabled but is accessed via a reverse proxy with basic authentication (defaults to false)
- profilers - set the list of enabled Flux profilers
self.client = InfluxDBClient.from_config_file("config.ini")
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
verify_ssl=False
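To check such a file before handing it to the client, one option is to write and re-read it with the standard library's configparser. This is only a sketch: the temporary path and the variable names are assumptions, and the client's own parsing may differ in detail.

```python
import configparser
import os
import tempfile

# Option names and placeholder values taken from the example above.
settings = {
    "url": "http://localhost:8086",
    "org": "my-org",
    "token": "my-token",
    "timeout": "6000",
    "verify_ssl": "False",
}

config = configparser.ConfigParser()
config["influx2"] = settings

# Write the file to a throwaway directory (hypothetical location).
path = os.path.join(tempfile.mkdtemp(), "config.ini")
with open(path, "w") as handle:
    config.write(handle)

# Read it back to confirm the section exists and the values parse as intended.
loaded = configparser.ConfigParser()
loaded.read(path)
section = loaded["influx2"]
timeout_ms = section.getint("timeout")
verify_ssl = section.getboolean("verify_ssl")
print(section["url"], timeout_ms, verify_ssl)  # http://localhost:8086 6000 False
```

The resulting path is what would then be passed to InfluxDBClient.from_config_file.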
Via Environment Properties
A client can be configured via environment properties.
Supported properties are:
- INFLUXDB_V2_URL - the URL to connect to InfluxDB
- INFLUXDB_V2_ORG - default destination organization for writes and queries
- INFLUXDB_V2_TOKEN - the token to use for authorization
- INFLUXDB_V2_TIMEOUT - socket timeout in ms (default value is 10000)
- INFLUXDB_V2_VERIFY_SSL - set this to false to skip verifying the SSL certificate when calling the API from an https server
- INFLUXDB_V2_SSL_CA_CERT - set this to customize the certificate file used to verify the peer
- INFLUXDB_V2_CERT_FILE - path to the certificate that will be used for mTLS authentication
- INFLUXDB_V2_CERT_KEY_FILE - path to the file that contains the private key for the mTLS certificate
- INFLUXDB_V2_CERT_KEY_PASSWORD - string or function which returns the password for decrypting the mTLS private key
- INFLUXDB_V2_CONNECTION_POOL_MAXSIZE - set the number of connections to save that can be reused by urllib3
- INFLUXDB_V2_AUTH_BASIC - enable HTTP basic authentication when talking to an InfluxDB 1.8.x instance that has authentication disabled but is accessed via a reverse proxy with basic authentication (defaults to false)
- INFLUXDB_V2_PROFILERS - set the list of enabled Flux profilers
self.client = InfluxDBClient.from_env_properties()
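When debugging a deployment it can help to see which of these variables are actually set before calling from_env_properties. The sketch below collects them with the standard library only; the helper influx_env_settings and the sample environment are hypothetical and not part of the client.

```python
import os

PREFIX = "INFLUXDB_V2_"


def influx_env_settings(environ=None):
    """Collect INFLUXDB_V2_* variables into a dict keyed by lower-cased option name."""
    if environ is None:
        environ = os.environ
    return {
        name[len(PREFIX):].lower(): value
        for name, value in environ.items()
        if name.startswith(PREFIX)
    }


# A stand-in environment so the example does not depend on the real process state.
sample_env = {
    "INFLUXDB_V2_URL": "http://localhost:8086",
    "INFLUXDB_V2_ORG": "my-org",
    "INFLUXDB_V2_TOKEN": "my-token",
    "INFLUXDB_V2_TIMEOUT": "6000",
    "HOME": "/home/user",
}
settings = influx_env_settings(sample_env)
print(sorted(settings))  # ['org', 'timeout', 'token', 'url']
```

Calling influx_env_settings() without arguments inspects the real process environment instead of the sample.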
Profile query
The Flux Profiler package provides performance profiling tools for Flux queries and operations.
You can enable printing of profiler information for a Flux query in the client library in one of the following ways:
- set QueryOptions.profilers in QueryApi,
- set the INFLUXDB_V2_PROFILERS environment variable,
- set the profilers option in the configuration file.
When the profiler is enabled, the result of a Flux query contains additional “profiler/” tables. To keep behaviour consistent whether the profiler is enabled or disabled, FluxCSVParser excludes “profiler/” measurements from the result.
Example of how to enable profilers using the API:
from influxdb_client.client.query_api import QueryOptions

# The bucket name is supplied as a query parameter (stringParam), not inlined in the Flux source
q = '''
from(bucket: stringParam)
|> range(start: -5m, stop: now())
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
|> aggregateWindow(every: 1m, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
p = {
    "stringParam": "my-bucket",
}
# Enable both the query and the operator profiler for this QueryApi instance
query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
tables = query_api.query(query=q, params=p)
Example of a profiler output:
===============
Profiler: query
===============
from(bucket: stringParam)
|> range(start: -5m, stop: now())
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
|> aggregateWindow(every: 1m, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
========================
Profiler: profiler/query
========================
result : _profiler
table : 0
_measurement : profiler/query
TotalDuration : 8924700
CompileDuration : 350900
QueueDuration : 33800
PlanDuration : 0
RequeueDuration : 0
ExecuteDuration : 8486500
Concurrency : 0
MaxAllocated : 2072
TotalAllocated : 0
flux/query-plan :
digraph {
ReadWindowAggregateByTime11
// every = 1m, aggregates = [mean], createEmpty = true, timeColumn = "_stop"
pivot8
generated_yield
ReadWindowAggregateByTime11 -> pivot8
pivot8 -> generated_yield
}
influxdb/scanned-bytes: 0
influxdb/scanned-values: 0
===========================
Profiler: profiler/operator
===========================
result : _profiler
table : 1
_measurement : profiler/operator
Type : *universe.pivotTransformation
Label : pivot8
Count : 3
MinDuration : 32600
MaxDuration : 126200
DurationSum : 193400
MeanDuration : 64466.666666666664
===========================
Profiler: profiler/operator
===========================
result : _profiler
table : 1
_measurement : profiler/operator
Type : *influxdb.readWindowAggregateSource
Label : ReadWindowAggregateByTime11
Count : 1
MinDuration : 940500
MaxDuration : 940500
DurationSum : 940500
MeanDuration : 940500.0
You can also use a callback function to receive the profiler output. The callback is called with each profiler record, passed as a FluxRecord.
Example of how to use profilers with a callback:
class ProfilersCallback(object):
    def __init__(self):
        self.records = []

    def __call__(self, flux_record):
        self.records.append(flux_record.values)

callback = ProfilersCallback()
query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
for profiler in callback.records:
    print(f'Custom processing of profiler result: {profiler}')
Example output of this callback:
Custom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\r\n ReadRange2\r\n generated_yield\r\n\r\n ReadRange2 -> generated_yield\r\n}\r\n\r\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0}
Custom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0}
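The collected dictionaries can then be post-processed however the application needs. As a minimal sketch, the hypothetical helper summarize below turns records shaped like the output above into short timing lines. It assumes the duration fields are reported in nanoseconds, and the sample records are abbreviated copies of the two lines above.

```python
def summarize(records):
    """Build one human-readable line per profiler record (hypothetical helper)."""
    lines = []
    for values in records:
        measurement = values.get("_measurement")
        if measurement == "profiler/query":
            # Assumption: durations are in nanoseconds, so divide by 1e6 for ms.
            total_ms = values["TotalDuration"] / 1_000_000
            lines.append(f"query total: {total_ms:.2f} ms")
        elif measurement == "profiler/operator":
            mean_ms = values["MeanDuration"] / 1_000_000
            lines.append(f"{values['Label']}: mean {mean_ms:.2f} ms over {values['Count']} call(s)")
    return lines


# Abbreviated copies of the two records printed above.
sample_records = [
    {"_measurement": "profiler/query", "TotalDuration": 18843792},
    {"_measurement": "profiler/operator", "Label": "ReadRange2", "Count": 1, "MeanDuration": 3274084.0},
]
for line in summarize(sample_records):
    print(line)
# query total: 18.84 ms
# ReadRange2: mean 3.27 ms over 1 call(s)
```

In a real application the input would be callback.records from the example above rather than the hard-coded sample.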