InfluxDB 2.0 python client



This repository contains the Python client library for use with InfluxDB 2.x and Flux. InfluxDB 3.x users should instead use the lightweight v3 client library. InfluxDB 1.x users should use the v1 client library.

For ease of migration and a consistent query and write experience, v2 users should consider using InfluxQL and the v1 client library.

The API of influxdb-client-python is not backwards-compatible with the old client, influxdb-python.

Documentation

This section contains links to the client library documentation.

InfluxDB 2.0 client features

Installation

The InfluxDB Python library uses RxPY, the Reactive Extensions for Python.

Python 3.7 or later is required.

Note: It is recommended to use ciso8601 with the client for parsing dates. ciso8601 is much faster than the built-in Python datetime parsing. Because it is written as a C module, the best way to install it is to build it from source:

Windows:

You have to install Visual C++ Build Tools 2015 to build ciso8601 with pip.

conda:

Install from sources: conda install -c conda-forge/label/cf202003 ciso8601.
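
As a rough illustration of treating ciso8601 as an optional dependency, the sketch below uses it when it is importable and otherwise falls back to the standard library. The helper name parse_timestamp is hypothetical and this is not the client's own date handling; it only shows the general pattern.

```python
from datetime import datetime, timezone

try:
    # Optional C extension; only available if it has been installed.
    import ciso8601

    def parse_timestamp(value: str) -> datetime:
        """Parse an ISO 8601 / RFC 3339 timestamp with ciso8601."""
        return ciso8601.parse_datetime(value)

except ImportError:

    def parse_timestamp(value: str) -> datetime:
        """Parse an ISO 8601 timestamp with the standard library only."""
        # datetime.fromisoformat() accepts a trailing "Z" only on newer
        # Python versions, so rewrite it as an explicit UTC offset first.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        return datetime.fromisoformat(value)


parsed = parse_timestamp("2021-03-01T12:30:00Z")
print(parsed.isoformat())
```

Both branches return a timezone-aware datetime for this input, so calling code does not need to know which parser was used.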

pip install

The Python package is hosted on PyPI, so you can install the latest version directly:

pip install 'influxdb-client[ciso]'

Then import the package:

import influxdb_client

If your application uses async/await in Python you can install with the async extra:

pip install 'influxdb-client[async]'

For more info see How to use Asyncio.

Setuptools

Install via Setuptools.

python setup.py install --user

(or sudo python setup.py install to install the package for all users)

Getting Started

Follow the Installation instructions and then run the following:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "my-bucket"

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)

write_api.write(bucket=bucket, record=p)

# Using the table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for table in tables:
    print(table)
    for row in table.records:
        print(row.values)


# Using the csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
    for cell in row:
        val_count += 1
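
To make the CSV loop above easier to picture without a running server, here is a minimal sketch that runs the same kind of cell count over a made-up CSV text using only the standard csv module. The sample text and the count_cells helper are illustrative assumptions, and the sketch assumes each row is a list of string cells; a real response will have different columns and rows.

```python
import csv
import io

# Made-up sample text standing in for a CSV query response (not real output).
SAMPLE_CSV = (
    ",result,table,_time,_value,_field,_measurement,location\n"
    ",_result,0,2024-01-01T00:00:00Z,25.3,temperature,my_measurement,Prague\n"
)


def count_cells(rows):
    """Count every cell in an iterable of rows (each row is a list of strings)."""
    return sum(len(row) for row in rows)


rows = list(csv.reader(io.StringIO(SAMPLE_CSV)))
val_count = count_cells(rows)
print(val_count)
```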

Client configuration

Via File

A client can be configured via a *.ini file, using the influx2 section.

The following options are supported:

  • url - the url to connect to InfluxDB

  • org - default destination organization for writes and queries

  • token - the token to use for the authorization

  • timeout - socket timeout in ms (default value is 10000)

  • verify_ssl - set this to false to skip SSL certificate verification when calling the API over HTTPS

  • ssl_ca_cert - set this to customize the certificate file to verify the peer

  • cert_file - path to the certificate that will be used for mTLS authentication

  • cert_key_file - path to the file that contains the private key for the mTLS certificate

  • cert_key_password - string or function which returns password for decrypting the mTLS private key

  • connection_pool_maxsize - set the number of connections to save that can be reused by urllib3

  • auth_basic - enable HTTP basic authentication when talking to an InfluxDB 1.8.x instance that has no authentication of its own but is accessed via a reverse proxy with basic authentication (defaults to false)

  • profilers - set the list of enabled Flux profilers

self.client = InfluxDBClient.from_config_file("config.ini")

[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
verify_ssl=False
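
To show how the options in such a file map to typed values, the sketch below reads the same [influx2] section with the standard configparser module. It is not the client's own loader, and the fallback of True for verify_ssl is an assumption made for this example; the timeout fallback follows the documented default of 10000 ms.

```python
import configparser

# Same content as the example config.ini, inlined so the sketch is runnable.
INI_TEXT = """
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
verify_ssl=False
"""

parser = configparser.ConfigParser()
parser.read_string(INI_TEXT)
section = parser["influx2"]

settings = {
    "url": section["url"],
    "org": section["org"],
    "token": section["token"],
    # Documented default: 10000 ms when the option is missing.
    "timeout": section.getint("timeout", fallback=10000),
    # Assumed default for this sketch; getboolean() parses "False" as False.
    "verify_ssl": section.getboolean("verify_ssl", fallback=True),
}
print(settings)
```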

Via Environment Properties

A client can be configured via environment properties.

Supported properties are:

  • INFLUXDB_V2_URL - the url to connect to InfluxDB

  • INFLUXDB_V2_ORG - default destination organization for writes and queries

  • INFLUXDB_V2_TOKEN - the token to use for the authorization

  • INFLUXDB_V2_TIMEOUT - socket timeout in ms (default value is 10000)

  • INFLUXDB_V2_VERIFY_SSL - set this to false to skip SSL certificate verification when calling the API over HTTPS

  • INFLUXDB_V2_SSL_CA_CERT - set this to customize the certificate file to verify the peer

  • INFLUXDB_V2_CERT_FILE - path to the certificate that will be used for mTLS authentication

  • INFLUXDB_V2_CERT_KEY_FILE - path to the file that contains the private key for the mTLS certificate

  • INFLUXDB_V2_CERT_KEY_PASSWORD - string or function which returns password for decrypting the mTLS private key

  • INFLUXDB_V2_CONNECTION_POOL_MAXSIZE - set the number of connections to save that can be reused by urllib3

  • INFLUXDB_V2_AUTH_BASIC - enable HTTP basic authentication when talking to an InfluxDB 1.8.x instance that has no authentication of its own but is accessed via a reverse proxy with basic authentication (defaults to false)

  • INFLUXDB_V2_PROFILERS - set the list of enabled Flux profilers

self.client = InfluxDBClient.from_env_properties()
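
For a feel of how these variables translate into settings, here is a minimal sketch that reads a few of them from a mapping such as os.environ. The function read_influx_env is hypothetical and is not how the client itself is implemented; treating a missing INFLUXDB_V2_VERIFY_SSL as true is an assumption of this example.

```python
def read_influx_env(environ):
    """Collect a few INFLUXDB_V2_* settings from a mapping of environment variables."""
    # Assumption for this sketch: verification stays on unless the value is "false".
    verify = environ.get("INFLUXDB_V2_VERIFY_SSL", "true")
    return {
        "url": environ.get("INFLUXDB_V2_URL"),
        "org": environ.get("INFLUXDB_V2_ORG"),
        "token": environ.get("INFLUXDB_V2_TOKEN"),
        # Documented default: 10000 ms when the variable is not set.
        "timeout": int(environ.get("INFLUXDB_V2_TIMEOUT", "10000")),
        "verify_ssl": verify.strip().lower() != "false",
    }


# A plain dict stands in for os.environ so the example has no side effects.
example_env = {
    "INFLUXDB_V2_URL": "http://localhost:8086",
    "INFLUXDB_V2_ORG": "my-org",
    "INFLUXDB_V2_TOKEN": "my-token",
    "INFLUXDB_V2_TIMEOUT": "6000",
    "INFLUXDB_V2_VERIFY_SSL": "False",
}
settings = read_influx_env(example_env)
print(settings)
```

In an application you would pass os.environ instead of the example dictionary.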

Profile query

The Flux Profiler package provides performance profiling tools for Flux queries and operations.

You can enable printing of profiler information for a Flux query in the client library in one of the following ways:

  • set QueryOptions.profilers in QueryApi,

  • set INFLUXDB_V2_PROFILERS environment variable,

  • set profilers option in configuration file.

When the profiler is enabled, the result of the Flux query contains additional tables whose measurement starts with “profiler/” (for example profiler/query and profiler/operator). To keep behaviour consistent whether the profiler is enabled or disabled, FluxCSVParser excludes the “profiler/” measurements from the result.
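
The exclusion described above can be pictured as a simple split on the _measurement value. The sketch below works on plain dictionaries with made-up values and a hypothetical helper name; it is not the FluxCSVParser implementation, only an illustration of the idea.

```python
PROFILER_PREFIX = "profiler/"


def split_profiler_records(records):
    """Separate ordinary records from those whose measurement starts with 'profiler/'."""
    data, profiler = [], []
    for record in records:
        measurement = record.get("_measurement") or ""
        if measurement.startswith(PROFILER_PREFIX):
            profiler.append(record)
        else:
            data.append(record)
    return data, profiler


# Illustrative records only; real results carry more columns.
records = [
    {"_measurement": "mem", "_field": "used", "_value": 42.0},
    {"_measurement": "profiler/query", "TotalDuration": 8924700},
    {"_measurement": "profiler/operator", "Label": "pivot8"},
]
data, profiler = split_profiler_records(records)
print(len(data), len(profiler))
```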

Example of how to enable profilers using the API:

from influxdb_client.client.query_api import QueryOptions

# The bucket name is passed as a query parameter instead of being inlined.
q = '''
    from(bucket: stringParam)
      |> range(start: -5m, stop: now())
      |> filter(fn: (r) => r._measurement == "mem")
      |> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
      |> aggregateWindow(every: 1m, fn: mean)
      |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
p = {
    "stringParam": "my-bucket",
}

query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
tables = query_api.query(query=q, params=p)

Example of a profiler output:

===============
Profiler: query
===============

from(bucket: stringParam)
  |> range(start: -5m, stop: now())
  |> filter(fn: (r) => r._measurement == "mem")
  |> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
  |> aggregateWindow(every: 1m, fn: mean)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

========================
Profiler: profiler/query
========================
result              : _profiler
table               : 0
_measurement        : profiler/query
TotalDuration       : 8924700
CompileDuration     : 350900
QueueDuration       : 33800
PlanDuration        : 0
RequeueDuration     : 0
ExecuteDuration     : 8486500
Concurrency         : 0
MaxAllocated        : 2072
TotalAllocated      : 0
flux/query-plan     :

digraph {
  ReadWindowAggregateByTime11
  // every = 1m, aggregates = [mean], createEmpty = true, timeColumn = "_stop"
  pivot8
  generated_yield

  ReadWindowAggregateByTime11 -> pivot8
  pivot8 -> generated_yield
}


influxdb/scanned-bytes: 0
influxdb/scanned-values: 0

===========================
Profiler: profiler/operator
===========================
result              : _profiler
table               : 1
_measurement        : profiler/operator
Type                : *universe.pivotTransformation
Label               : pivot8
Count               : 3
MinDuration         : 32600
MaxDuration         : 126200
DurationSum         : 193400
MeanDuration        : 64466.666666666664

===========================
Profiler: profiler/operator
===========================
result              : _profiler
table               : 1
_measurement        : profiler/operator
Type                : *influxdb.readWindowAggregateSource
Label               : ReadWindowAggregateByTime11
Count               : 1
MinDuration         : 940500
MaxDuration         : 940500
DurationSum         : 940500
MeanDuration        : 940500.0

You can also use a callback function to receive the profiler output. The callback is called with an argument of type FluxRecord.

Example of how to use profilers with a callback:

class ProfilersCallback(object):
    def __init__(self):
        self.records = []

    def __call__(self, flux_record):
        # Keep only the values dictionary of each profiler record.
        self.records.append(flux_record.values)

callback = ProfilersCallback()

query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for profiler in callback.records:
    print(f'Custom processing of profiler result: {profiler}')

Example output of this callback:

Custom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\r\n  ReadRange2\r\n  generated_yield\r\n\r\n  ReadRange2 -> generated_yield\r\n}\r\n\r\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0}
Custom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0}
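
As one possible form of custom processing, the sketch below takes dictionaries shaped like the callback output above and reports the mean duration of each operator. It assumes the duration fields are in nanoseconds, which should be checked against the Flux profiler documentation; the helper name summarize_operators is hypothetical, and the records are trimmed copies of the example output.

```python
# Assumption: profiler durations are reported in nanoseconds.
NANOSECONDS_PER_MILLISECOND = 1_000_000


def summarize_operators(records):
    """Return (label, mean duration in ms) for each profiler/operator record."""
    return [
        (record["Label"], record["MeanDuration"] / NANOSECONDS_PER_MILLISECOND)
        for record in records
        if record.get("_measurement") == "profiler/operator"
    ]


# Trimmed copies of the example callback output shown above.
records = [
    {"_measurement": "profiler/query", "TotalDuration": 18843792},
    {"_measurement": "profiler/operator", "Label": "ReadRange2", "MeanDuration": 3274084.0},
]
summary = summarize_operators(records)
for label, mean_ms in summary:
    print(f"{label}: {mean_ms:.3f} ms")
```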
