User Guide


from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "my-bucket"

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)

write_api.write(bucket=bucket, org="my-org", record=p)

## using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for table in tables:
    for row in table.records:
        print (row.values)

## using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
    for cell in row:
        val_count += 1


The WriteApi supports synchronous, asynchronous and batching writes into InfluxDB 2.0. The data should be passed as a InfluxDB Line Protocol, Data Point or Observable stream.

The default instance of ``WriteApi`` use batching.


The batching is configurable by write_options:

Property Description Default Value
batch_size the number of data pointx to collect in a batch 1000
flush_interval the number of milliseconds before the batch is written 1000
jitter_interval the number of milliseconds to increase the batch flush interval by a random amount 0
retry_interval the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify “Retry-After” header. 1000
import rx
from rx import operators as ops

from influxdb_client import InfluxDBClient, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS

_client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
_write_client = _client.write_api(write_options=WriteOptions(batch_size=500,

Write Line Protocol
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
                                            "h2o_feet,location=coyote_creek water_level=3.0 3"])

Write Data Point
_write_client.write("my-bucket", "my-org", Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
_write_client.write("my-bucket", "my-org", [Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
                                            Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])

Write Observable stream
_data = rx \
    .range(7, 11) \
    .pipe( i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))

_write_client.write("my-bucket", "my-org", _data)

Close client

Asynchronous client

Data are writes in an asynchronous HTTP request.

from influxdb_client  import InfluxDBClient
from influxdb_client.client.write_api import ASYNCHRONOUS

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
write_client = client.write_api(write_options=ASYNCHRONOUS)



Synchronous client

Data are writes in a synchronous HTTP request.

from influxdb_client  import InfluxDBClient
from influxdb_client .client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org")
write_client = client.write_api(write_options=SYNCHRONOUS)



How to efficiently import large dataset

Import VIX - CBOE Volatility Index - from "vix-daily.csv" file into InfluxDB 2.0

from collections import OrderedDict
from csv import DictReader
from datetime import datetime

import rx
from rx import operators as ops

from influxdb_client import InfluxDBClient, Point, WriteOptions

def parse_row(row: OrderedDict):
    """Parse row of CSV file into Point with structure:

        financial-analysis,type=ily close=18.47,high=19.82,low=18.28,open=19.82 1198195200000000000

    CSV format:
        Date,VIX Open,VIX High,VIX Low,VIX Close\n

    :param row: the row of CSV file
    :return: Parsed csv row to [Point]
    return Point("financial-analysis") \
        .tag("type", "vix-daily") \
        .field("open", float(row['VIX Open'])) \
        .field("high", float(row['VIX High'])) \
        .field("low", float(row['VIX Low'])) \
        .field("close", float(row['VIX Close'])) \
        .time(datetime.strptime(row['Date'], '%Y-%m-%d'))

Converts vix-daily.csv into sequence of datad point
data = rx \
    .from_iterable(DictReader(open('vix-daily.csv', 'r'))) \
    .pipe( row: parse_row(row)))

client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", debug=True)

Create client that writes data in batches with 500 items.
write_api = client.write_api(write_options=WriteOptions(batch_size=500, jitter_interval=1_000))

Write data into InfluxDB
write_api.write(org="my-org", bucket="my-bucket", record=data)

Querying max value of CBOE Volatility Index
query = 'from(bucket:"my-bucket")' \
        ' |> range(start: 0, stop: now())' \
        ' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
        ' |> max()'
result = client.query_api().query(org="my-org", query=query)

Processing results
print("=== results ===")
for table in result:
    for record in table.records:
        print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))

Close client

Gzip support

InfluxDBClient does not enable gzip compression for http requests by default. If you want to enable gzip to reduce transfer data’s size, you can call:

from influxdb_client import InfluxDBClient

_db_client = InfluxDBClient(url="http://localhost:9999", token="my-token", org="my-org", enable_gzip=True)


For debug purpose you can enable verbose logging of http requests. Both request header and body will be logged to standard output.

_client = InfluxDBClient(url="http://localhost:9999", token="my-token", debug=True, org="my-org")