InfluxDB 2.0 Python client
User Guide
Query
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "my-bucket"
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
write_api.write(bucket=bucket, record=p)
## using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
for table in tables:
    print(table)
    for row in table.records:
        print(row.values)
## using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
    for cell in row:
        val_count += 1
Write
The WriteApi supports synchronous, asynchronous and batching writes into InfluxDB 2.0. The data should be passed as InfluxDB Line Protocol, a Data Point or an Observable stream.
Warning
The WriteApi in batching mode (the default mode) is supposed to run as a singleton. To flush all your data you should wrap the execution in a with client.write_api(...) as write_api: statement or call write_api.close() at the end of your script.
The default instance of WriteApi uses batching.
The data could be written as:
- string or bytes that is formatted as InfluxDB's line protocol
- Data Point structure
- Dictionary style mapping with keys: measurement, tags, fields and time, or a custom structure
- List of above items
- A batching type of write also supports an Observable that produces one of the above items
You can find write examples at GitHub: influxdb-client-python/examples.
Batching
The batching is configurable by write_options:

Property | Description | Default Value
---|---|---
batch_size | the number of data points to collect in a batch |
flush_interval | the number of milliseconds before the batch is written |
jitter_interval | the number of milliseconds to increase the batch flush interval by a random amount |
retry_interval | the number of milliseconds to retry the first unsuccessful write. The next retry delay is computed using exponential random backoff. The retry interval is used when the InfluxDB server does not specify a “Retry-After” header. |
max_retry_time | maximum total retry timeout in milliseconds |
max_retries | the maximum number of retries when a write fails |
max_retry_delay | the maximum delay between each retry attempt in milliseconds |
max_close_wait | the maximum amount of time to wait for batches to flush when .close() is called |
exponential_base | the base for the exponential retry delay; the next delay is computed using random exponential backoff as a random value within the interval |
from datetime import datetime, timedelta
import pandas as pd
import reactivex as rx
from reactivex import operators as ops
from influxdb_client import InfluxDBClient, Point, WriteOptions
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as _client:

    with _client.write_api(write_options=WriteOptions(batch_size=500,
                                                      flush_interval=10_000,
                                                      jitter_interval=2_000,
                                                      retry_interval=5_000,
                                                      max_retries=5,
                                                      max_retry_delay=30_000,
                                                      max_close_wait=300_000,
                                                      exponential_base=2)) as _write_client:

        """
        Write Line Protocol formatted as string
        """
        _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
        _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
                                                    "h2o_feet,location=coyote_creek water_level=3.0 3"])

        """
        Write Line Protocol formatted as byte array
        """
        _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
        _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
                                                    "h2o_feet,location=coyote_creek water_level=3.0 3".encode()])

        """
        Write Dictionary-style object
        """
        _write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                    "fields": {"water_level": 1.0}, "time": 1})
        _write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                     "fields": {"water_level": 2.0}, "time": 2},
                                                    {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                     "fields": {"water_level": 3.0}, "time": 3}])

        """
        Write Data Point
        """
        _write_client.write("my-bucket", "my-org",
                            Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
        _write_client.write("my-bucket", "my-org",
                            [Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
                             Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])

        """
        Write Observable stream
        """
        _data = rx \
            .range(7, 11) \
            .pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))

        _write_client.write("my-bucket", "my-org", _data)

        """
        Write Pandas DataFrame
        """
        _now = datetime.utcnow()
        _data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                                   index=[_now, _now + timedelta(hours=1)],
                                   columns=["location", "water_level"])

        _write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
                            data_frame_tag_columns=['location'])
Synchronous client
Data is written in a synchronous HTTP request.
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
write_api.write(bucket="my-bucket", record=[_point1, _point2])
client.close()
Delete data
The delete_api.py supports deleting points from an InfluxDB bucket.
from influxdb_client import InfluxDBClient
client = InfluxDBClient(url="http://localhost:8086", token="my-token")
delete_api = client.delete_api()
"""
Delete Data
"""
start = "1970-01-01T00:00:00Z"
stop = "2021-02-01T00:00:00Z"
delete_api.delete(start, stop, '_measurement="my_measurement"', bucket='my-bucket', org='my-org')
"""
Close client
"""
client.close()
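The delete predicate can also combine several conditions with AND; a short sketch reusing delete_api and the time range from above (the location tag is an illustrative assumption):

delete_api.delete(start, stop, '_measurement="my_measurement" AND location="Prague"',
                  bucket='my-bucket', org='my-org')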
Pandas DataFrame
Note
For DataFrame querying you should install the Pandas dependency via pip install 'influxdb-client[extra]'.
Note
If a query returns more than one table then the client generates a DataFrame for each of them.
The client is able to retrieve data in Pandas DataFrame format through query_data_frame:
from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
"""
Prepare data
"""
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
write_api.write(bucket="my-bucket", record=[_point1, _point2])
"""
Query: using Pandas DataFrame
"""
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
'|> range(start: -10m) '
'|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
'|> keep(columns: ["location", "temperature"])')
print(data_frame.to_string())
"""
Close client
"""
client.close()
Output:
result table location temperature
0 _result 0 New York 24.3
1 _result 1 Prague 25.3
How to use Asyncio
Starting from version 1.27.0 for Python 3.7+ the influxdb-client
package supports async/await
based on
asyncio, aiohttp and aiocsv.
You can install aiohttp
and aiocsv
directly:
$ python -m pip install influxdb-client aiohttp aiocsv
or use the [async]
extra:
$ python -m pip install influxdb-client[async]
Warning
The InfluxDBClientAsync should be initialised inside an async coroutine, otherwise there can be unexpected behaviour.
For more info see: Why is creating a ClientSession outside of an event loop dangerous?.
Async APIs
All async APIs are available via influxdb_client.client.influxdb_client_async.InfluxDBClientAsync.
The async version of the client supports the following asynchronous APIs:
influxdb_client.client.write_api_async.WriteApiAsync
influxdb_client.client.query_api_async.QueryApiAsync
influxdb_client.client.delete_api_async.DeleteApiAsync
Management services in influxdb_client.service support async operations.
You can also check the readiness of InfluxDB via the /ping endpoint:
import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        ready = await client.ping()
        print(f"InfluxDB: {ready}")


if __name__ == "__main__":
    asyncio.run(main())
Async Write API
The influxdb_client.client.write_api_async.WriteApiAsync supports ingesting data as:
- string or bytes that is formatted as InfluxDB's line protocol
- Data Point structure
- Dictionary style mapping with keys: measurement, tags, fields and time, or a custom structure
- List of above items
import asyncio

from influxdb_client import Point
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        write_api = client.write_api()

        _point1 = Point("async_m").tag("location", "Prague").field("temperature", 25.3)
        _point2 = Point("async_m").tag("location", "New York").field("temperature", 24.3)

        successfully = await write_api.write(bucket="my-bucket", record=[_point1, _point2])
        print(f" > successfully: {successfully}")


if __name__ == "__main__":
    asyncio.run(main())
Async Query API
The influxdb_client.client.query_api_async.QueryApiAsync supports retrieving data as:
- List of influxdb_client.client.flux_table.FluxTable
- Stream of influxdb_client.client.flux_table.FluxRecord via typing.AsyncGenerator
- Stream of Pandas DataFrame via typing.AsyncGenerator
- Raw str output
import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # Stream of FluxRecords
        query_api = client.query_api()
        records = await query_api.query_stream('from(bucket:"my-bucket") '
                                               '|> range(start: -10m) '
                                               '|> filter(fn: (r) => r["_measurement"] == "async_m")')
        async for record in records:
            print(record)


if __name__ == "__main__":
    asyncio.run(main())
Async Delete API
import asyncio
from datetime import datetime

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        start = datetime.utcfromtimestamp(0)
        stop = datetime.now()
        # Delete data with location = 'Prague'
        successfully = await client.delete_api().delete(start=start, stop=stop, bucket="my-bucket",
                                                        predicate="location = \"Prague\"")
        print(f" > successfully: {successfully}")


if __name__ == "__main__":
    asyncio.run(main())
Management API
import asyncio

from influxdb_client import OrganizationsService
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url='http://localhost:8086', token='my-token', org='my-org') as client:
        # Initialize async OrganizationsService
        organizations_service = OrganizationsService(api_client=client.api_client)

        # Find organization with name 'my-org'
        organizations = await organizations_service.get_orgs(org='my-org')
        for organization in organizations.orgs:
            print(f'name: {organization.name}, id: {organization.id}')


if __name__ == "__main__":
    asyncio.run(main())
Proxy and redirects
You can configure the client to tunnel requests through an HTTP proxy. The following proxy options are supported:
- proxy - Set this to configure the http proxy to be used, ex. http://localhost:3128
- proxy_headers - A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
async with InfluxDBClientAsync(url="http://localhost:8086",
                               token="my-token",
                               org="my-org",
                               proxy="http://localhost:3128") as client:
    ...
Note
If your proxy notifies the client with a permanent redirect (HTTP 301) to a different host, the client removes the Authorization header, because otherwise its contents would be sent to third parties, which is a security vulnerability.
The client automatically follows HTTP redirects. The default redirect policy is to follow up to 10 consecutive requests. The redirects can be configured via:
- allow_redirects - If set to False, do not follow HTTP redirects. True by default.
- max_redirects - Maximum number of HTTP redirects to follow. 10 by default.
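A sketch of how these options could be set, assuming (as the list above suggests) they are accepted as keyword arguments when constructing the async client:

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

# Assumed configuration: do not follow redirects at all
async with InfluxDBClientAsync(url="http://localhost:8086",
                               token="my-token",
                               org="my-org",
                               allow_redirects=False) as client:
    ...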
Gzip support
InfluxDBClient does not enable gzip compression for http requests by default. If you want to enable gzip to reduce the size of transferred data, you can call:
from influxdb_client import InfluxDBClient
_db_client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", enable_gzip=True)
Proxy configuration
You can configure the client to tunnel requests through an HTTP proxy. The following proxy options are supported:
- proxy - Set this to configure the http proxy to be used, ex. http://localhost:3128
- proxy_headers - A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086",
                    token="my-token",
                    org="my-org",
                    proxy="http://localhost:3128") as client:
    ...
Note
If your proxy notifies the client with a permanent redirect (HTTP 301) to a different host, the client removes the Authorization header, because otherwise its contents would be sent to third parties, which is a security vulnerability.
You can change this behaviour by:
from urllib3 import Retry
Retry.DEFAULT_REMOVE_HEADERS_ON_REDIRECT = frozenset()
Retry.DEFAULT.remove_headers_on_redirect = Retry.DEFAULT_REMOVE_HEADERS_ON_REDIRECT
Authentication
InfluxDBClient supports three ways to authorize a connection:
Token
Username & Password
HTTP Basic
Token
Use the token to authenticate to the InfluxDB API. In your API requests, an Authorization header will be sent. For the header value, provide the word Token followed by a space and an InfluxDB API token. The word Token is case-sensitive.
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086", token="my-token") as client:
    ...
Note
This is the preferred way to authenticate to the InfluxDB API.
Username & Password
Authenticates via username and password credentials. If successful, creates a new session for the user.
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086", username="my-user", password="my-password") as client:
    ...
Warning
The username/password auth is based on the HTTP "Basic" authentication. The authorization expires when the time-to-live (TTL) (default 60 minutes) is reached and the client produces an unauthorized exception.
HTTP Basic
Use this to enable basic authentication when talking to an InfluxDB 1.8.x instance that does not have authentication enabled but is protected by a reverse proxy with basic authentication.
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086", auth_basic=True, token="my-proxy-secret") as client:
    ...
Warning
Don’t use this when directly talking to InfluxDB 2.
Nanosecond precision
Python's datetime doesn't support nanosecond precision, so during writes and queries the library ignores everything after microseconds.
If you would like to use datetime with nanosecond precision you should use pandas.Timestamp, which is a replacement for the Python datetime.datetime object, and you should also set a proper DateTimeHelper on the client.
sources - nanosecond_precision.py
from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
from influxdb_client.client.write_api import SYNCHRONOUS
"""
Set PandasDate helper which supports nanoseconds.
"""
import influxdb_client.client.util.date_utils as date_utils
date_utils.date_helper = PandasDateTimeHelper()
"""
Prepare client.
"""
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
"""
Prepare data
"""
point = Point("h2o_feet") \
.field("water_level", 10) \
.tag("location", "pacific") \
.time('1996-02-25T21:20:00.001001231Z')
print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
print()
write_api.write(bucket="my-bucket", record=point)
"""
Query: using Stream
"""
query = '''
from(bucket:"my-bucket")
|> range(start: 0, stop: now())
|> filter(fn: (r) => r._measurement == "h2o_feet")
'''
records = query_api.query_stream(query)
for record in records:
    print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}')
"""
Close client
"""
client.close()
Handling Errors
Errors happen and it's important that your code is prepared for them. All client related exceptions derive from InfluxDBError. If an exception cannot be recovered in the client it is returned to the application. These exceptions are left for the developer to handle.
Almost all APIs directly return unrecoverable exceptions to be handled this way:
from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    try:
        client.write_api(write_options=SYNCHRONOUS).write("my-bucket", record="mem,tag=a value=86")
    except InfluxDBError as e:
        if e.response.status == 401:
            raise Exception(f"Insufficient write permissions to 'my-bucket'.") from e
        raise
The only exception is the batching WriteApi (for more info see Batching), where you need to register custom callbacks to handle batch events. This is because this API runs in the background in a separate thread, and it isn't possible to directly return underlying exceptions.
from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
class BatchingCallback(object):

    def success(self, conf: (str, str, str), data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    callback = BatchingCallback()

    with client.write_api(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry) as write_api:
        pass
HTTP Retry Strategy
By default the client uses a retry strategy only for batching writes (for more info see Batching).
For other HTTP requests there is no default retry strategy, but one can be configured via the retries parameter of InfluxDBClient.
For more info about how to configure HTTP retries see the details in the urllib3 documentation.
from urllib3 import Retry
from influxdb_client import InfluxDBClient
retries = Retry(connect=5, read=2, redirect=5)
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", retries=retries)
Logging
The client uses Python’s logging facility for logging the library activity. The following logger categories are exposed:
influxdb_client.client.influxdb_client
influxdb_client.client.influxdb_client_async
influxdb_client.client.write_api
influxdb_client.client.write_api_async
influxdb_client.client.write.retry
influxdb_client.client.write.dataframe_serializer
influxdb_client.client.util.multiprocessing_helper
influxdb_client.client.http
influxdb_client.client.exceptions
The default logging level is warning without configured logger output. You can use the standard logger interface to change the log level and handler:
import logging
import sys
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    for _, logger in client.conf.loggers.items():
        logger.setLevel(logging.DEBUG)
        logger.addHandler(logging.StreamHandler(sys.stdout))
Debugging
For debugging purposes you can enable verbose logging of HTTP requests and set the debug level on all the client's logger categories by:
client = InfluxDBClient(url="http://localhost:8086", token="my-token", debug=True)
Note
Both HTTP request headers and body will be logged to standard output.
Examples
How to efficiently import large dataset
The following example shows how to import a dataset of dozens of megabytes. If you would like to import gigabytes of data then use our multiprocessing example: import_data_set_multiprocessing.py to use the full capability of your hardware.
sources - import_data_set.py
"""
Import VIX - CBOE Volatility Index - from "vix-daily.csv" file into InfluxDB 2.0
https://datahub.io/core/finance-vix#data
"""
from collections import OrderedDict
from csv import DictReader
import reactivex as rx
from reactivex import operators as ops
from influxdb_client import InfluxDBClient, Point, WriteOptions
def parse_row(row: OrderedDict):
    """Parse row of CSV file into Point with structure:

    financial-analysis,type=ily close=18.47,high=19.82,low=18.28,open=19.82 1198195200000000000

    CSV format:
    Date,VIX Open,VIX High,VIX Low,VIX Close\n
    2004-01-02,17.96,18.68,17.54,18.22\n
    2004-01-05,18.45,18.49,17.44,17.49\n
    2004-01-06,17.66,17.67,16.19,16.73\n
    2004-01-07,16.72,16.75,15.5,15.5\n
    2004-01-08,15.42,15.68,15.32,15.61\n
    2004-01-09,16.15,16.88,15.57,16.75\n
    ...

    :param row: the row of CSV file
    :return: Parsed csv row to [Point]
    """

    """
    For better performance it is sometimes useful to directly create LineProtocol to avoid unnecessary escaping overhead:
    """
    # from datetime import timezone
    # import ciso8601
    # from influxdb_client.client.write.point import EPOCH
    #
    # time = (ciso8601.parse_datetime(row["Date"]).replace(tzinfo=timezone.utc) - EPOCH).total_seconds() * 1e9
    # return f"financial-analysis,type=vix-daily" \
    #        f" close={float(row['VIX Close'])},high={float(row['VIX High'])},low={float(row['VIX Low'])},open={float(row['VIX Open'])} " \
    #        f" {int(time)}"

    return Point("financial-analysis") \
        .tag("type", "vix-daily") \
        .field("open", float(row['VIX Open'])) \
        .field("high", float(row['VIX High'])) \
        .field("low", float(row['VIX Low'])) \
        .field("close", float(row['VIX Close'])) \
        .time(row['Date'])
"""
Converts vix-daily.csv into a sequence of data points
"""
data = rx \
.from_iterable(DictReader(open('vix-daily.csv', 'r'))) \
.pipe(ops.map(lambda row: parse_row(row)))
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True)
"""
Create client that writes data in batches with 50_000 items.
"""
write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))
"""
Write data into InfluxDB
"""
write_api.write(bucket="my-bucket", record=data)
write_api.close()
"""
Querying max value of CBOE Volatility Index
"""
query = 'from(bucket:"my-bucket")' \
' |> range(start: 0, stop: now())' \
' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
' |> max()'
result = client.query_api().query(query=query)
"""
Processing results
"""
print()
print("=== results ===")
print()
for table in result:
    for record in table.records:
        print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))
"""
Close client
"""
client.close()
Efficiently write data from an IoT sensor
sources - iot_sensor.py
"""
Efficiently write data from an IoT sensor - write changed temperature every minute
"""
import atexit
import platform
from datetime import timedelta
import psutil as psutil
import reactivex as rx
from reactivex import operators as ops
from influxdb_client import InfluxDBClient, WriteApi, WriteOptions
def on_exit(db_client: InfluxDBClient, write_api: WriteApi):
    """Close clients after terminate a script.

    :param db_client: InfluxDB client
    :param write_api: WriteApi
    :return: nothing
    """
    write_api.close()
    db_client.close()


def sensor_temperature():
    """Read a CPU temperature. The [psutil] doesn't support MacOS so we use [sysctl].

    :return: actual CPU temperature
    """
    os_name = platform.system()
    if os_name == 'Darwin':
        from subprocess import check_output
        output = check_output(["sysctl", "machdep.xcpm.cpu_thermal_level"])
        import re
        return re.findall(r'\d+', str(output))[0]
    else:
        return psutil.sensors_temperatures()["coretemp"][0]


def line_protocol(temperature):
    """Create a InfluxDB line protocol with structure:

    iot_sensor,hostname=mine_sensor_12,type=temperature value=68

    :param temperature: the sensor temperature
    :return: Line protocol to write into InfluxDB
    """
    import socket
    return 'iot_sensor,hostname={},type=temperature value={}'.format(socket.gethostname(), temperature)
"""
Read temperature every minute; distinct_until_changed - produce only if temperature change
"""
data = rx \
    .interval(period=timedelta(seconds=60)) \
    .pipe(ops.map(lambda t: sensor_temperature()),
          ops.distinct_until_changed(),
          ops.map(lambda temperature: line_protocol(temperature)))
_db_client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True)
"""
Create client that writes data into InfluxDB
"""
_write_api = _db_client.write_api(write_options=WriteOptions(batch_size=1))
_write_api.write(bucket="my-bucket", record=data)
"""
Call after terminate a script
"""
atexit.register(on_exit, _db_client, _write_api)
input()
Connect to InfluxDB Cloud
The following example demonstrates the simplest way to write and query data with InfluxDB Cloud.
First you should create an authentication token as described here.
After that you should configure the properties influx_cloud_url, influx_cloud_token, bucket and org in the influx_cloud.py example.
The last step is to run the Python script via: python3 influx_cloud.py.
sources - influx_cloud.py
"""
Connect to InfluxDB 2.0 - write data and query them
"""
from datetime import datetime
from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
"""
Configure credentials
"""
influx_cloud_url = 'https://us-west-2-1.aws.cloud2.influxdata.com'
influx_cloud_token = '...'
bucket = '...'
org = '...'
client = InfluxDBClient(url=influx_cloud_url, token=influx_cloud_token)
try:
    kind = 'temperature'
    host = 'host1'
    device = 'opt-123'

    """
    Write data by Point structure
    """
    point = Point(kind).tag('host', host).tag('device', device).field('value', 25.3).time(time=datetime.utcnow())

    print(f'Writing to InfluxDB cloud: {point.to_line_protocol()} ...')

    write_api = client.write_api(write_options=SYNCHRONOUS)
    write_api.write(bucket=bucket, org=org, record=point)

    print()
    print('success')
    print()
    print()

    """
    Query written data
    """
    query = f'from(bucket: "{bucket}") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "{kind}")'
    print(f'Querying from InfluxDB cloud: "{query}" ...')
    print()

    query_api = client.query_api()
    tables = query_api.query(query=query, org=org)
    for table in tables:
        for row in table.records:
            print(f'{row.values["_time"]}: host={row.values["host"]},device={row.values["device"]} '
                  f'{row.values["_value"]} °C')

    print()
    print('success')

except Exception as e:
    print(e)

finally:
    client.close()
How to use Jupyter + Pandas + InfluxDB 2
The first example shows how to use client capabilities to predict stock price via Keras, TensorFlow, sklearn:
The example is taken from Kaggle.
sources - stock-predictions.ipynb

Result:

The second example shows how to use client capabilities for real-time visualization via hvPlot, Streamz, RxPY:
sources - realtime-stream.ipynb

Other examples
You can find all examples at GitHub: influxdb-client-python/examples.
API Reference
InfluxDBClient
- class influxdb_client.InfluxDBClient(url, token: Optional[str] = None, debug=None, timeout=10000, enable_gzip=False, org: Optional[str] = None, default_tags: Optional[dict] = None, **kwargs)[source]
InfluxDBClient is a client for InfluxDB v2.
Initialize defaults.
- Parameters:
  - url – InfluxDB server API url (ex. http://localhost:8086).
  - token – token to authenticate to the InfluxDB API
  - debug – enable verbose logging of http requests
  - timeout – HTTP client timeout setting for a request specified in milliseconds. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts.
  - enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints support the Gzip compression.
  - org – organization name (used as a default in Query, Write and Delete API)
- Key bool verify_ssl:
Set this to false to skip verifying SSL certificate when calling API from https server.
- Key str ssl_ca_cert:
Set this to customize the certificate file to verify the peer.
- Key str cert_file:
Path to the certificate that will be used for mTLS authentication.
- Key str cert_key_file:
Path to the file contains private key for mTLS certificate.
- Key str cert_key_password:
String or function which returns password for decrypting the mTLS private key.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- Key str proxy:
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key int connection_pool_maxsize:
Number of connections to save that can be reused by urllib3. Defaults to “multiprocessing.cpu_count() * 5”.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.
- Key bool auth_basic:
Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (defaults to false, don’t set to true when talking to InfluxDB 2)
- Key str username:
username to authenticate via username and password credentials to the InfluxDB 2.x
- Key str password:
password to authenticate via username and password credentials to the InfluxDB 2.x
- Key list[str] profilers:
list of enabled Flux profilers
- authorizations_api() AuthorizationsApi [source]
Create the Authorizations API instance.
- Returns:
authorizations api
- buckets_api() BucketsApi [source]
Create the Bucket API instance.
- Returns:
buckets api
- build() str [source]
Return the build type of the connected InfluxDB Server.
- Returns:
The type of InfluxDB build.
- classmethod from_config_file(config_file: str = 'config.ini', debug=None, enable_gzip=False, **kwargs)[source]
Configure client via configuration file. The configuration has to be under ‘influx’ section.
- Parameters:
config_file – Path to configuration file
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.
- Key config_name:
Name of the configuration section of the configuration file
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- The supported formats: ini, toml and json (see the examples below)
- Configuration options:
url
org
token
timeout
verify_ssl
ssl_ca_cert
cert_file
cert_key_file
cert_key_password
connection_pool_maxsize
auth_basic
profilers
proxy
config.ini example:
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
connection_pool_maxsize=25
auth_basic=false
profilers=query,operator
proxy=http://proxy.domain.org:8080

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}
config.toml example:
[influx2]
url = "http://localhost:8086"
token = "my-token"
org = "my-org"
timeout = 6000
connection_pool_maxsize = 25
auth_basic = false
profilers="query, operator"
proxy = "http://proxy.domain.org:8080"

[tags]
id = "132-987-655"
customer = "California Miner"
data_center = "${env.data_center}"
config.json example:
{ "url": "http://localhost:8086", "token": "my-token", "org": "my-org", "active": true, "timeout": 6000, "connection_pool_maxsize": 55, "auth_basic": false, "profilers": "query, operator", "tags": { "id": "132-987-655", "customer": "California Miner", "data_center": "${env.data_center}" } }
- classmethod from_env_properties(debug=None, enable_gzip=False, **kwargs)[source]
Configure client via environment properties.
- Parameters:
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.
- Key str proxy:
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- Supported environment properties:
INFLUXDB_V2_URL
INFLUXDB_V2_ORG
INFLUXDB_V2_TOKEN
INFLUXDB_V2_TIMEOUT
INFLUXDB_V2_VERIFY_SSL
INFLUXDB_V2_SSL_CA_CERT
INFLUXDB_V2_CERT_FILE
INFLUXDB_V2_CERT_KEY_FILE
INFLUXDB_V2_CERT_KEY_PASSWORD
INFLUXDB_V2_CONNECTION_POOL_MAXSIZE
INFLUXDB_V2_AUTH_BASIC
INFLUXDB_V2_PROFILERS
INFLUXDB_V2_TAG
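A minimal usage sketch, assuming INFLUXDB_V2_URL, INFLUXDB_V2_ORG and INFLUXDB_V2_TOKEN are set in the environment:

from influxdb_client import InfluxDBClient

# Configuration is read from the INFLUXDB_V2_* environment variables
client = InfluxDBClient.from_env_properties()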
- invokable_scripts_api() InvokableScriptsApi [source]
Create an InvokableScripts API instance.
- Returns:
InvokableScripts API instance
- organizations_api() OrganizationsApi [source]
Create the Organizations API instance.
- Returns:
organizations api
- query_api(query_options: ~influxdb_client.client.query_api.QueryOptions = <influxdb_client.client.query_api.QueryOptions object>) QueryApi [source]
Create a Query API instance.
- Parameters:
query_options – optional query api configuration
- Returns:
Query api instance
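A short usage sketch with custom query options (QueryOptions lives in influxdb_client.client.query_api; enabling the "query" and "operator" Flux profilers is an illustrative choice, reusing the profilers option documented above):

from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryOptions

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Create a Query API instance with Flux profilers enabled
    query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))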
- version() str [source]
Return the version of the connected InfluxDB Server.
- Returns:
The version of InfluxDB.
- write_api(write_options=<influxdb_client.client.write_api.WriteOptions object>, point_settings=<influxdb_client.client.write_api.PointSettings object>, **kwargs) WriteApi [source]
Create Write API instance.
- Example:
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# Initialize SYNCHRONOUS instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
If you would like to use background batching, you have to configure the client like this:
from influxdb_client import InfluxDBClient

# Initialize background batching instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    with client.write_api() as write_api:
        pass
There is also the possibility to use callbacks to be notified about the state of background batches:
from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError


class BatchingCallback(object):

    def success(self, conf: (str, str, str), data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    callback = BatchingCallback()

    with client.write_api(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry) as write_api:
        pass
- Parameters:
write_options – Write API configuration
point_settings – settings to store default tags
- Key success_callback:
The callable callback to run after a batch has been successfully written.
- The callable must accept two arguments:
  - Tuple: (bucket, organization, precision)
  - str: written data
[batching mode]
- Key error_callback:
The callable callback to run after a batch has been unsuccessfully written.
- The callable must accept three arguments:
  - Tuple: (bucket, organization, precision)
  - str: written data
  - Exception: an occurred error
[batching mode]
- Key retry_callback:
The callable callback to run after a retryable error occurred.
- The callable must accept three arguments:
  - Tuple: (bucket, organization, precision)
  - str: written data
  - Exception: a retryable error
[batching mode]
- Returns:
write api instance
QueryApi
- class influxdb_client.QueryApi(influxdb_client, query_options=<influxdb_client.client.query_api.QueryOptions object>)[source]
Implementation for ‘/api/v2/query’ endpoint.
Initialize query client.
- Parameters:
influxdb_client – influxdb client
- query(query: str, org=None, params: Optional[dict] = None) TableList [source]
Execute a synchronous Flux query and return the result as a FluxTable list.
- Parameters:
  - query – the Flux query
  - org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClient.org is used.
  - params – bind parameters
- Returns:
FluxTable list wrapped into TableList
- Return type:
TableList
Serializing the query results to a flattened list of values via to_values():

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = tables.to_values(columns=['location', '_time', '_value'])
    print(output)
[
    ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
    ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
    ...
]
Serializing the query results to JSON via to_json():

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to JSON
    output = tables.to_json(indent=5)
    print(output)
[ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ]
- query_csv(query: str, org=None, dialect: Dialect = {'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None) CSVIterator [source]
Execute the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file.
- Parameters:
  - query – a Flux query
  - org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClient.org is used.
  - dialect – csv dialect format
  - params – bind parameters
- Returns:
Iterator[List[str]] wrapped into CSVIterator
- Return type:
CSVIterator
Serializing the query results to a flattened list of values via to_values():

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using CSV iterator
    csv_iterator = client.query_api().query_csv('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = csv_iterator.to_values()
    print(output)
[
    ['#datatype', 'string', 'long', 'dateTime:RFC3339', 'dateTime:RFC3339', 'dateTime:RFC3339', 'double', 'string', 'string', 'string']
    ['#group', 'false', 'false', 'true', 'true', 'false', 'false', 'true', 'true', 'true']
    ['#default', '_result', '', '', '', '', '', '', '', '']
    ['', 'result', 'table', '_start', '_stop', '_time', '_value', '_field', '_measurement', 'location']
    ['', '', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York']
    ['', '', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague']
    ...
]
If you would like to turn off Annotated CSV headers you can use the following code:
from influxdb_client import InfluxDBClient, Dialect

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using CSV iterator
    csv_iterator = client.query_api().query_csv('from(bucket:"my-bucket") |> range(start: -10m)',
                                                dialect=Dialect(header=False, annotations=[]))
    for csv_line in csv_iterator:
        print(csv_line)
[
    ['', '_result', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York']
    ['', '_result', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague']
    ...
]
- query_data_frame(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)[source]
Execute synchronous Flux query and return Pandas DataFrame.
Note
If the query returns tables with differing schemas then the client generates a DataFrame for each of them.
- Parameters:
  - query – the Flux query
  - org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClient.org is used.
  - data_frame_index – the list of columns that are used as DataFrame index
  - params – bind parameters
- Returns:
DataFrame or List[DataFrame]
Warning
For the optimal processing of the query results use the pivot() function which aligns results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- query_data_frame_stream(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)[source]
Execute a synchronous Flux query and return a stream of Pandas DataFrames as a Generator[DataFrame].
Note
If the query returns tables with differing schemas then the client generates a DataFrame for each of them.
- Parameters:
  - query – the Flux query
  - org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClient.org is used.
  - data_frame_index – the list of columns that are used as DataFrame index
  - params – bind parameters
- Returns:
Generator[DataFrame]
Warning
For the optimal processing of the query results use the pivot() function which aligns results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- query_raw(query: str, org=None, dialect={'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None)[source]
Execute a synchronous Flux query and return the result as a raw unprocessed str.
- Parameters:
  - query – a Flux query
  - org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClient.org is used.
  - dialect – csv dialect format
  - params – bind parameters
- Returns:
str
- query_stream(query: str, org=None, params: Optional[dict] = None) Generator[FluxRecord, Any, None] [source]
Execute synchronous Flux query and return stream of FluxRecord as a Generator[‘FluxRecord’].
- Parameters:
  - query – the Flux query
  - org (str, Organization) – specifies the organization for executing the query; Take the ID, Name or Organization. If not specified the default value from InfluxDBClient.org is used.
  - params – bind parameters
- Returns:
Generator[‘FluxRecord’]
- class influxdb_client.client.flux_table.FluxTable[source]
A table is set of records with a common set of columns and a group key.
The table can be serialized into JSON by:
import json

from influxdb_client.client.flux_table import FluxStructureEncoder

output = json.dumps(tables, cls=FluxStructureEncoder, indent=2)
print(output)
Initialize defaults.
- class influxdb_client.client.flux_table.FluxRecord(table, values=None)[source]
A record is a tuple of named values and is represented using an object type.
Initialize defaults.
- class influxdb_client.client.flux_table.TableList(iterable=(), /)[source]
FluxTable list with additional functionality to better handle query results.
- to_json(columns: Optional[List[str]] = None, **kwargs) str [source]
Serialize query results to a JSON formatted str.
- Parameters:
  - columns – if not None then only specified columns are presented in results
- Returns:
The query results are flattened to an array:
[ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ]
The JSON format could be configured via **kwargs arguments:

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to JSON
    output = tables.to_json(indent=5)
    print(output)
For all available options see - json.dump.
- to_values(columns: Optional[List[str]] = None) List[List[object]] [source]
Serialize query results to a flattened list of values.
- Parameters:
  - columns – if not None then only specified columns are presented in results
- Returns:
list of values
Output example:
[
    ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
    ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
    ...
]
Configure required columns:
from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = tables.to_values(columns=['location', '_time', '_value'])
    print(output)
- class influxdb_client.client.flux_table.CSVIterator(response: HTTPResponse)[source]
Iterator[List[str]] with additional functionality to better handle query results.
Initialize csv.reader.
WriteApi
- class influxdb_client.WriteApi(influxdb_client, write_options: ~influxdb_client.client.write_api.WriteOptions = <influxdb_client.client.write_api.WriteOptions object>, point_settings: ~influxdb_client.client.write_api.PointSettings = <influxdb_client.client.write_api.PointSettings object>, **kwargs)[source]
Implementation for ‘/api/v2/write’ endpoint.
- Example:
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# Initialize SYNCHRONOUS instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
Initialize defaults.
- Parameters:
influxdb_client – with default settings (organization)
write_options – write api configuration
point_settings – settings to store default tags.
- Key success_callback:
The callable callback to run after a batch has been successfully written.
- The callable must accept two arguments:
  - Tuple: (bucket, organization, precision)
  - str: written data
[batching mode]
- Key error_callback:
The callable callback to run after a batch has been unsuccessfully written.
- The callable must accept three arguments:
  - Tuple: (bucket, organization, precision)
  - str: written data
  - Exception: an occurred error
[batching mode]
- Key retry_callback:
The callable callback to run after a retryable error occurred.
- The callable must accept three arguments:
  - Tuple: (bucket, organization, precision)
  - str: written data
  - Exception: a retryable error
[batching mode]
- write(bucket: str, org: Optional[str] = None, record: Optional[Union[str, Iterable[str], Point, Iterable[Point], dict, Iterable[dict], bytes, Iterable[bytes], Observable, NamedTuple, Iterable[NamedTuple], dataclass, Iterable[dataclass]]] = None, write_precision: WritePrecision = 'ns', **kwargs) Any [source]
Write time-series data into InfluxDB.
- Parameters:
  - bucket (str) – specifies the destination bucket for writes (required)
  - org (str, Organization) – specifies the destination organization for writes; take the ID, Name or Organization. If not specified the default value from InfluxDBClient.org is used.
  - write_precision (WritePrecision) – specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point takes precedence and is used for the write.
  - record – Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame or RxPY Observable to write
- Key data_frame_measurement_name:
name of measurement for writing Pandas DataFrame - DataFrame
- Key data_frame_tag_columns:
list of DataFrame columns which are tags, rest columns will be fields - DataFrame
- Key data_frame_timestamp_column:
name of DataFrame column which contains a timestamp. The column can be defined as a str value formatted as 2018-10-26, 2018-10-26 12:00, 2018-10-26 12:00:00-05:00 or other formats and types supported by pandas.to_datetime - DataFrame
- Key data_frame_timestamp_timezone:
name of the timezone which is used for timestamp column - DataFrame
- Key record_measurement_key:
key of record with specified measurement - dictionary, NamedTuple, dataclass
- Key record_measurement_name:
static measurement name - dictionary, NamedTuple, dataclass
- Key record_time_key:
key of record with specified timestamp - dictionary, NamedTuple, dataclass
- Key record_tag_keys:
list of record keys to use as a tag - dictionary, NamedTuple, dataclass
- Key record_field_keys:
list of record keys to use as a field - dictionary, NamedTuple, dataclass
- Example:
# Record as Line Protocol
write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1")

# Record as Dictionary
dictionary = {
    "measurement": "h2o_feet",
    "tags": {"location": "us-west"},
    "fields": {"level": 125},
    "time": 1
}
write_api.write("my-bucket", "my-org", dictionary)

# Record as Point
from influxdb_client import Point
point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1)
write_api.write("my-bucket", "my-org", point)
- DataFrame:
If the data_frame_timestamp_column is not specified the index of the Pandas DataFrame is used as a timestamp for the written data. The index can be a PeriodIndex or it must be transformable to datetime by pandas.to_datetime.
If you would like to transform a column to a PeriodIndex, you can use something like:

import pandas as pd

# DataFrame
data_frame = ...
# Set column as Index
data_frame.set_index('column_name', inplace=True)
# Transform index to PeriodIndex
data_frame.index = pd.to_datetime(data_frame.index, unit='s')
- class influxdb_client.client.write.point.Point(measurement_name)[source]
Point defines the values that will be written to the database.
Ref: https://docs.influxdata.com/influxdb/latest/reference/key-concepts/data-elements/#point
Initialize defaults.
- static from_dict(dictionary: dict, write_precision: WritePrecision = 'ns', **kwargs)[source]
Initialize point from ‘dict’ structure.
- The expected dict structure is:
measurement
tags
fields
time
- Example:
# Use default dictionary structure
dict_structure = {
    "measurement": "h2o_feet",
    "tags": {"location": "coyote_creek"},
    "fields": {"water_level": 1.0},
    "time": 1
}
point = Point.from_dict(dict_structure, WritePrecision.NS)
- Example:
# Use custom dictionary structure
dictionary = {
    "name": "sensor_pt859",
    "location": "warehouse_125",
    "version": "2021.06.05.5874",
    "pressure": 125,
    "temperature": 10,
    "created": 1632208639,
}
point = Point.from_dict(dictionary,
                        write_precision=WritePrecision.S,
                        record_measurement_key="name",
                        record_time_key="created",
                        record_tag_keys=["location", "version"],
                        record_field_keys=["pressure", "temperature"])
- Int Types:
The following example shows how to configure the types of integer fields. It is useful when you want to always serialize integers as float to avoid field type conflict, or use unsigned 64-bit integer as the type for serialization.

# Use custom dictionary structure
dict_structure = {
    "measurement": "h2o_feet",
    "tags": {"location": "coyote_creek"},
    "fields": {
        "water_level": 1.0,
        "some_counter": 108913123234
    },
    "time": 1
}

point = Point.from_dict(dict_structure, field_types={"some_counter": "uint"})
- Parameters:
  - dictionary – dictionary to serialize into a data Point
  - write_precision – sets the precision for the supplied time values
- Key record_measurement_key:
key of dictionary with specified measurement
- Key record_measurement_name:
static measurement name for data Point
- Key record_time_key:
key of dictionary with specified timestamp
- Key record_tag_keys:
list of dictionary keys to use as a tag
- Key record_field_keys:
list of dictionary keys to use as a field
- Key field_types:
optional dictionary to specify types of serialized fields. Currently, customization is supported for integer types. Possible integer types:
- int - serialize integers as “Signed 64-bit integers” - 9223372036854775807i (default behaviour)
- uint - serialize integers as “Unsigned 64-bit integers” - 9223372036854775807u
- float - serialize integers as “IEEE-754 64-bit floating-point numbers”. Useful to unify number types in your pipeline to avoid field type conflict - 9223372036854775807
The field_types can also be specified as part of the incoming dictionary. For more info see the example above.
- Returns:
new data point
- time(time, write_precision='ns')[source]
Specify timestamp for DataPoint with declared precision.
If time doesn’t have specified timezone we assume that timezone is UTC.
- Examples:
Point.measurement("h2o").field("val", 1).time("2009-11-10T23:00:00.123456Z")
Point.measurement("h2o").field("val", 1).time(1257894000123456000)
Point.measurement("h2o").field("val", 1).time(datetime(2009, 11, 10, 23, 0, 0, 123456))
Point.measurement("h2o").field("val", 1).time(1257894000123456000, write_precision=WritePrecision.NS)
- Parameters:
time – the timestamp for your data
write_precision – sets the precision for the supplied time values
- Returns:
this point
- to_line_protocol(precision=None)[source]
Create LineProtocol.
- Parameters:
  - precision – required precision of LineProtocol. If it's not set then the precision from the Point is used.
- property write_precision
Get precision.
- class influxdb_client.domain.write_precision.WritePrecision[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
WritePrecision - a model defined in OpenAPI.
- NS = 'ns'
- Attributes:
- openapi_types (dict): The key is attribute name
and the value is attribute type.
- attribute_map (dict): The key is attribute name
and the value is json key in definition.
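For illustration, a brief sketch of how the precision constants are typically passed when creating a point (the timestamp value is arbitrary; the time() API is documented above):

from influxdb_client import Point, WritePrecision

# The timestamp is interpreted as seconds because of WritePrecision.S
point = Point("h2o").tag("location", "coyote_creek").field("water_level", 2.0) \
    .time(1257894000, write_precision=WritePrecision.S)
print(point.to_line_protocol())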
BucketsApi
- class influxdb_client.BucketsApi(influxdb_client)[source]
Implementation for ‘/api/v2/buckets’ endpoint.
Initialize defaults.
- create_bucket(bucket=None, bucket_name=None, org_id=None, retention_rules=None, description=None, org=None) Bucket [source]
Create a bucket.
- Parameters:
bucket (Bucket|PostBucketRequest) – bucket to create
bucket_name – bucket name
description – bucket description
org_id – org_id
bucket_name – bucket name
retention_rules – retention rules array or single BucketRetentionRules
org (str, Organization) – specifies the organization in which to create the bucket; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
- Returns:
Bucket. If the method is called asynchronously, returns the request thread.
- delete_bucket(bucket)[source]
Delete a bucket.
- Parameters:
bucket – bucket id or Bucket
- Returns:
Bucket
- find_bucket_by_name(bucket_name)[source]
Find bucket by name.
- Parameters:
bucket_name – bucket name
- Returns:
Bucket
- find_buckets(**kwargs)[source]
List buckets.
- Key int offset:
Offset for pagination
- Key int limit:
Limit for pagination
- Key str after:
The last resource ID from which to seek (exclusive). Use this instead of offset.
- Key str org:
The organization name.
- Key str org_id:
The organization ID.
- Key str name:
Only returns buckets with a specific name.
- Returns:
Buckets
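- Example (a minimal sketch of typical BucketsApi usage; the bucket name and retention value are illustrative):

from influxdb_client import InfluxDBClient, BucketRetentionRules

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    buckets_api = client.buckets_api()

    # Create a bucket that expires data after one hour
    rules = BucketRetentionRules(type="expire", every_seconds=3600)
    bucket = buckets_api.create_bucket(bucket_name="example-bucket", retention_rules=rules)

    # Look the bucket up again by name, then delete it
    found = buckets_api.find_bucket_by_name("example-bucket")
    buckets_api.delete_bucket(found)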
- class influxdb_client.domain.Bucket(links=None, id=None, type='user', name=None, description=None, org_id=None, rp=None, schema_type=None, created_at=None, updated_at=None, retention_rules=None, labels=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Bucket - a model defined in OpenAPI.
- property created_at
Get the created_at of this Bucket.
- Returns:
The created_at of this Bucket.
- Return type:
datetime
- property description
Get the description of this Bucket.
- Returns:
The description of this Bucket.
- Return type:
- property labels
Get the labels of this Bucket.
- Returns:
The labels of this Bucket.
- Return type:
list[Label]
- property links
Get the links of this Bucket.
- Returns:
The links of this Bucket.
- Return type:
BucketLinks
- property org_id
Get the org_id of this Bucket.
- Returns:
The org_id of this Bucket.
- Return type:
- property retention_rules
Get the retention_rules of this Bucket.
Retention rules to expire or retain data. The InfluxDB /api/v2 API uses RetentionRules to configure the [retention period](https://docs.influxdata.com/influxdb/latest/reference/glossary/#retention-period). #### InfluxDB Cloud - retentionRules is required. #### InfluxDB OSS - retentionRules isn’t required.
- Returns:
The retention_rules of this Bucket.
- Return type:
list[BucketRetentionRules]
- property schema_type
Get the schema_type of this Bucket.
- Returns:
The schema_type of this Bucket.
- Return type:
SchemaType
- property updated_at
Get the updated_at of this Bucket.
- Returns:
The updated_at of this Bucket.
- Return type:
datetime
LabelsApi
- class influxdb_client.LabelsApi(influxdb_client)[source]
Implementation for ‘/api/v2/labels’ endpoint.
Initialize defaults.
- clone_label(cloned_name: str, label: Label) Label [source]
Create a new label as a copy of an existing label.
- Parameters:
cloned_name – new label name
label – existing label
- Returns:
cloned Label
- create_label(name: str, org_id: str, properties: Optional[Dict[str, str]] = None) Label [source]
Create a new label.
- Parameters:
name – label name
org_id – organization id
properties – optional label properties
- Returns:
created label
- delete_label(label: Union[str, Label])[source]
Delete the label.
- Parameters:
label – label id or Label
- find_label_by_id(label_id: str)[source]
Retrieve the label by id.
- Parameters:
label_id –
- Returns:
Label
- find_label_by_org(org_id) List[Label] [source]
Get the list of all labels for given organization.
- Parameters:
org_id – organization id
- Returns:
list of labels
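- Example (a minimal sketch of typical LabelsApi usage; the label names, org_id and properties below are illustrative):

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    labels_api = client.labels_api()

    # Create a label, clone it under a new name, then clean both up
    label = labels_api.create_label("my-label", org_id="my-org-id", properties={"color": "green"})
    cloned = labels_api.clone_label("my-label-copy", label)
    labels_api.delete_label(label)
    labels_api.delete_label(cloned)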
OrganizationsApi
- class influxdb_client.OrganizationsApi(influxdb_client)[source]
Implementation for ‘/api/v2/orgs’ endpoint.
Initialize defaults.
- create_organization(name: Optional[str] = None, organization: Optional[Organization] = None) Organization [source]
Create an organization.
- find_organizations(**kwargs)[source]
List all organizations.
- Key int offset:
Offset for pagination
- Key int limit:
Limit for pagination
- Key bool descending:
- Key str org:
Filter organizations to a specific organization name.
- Key str org_id:
Filter organizations to a specific organization ID.
- Key str user_id:
Filter organizations to a specific user ID.
- update_organization(organization: Organization) Organization [source]
Update an organization.
- Parameters:
organization – Organization update to apply (required)
- Returns:
Organization
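- Example (a minimal sketch of typical OrganizationsApi usage; the organization name below is illustrative):

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    orgs_api = client.organizations_api()

    # Create an organization, update its description, then list it by name
    org = orgs_api.create_organization(name="my-new-org")
    org.description = "Created from the Python client"
    orgs_api.update_organization(organization=org)
    print(orgs_api.find_organizations(org="my-new-org"))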
- class influxdb_client.domain.Organization(links=None, id=None, name=None, default_storage_type=None, description=None, created_at=None, updated_at=None, status='active')[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Organization - a model defined in OpenAPI.
- property created_at
Get the created_at of this Organization.
- Returns:
The created_at of this Organization.
- Return type:
datetime
- property default_storage_type
Get the default_storage_type of this Organization.
Discloses whether the organization uses TSM or IOx.
- Returns:
The default_storage_type of this Organization.
- Return type:
- property description
Get the description of this Organization.
- Returns:
The description of this Organization.
- Return type:
- property id
Get the id of this Organization.
- Returns:
The id of this Organization.
- Return type:
- property links
Get the links of this Organization.
- Returns:
The links of this Organization.
- Return type:
OrganizationLinks
- property name
Get the name of this Organization.
- Returns:
The name of this Organization.
- Return type:
- property status
Get the status of this Organization.
If inactive, the organization is inactive.
- Returns:
The status of this Organization.
- Return type:
- property updated_at
Get the updated_at of this Organization.
- Returns:
The updated_at of this Organization.
- Return type:
datetime
UsersApi
- class influxdb_client.UsersApi(influxdb_client)[source]
Implementation for ‘/api/v2/users’ endpoint.
Initialize defaults.
- delete_user(user: Union[str, User, UserResponse]) None [source]
Delete a user.
- Parameters:
user – user id or User
- Returns:
None
- find_users(**kwargs) Users [source]
List all users.
- Key int offset:
The offset for pagination. The number of records to skip.
- Key int limit:
Limits the number of records returned. Default is 20.
- Key str after:
The last resource ID from which to seek (exclusive). Use this instead of offset.
- Key str name:
The user name.
- Key str id:
The user ID.
- Returns:
Users
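- Example (a minimal sketch of typical UsersApi usage):

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    users_api = client.users_api()

    # List the first page of users and print their names
    users = users_api.find_users(limit=20)
    for user in users.users:
        print(user.name)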
- class influxdb_client.domain.User(id=None, name=None, status='active')[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
User - a model defined in OpenAPI.
- property name
Get the name of this User.
The user name.
- Returns:
The name of this User.
- Return type:
- property status
Get the status of this User.
If inactive, the user is inactive. Default is active.
- Returns:
The status of this User.
- Return type:
TasksApi
- class influxdb_client.TasksApi(influxdb_client)[source]
Implementation for ‘/api/v2/tasks’ endpoint.
Initialize defaults.
- cancel_run(task_id: str, run_id: str)[source]
Cancel a currently running run.
- Parameters:
task_id –
run_id –
- create_task(task: Optional[Task] = None, task_create_request: Optional[TaskCreateRequest] = None) Task [source]
Create a new task.
- create_task_cron(name: str, flux: str, cron: str, org_id: str) Task [source]
Create a new task with cron repetition schedule.
- create_task_every(name, flux, every, organization) Task [source]
Create a new task with every repetition schedule.
- find_tasks(**kwargs)[source]
List all tasks, up to the set limit (max 500).
- Key str name:
only returns tasks with the specified name
- Key str after:
returns tasks after specified ID
- Key str user:
filter tasks to a specific user ID
- Key str org:
filter tasks to a specific organization name
- Key str org_id:
filter tasks to a specific organization ID
- Key int limit:
the number of tasks to return
- Returns:
Tasks
- find_tasks_iter(**kwargs)[source]
Iterate over all tasks with pagination.
- Key str name:
only returns tasks with the specified name
- Key str after:
returns tasks after specified ID
- Key str user:
filter tasks to a specific user ID
- Key str org:
filter tasks to a specific organization name
- Key str org_id:
filter tasks to a specific organization ID
- Key int limit:
the number of tasks in one page
- Returns:
Tasks iterator
- get_logs(task_id: str) List[LogEvent] [source]
Retrieve all logs for a task.
- Parameters:
task_id – task id
- get_run(task_id: str, run_id: str) Run [source]
Get run record for specific task and run id.
- Parameters:
task_id – task id
run_id – run id
- Returns:
Run for specified task and run id
- get_runs(task_id, **kwargs) List[Run] [source]
Retrieve list of run records for a task.
- Parameters:
task_id – task id
- Key str after:
returns runs after specified ID
- Key int limit:
the number of runs to return
- Key datetime after_time:
filter runs to those scheduled after this time, RFC3339
- Key datetime before_time:
filter runs to those scheduled before this time, RFC3339
- retry_run(task_id: str, run_id: str)[source]
Retry a task run.
- Parameters:
task_id – task id
run_id – run id
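- Example (a minimal sketch of typical TasksApi usage; the Flux script, task name and org_id below are illustrative):

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    tasks_api = client.tasks_api()

    # Create a task that runs at the top of every hour
    flux = 'from(bucket: "my-bucket") |> range(start: -1h) |> to(bucket: "downsampled", org: "my-org")'
    task = tasks_api.create_task_cron(name="my-task", flux=flux, cron="0 * * * *", org_id="my-org-id")

    # Inspect the most recent runs and the task logs
    for run in tasks_api.get_runs(task.id, limit=10):
        print(run.status)
    print(tasks_api.get_logs(task.id))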
- class influxdb_client.domain.Task(id=None, org_id=None, org=None, name=None, owner_id=None, description=None, status=None, labels=None, authorization_id=None, flux=None, every=None, cron=None, offset=None, latest_completed=None, last_run_status=None, last_run_error=None, created_at=None, updated_at=None, links=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Task - a model defined in OpenAPI.
- property authorization_id
Get the authorization_id of this Task.
An authorization ID. Specifies the authorization used when the task communicates with the query engine. To find an authorization ID, use the [GET /api/v2/authorizations endpoint](#operation/GetAuthorizations) to list authorizations.
- Returns:
The authorization_id of this Task.
- Return type:
- property created_at
Get the created_at of this Task.
- Returns:
The created_at of this Task.
- Return type:
datetime
- property cron
Get the cron of this Task.
A [Cron expression](https://en.wikipedia.org/wiki/Cron#Overview) that defines the schedule on which the task runs. InfluxDB uses the system time when evaluating Cron expressions.
- Returns:
The cron of this Task.
- Return type:
- property description
Get the description of this Task.
A description of the task.
- Returns:
The description of this Task.
- Return type:
- property every
Get the every of this Task.
The interval ([duration literal](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)) at which the task runs. every also determines when the task first runs, depending on the specified time.
- Returns:
The every of this Task.
- Return type:
- property flux
Get the flux of this Task.
The Flux script that the task executes.
- Returns:
The flux of this Task.
- Return type:
- property labels
Get the labels of this Task.
- Returns:
The labels of this Task.
- Return type:
list[Label]
- property last_run_error
Get the last_run_error of this Task.
- Returns:
The last_run_error of this Task.
- Return type:
- property last_run_status
Get the last_run_status of this Task.
- Returns:
The last_run_status of this Task.
- Return type:
- property latest_completed
Get the latest_completed of this Task.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)) of the latest scheduled and completed run.
- Returns:
The latest_completed of this Task.
- Return type:
datetime
- property links
Get the links of this Task.
- Returns:
The links of this Task.
- Return type:
TaskLinks
- property name
Get the name of this Task.
The name of the task.
- Returns:
The name of this Task.
- Return type:
- property offset
Get the offset of this Task.
A [duration](https://docs.influxdata.com/flux/v0.x/spec/lexical-elements/#duration-literals) to delay execution of the task after the scheduled time has elapsed. 0 removes the offset.
- Returns:
The offset of this Task.
- Return type:
- property org
Get the org of this Task.
An [organization](https://docs.influxdata.com/influxdb/latest/reference/glossary/#organization) name. Specifies the organization that owns the task.
- Returns:
The org of this Task.
- Return type:
- property org_id
Get the org_id of this Task.
An [organization](https://docs.influxdata.com/influxdb/latest/reference/glossary/#organization) ID. Specifies the organization that owns the task.
- Returns:
The org_id of this Task.
- Return type:
- property owner_id
Get the owner_id of this Task.
A [user](https://docs.influxdata.com/influxdb/latest/reference/glossary/#user) ID. Specifies the owner of the task. To find a user ID, you can use the [GET /api/v2/users endpoint](#operation/GetUsers) to list users.
- Returns:
The owner_id of this Task.
- Return type:
- property status
Get the status of this Task.
- Returns:
The status of this Task.
- Return type:
TaskStatusType
- property updated_at
Get the updated_at of this Task.
- Returns:
The updated_at of this Task.
- Return type:
datetime
InvokableScriptsApi
- class influxdb_client.InvokableScriptsApi(influxdb_client)[source]
Use API invokable scripts to create custom InfluxDB API endpoints that query, process, and shape data.
Initialize defaults.
- create_script(create_request: ScriptCreateRequest) Script [source]
Create a script.
- Parameters:
create_request (ScriptCreateRequest) – The script to create. (required)
- Returns:
The created script.
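- Example (a minimal sketch of creating an invokable script; the script name and Flux source below are illustrative):

from influxdb_client import InfluxDBClient
from influxdb_client.domain.script_create_request import ScriptCreateRequest
from influxdb_client.domain.script_language import ScriptLanguage

with InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", token="my-token", org="my-org") as client:
    scripts_api = client.invokable_scripts_api()

    # Create a Flux script that reads its bucket name from bind parameters
    create_request = ScriptCreateRequest(name="my-script",
                                         description="Filter data by params",
                                         language=ScriptLanguage.FLUX,
                                         script='from(bucket: params.bucket_name) |> range(start: -30d) |> limit(n: 2)')
    created_script = scripts_api.create_script(create_request=create_request)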
- delete_script(script_id: str) None [source]
Delete a script.
- Parameters:
script_id (str) – The ID of the script to delete. (required)
- Returns:
None
- invoke_script(script_id: str, params: Optional[dict] = None) TableList [source]
Synchronously invoke a script and return the result as a TableList.
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
- Parameters:
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns:
- Return type:
Serializing the query results to a flattened list of values via to_values():

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = client.invokable_scripts_api().invoke_script(script_id="script-id")

    # Serialize to values
    output = tables.to_values(columns=['location', '_time', '_value'])
    print(output)
[
    ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
    ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
    ...
]
Serializing the query results to JSON via to_json():

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = client.invokable_scripts_api().invoke_script(script_id="script-id")

    # Serialize to JSON
    output = tables.to_json(indent=5)
    print(output)
[ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ]
- invoke_script_csv(script_id: str, params: Optional[dict] = None) CSVIterator [source]
Synchronously invoke a script and return the result as a CSV iterator. Each iteration returns a row of the CSV file.
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
- Parameters:
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns:
Iterator[List[str]] wrapped into CSVIterator
- Return type:
Serializing the query results to a flattened list of values via to_values():

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using CSV iterator
    csv_iterator = client.invokable_scripts_api().invoke_script_csv(script_id="script-id")

    # Serialize to values
    output = csv_iterator.to_values()
    print(output)
[
    ['', 'result', 'table', '_start', '_stop', '_time', '_value', '_field', '_measurement', 'location']
    ['', '', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York']
    ['', '', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague']
    ...
]
- invoke_script_data_frame(script_id: str, params: Optional[dict] = None, data_frame_index: Optional[List[str]] = None)[source]
Synchronously invoke a script and return a Pandas DataFrame.
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
Note
If the script returns tables with differing schemas, the client generates a DataFrame for each of them.
- Parameters:
- Returns:
DataFrame or List[DataFrame]
Warning
For the optimal processing of the query results use the pivot() function, which aligns results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- invoke_script_data_frame_stream(script_id: str, params: Optional[dict] = None, data_frame_index: Optional[List[str]] = None)[source]
Synchronously invoke a script and return a stream of Pandas DataFrames as a Generator['pd.DataFrame'].
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
Note
If the script returns tables with differing schemas, the client generates a DataFrame for each of them.
- Parameters:
- Returns:
Generator[DataFrame]
Warning
For the optimal processing of the query results use the pivot() function, which aligns results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- invoke_script_raw(script_id: str, params: Optional[dict] = None) Iterator[List[str]] [source]
Synchronously invoke a script and return the raw unprocessed result as a str.
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
- Parameters:
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns:
Result as a str.
- invoke_script_stream(script_id: str, params: Optional[dict] = None) Generator[FluxRecord, Any, None] [source]
Synchronously invoke a script and return the result as a Generator['FluxRecord'].
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
- Parameters:
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns:
Stream of FluxRecord.
- Return type:
Generator[‘FluxRecord’]
- class influxdb_client.domain.Script(id=None, name=None, description=None, org_id=None, script=None, language=None, url=None, created_at=None, updated_at=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Script - a model defined in OpenAPI.
- property created_at
Get the created_at of this Script.
- Returns:
The created_at of this Script.
- Return type:
datetime
- property description
Get the description of this Script.
- Returns:
The description of this Script.
- Return type:
- property language
Get the language of this Script.
- Returns:
The language of this Script.
- Return type:
ScriptLanguage
- property org_id
Get the org_id of this Script.
- Returns:
The org_id of this Script.
- Return type:
- property script
Get the script of this Script.
The script to execute.
- Returns:
The script of this Script.
- Return type:
- property updated_at
Get the updated_at of this Script.
- Returns:
The updated_at of this Script.
- Return type:
datetime
- class influxdb_client.domain.ScriptCreateRequest(name=None, description=None, script=None, language=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
ScriptCreateRequest - a model defined in OpenAPI.
- property description
Get the description of this ScriptCreateRequest.
Script description. A description of the script.
- Returns:
The description of this ScriptCreateRequest.
- Return type:
- property language
Get the language of this ScriptCreateRequest.
- Returns:
The language of this ScriptCreateRequest.
- Return type:
ScriptLanguage
- property name
Get the name of this ScriptCreateRequest.
Script name. The name must be unique within the organization.
- Returns:
The name of this ScriptCreateRequest.
- Return type:
- property script
Get the script of this ScriptCreateRequest.
The script to execute.
- Returns:
The script of this ScriptCreateRequest.
- Return type:
DeleteApi
- class influxdb_client.DeleteApi(influxdb_client)[source]
Implementation for ‘/api/v2/delete’ endpoint.
Initialize defaults.
- delete(start: Union[str, datetime], stop: Union[str, datetime], predicate: str, bucket: str, org: Optional[Union[str, Organization]] = None) None [source]
Delete Time series data from InfluxDB.
- Parameters:
start (str, datetime.datetime) – start time
stop (str, datetime.datetime) – stop time
predicate (str) – predicate
bucket (str) – bucket id or name from which data will be deleted
org (str, Organization) – specifies the organization to delete data from; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
- Returns:
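- Example (a minimal sketch of typical DeleteApi usage; the time range and predicate below are illustrative):

from datetime import datetime, timezone

from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    delete_api = client.delete_api()

    # Delete one series in the given time range (delete predicate syntax)
    delete_api.delete(start=datetime(2022, 1, 1, tzinfo=timezone.utc),
                      stop=datetime.now(tz=timezone.utc),
                      predicate='_measurement="h2o_feet" AND location="coyote_creek"',
                      bucket="my-bucket")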
- class influxdb_client.domain.DeletePredicateRequest(start=None, stop=None, predicate=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
DeletePredicateRequest - a model defined in OpenAPI.
- property predicate
Get the predicate of this DeletePredicateRequest.
An expression in [delete predicate syntax](https://docs.influxdata.com/influxdb/latest/reference/syntax/delete-predicate/).
- Returns:
The predicate of this DeletePredicateRequest.
- Return type:
- property start
Get the start of this DeletePredicateRequest.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)). The earliest time to delete from.
- Returns:
The start of this DeletePredicateRequest.
- Return type:
datetime
- property stop
Get the stop of this DeletePredicateRequest.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)). The latest time to delete from.
- Returns:
The stop of this DeletePredicateRequest.
- Return type:
datetime
Helpers
- class influxdb_client.client.util.date_utils.DateHelper(timezone: tzinfo = datetime.timezone.utc)[source]
DateHelper groups different implementations of date operations.
If you would like to serialize the query results to a custom timezone, you can use the following code:
from influxdb_client.client.util import date_utils
from influxdb_client.client.util.date_utils import DateHelper
import dateutil.parser
from dateutil import tz

def parse_date(date_string: str):
    return dateutil.parser.parse(date_string).astimezone(tz.gettz('ETC/GMT+2'))

date_utils.date_helper = DateHelper()
date_utils.date_helper.parse_date = parse_date
Initialize defaults.
- Parameters:
timezone – Default timezone used for serialization “datetime” without “tzinfo”. Default value is “UTC”.
- parse_date(date_string: str)[source]
Parse string into Date or Timestamp.
- Returns:
Returns a datetime.datetime object or a compliant implementation such as pandas._libs.tslibs.timestamps.Timestamp.
- to_nanoseconds(delta)[source]
Get number of nanoseconds in timedelta.
Solution comes from v1 client. Thx. https://github.com/influxdata/influxdb-python/pull/811
- class influxdb_client.client.util.date_utils_pandas.PandasDateTimeHelper(timezone: tzinfo = datetime.timezone.utc)[source]
DateHelper that uses the Pandas library with nanosecond precision.
Initialize defaults.
- Parameters:
timezone – Default timezone used for serialization “datetime” without “tzinfo”. Default value is “UTC”.
- class influxdb_client.client.util.multiprocessing_helper.MultiprocessingWriter(**kwargs)[source]
Helper class to write data into InfluxDB in an independent OS process.
- Example:
from influxdb_client import WriteOptions
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


def main():
    writer = MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                   write_options=WriteOptions(batch_size=100))
    writer.start()

    for x in range(1, 1000):
        writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")

    writer.__del__()


if __name__ == '__main__':
    main()
- How to use with context_manager:
from influxdb_client import WriteOptions
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


def main():
    with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                               write_options=WriteOptions(batch_size=100)) as writer:
        for x in range(1, 1000):
            writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")


if __name__ == '__main__':
    main()
- How to handle batch events:
from influxdb_client import WriteOptions
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


class BatchingCallback(object):

    def success(self, conf: (str, str, str), data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


def main():
    callback = BatchingCallback()
    with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                               success_callback=callback.success,
                               error_callback=callback.error,
                               retry_callback=callback.retry) as writer:
        for x in range(1, 1000):
            writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")


if __name__ == '__main__':
    main()
Initialize defaults.
For more information on how to initialize the writer, see the examples above.
- Parameters:
kwargs – arguments passed into the __init__ function of InfluxDBClient and write_api.
Async API Reference
InfluxDBClientAsync
- class influxdb_client.client.influxdb_client_async.InfluxDBClientAsync(url, token: Optional[str] = None, org: Optional[str] = None, debug=None, timeout=10000, enable_gzip=False, **kwargs)[source]
InfluxDBClientAsync is the asynchronous client for InfluxDB v2.
Initialize defaults.
- Parameters:
url – InfluxDB server API url (ex. http://localhost:8086).
token – token to authenticate to the InfluxDB 2.x
org – organization name (used as a default in Query, Write and Delete API)
debug – enable verbose logging of http requests
timeout – The maximal number of milliseconds for the whole HTTP request including connection establishment, request sending and response reading. It can also be a ClientTimeout which is directly passed to aiohttp.
enable_gzip – Enable Gzip compression for http requests. Currently, only the "Write" and "Query" endpoints support Gzip compression.
- Key bool verify_ssl:
Set this to false to skip verifying SSL certificate when calling API from https server.
- Key str ssl_ca_cert:
Set this to customize the certificate file to verify the peer.
- Key str cert_file:
Path to the certificate that will be used for mTLS authentication.
- Key str cert_key_file:
Path to the file contains private key for mTLS certificate.
- Key str cert_key_password:
String or function which returns password for decrypting the mTLS private key.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- Key str proxy:
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key int connection_pool_maxsize:
The total number of simultaneous connections. Defaults to “multiprocessing.cpu_count() * 5”.
- Key bool auth_basic:
Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (defaults to false, don’t set to true when talking to InfluxDB 2)
- Key str username:
username to authenticate via username and password credentials to the InfluxDB 2.x
- Key str password:
password to authenticate via username and password credentials to the InfluxDB 2.x
- Key bool allow_redirects:
If set to False, do not follow HTTP redirects. True by default.
- Key int max_redirects:
Maximum number of HTTP redirects to follow. 10 by default.
- Key dict client_session_kwargs:
Additional configuration arguments for ClientSession
- Key type client_session_type:
Type of aiohttp client to use. Useful for third party wrappers like aiohttp-retry. ClientSession by default.
- Key list[str] profilers:
list of enabled Flux profilers
- async build() str [source]
Return the build type of the connected InfluxDB Server.
- Returns:
The type of InfluxDB build.
- delete_api() DeleteApiAsync [source]
Get the asynchronous delete metrics API instance.
- Returns:
delete api
- classmethod from_config_file(config_file: str = 'config.ini', debug=None, enable_gzip=False, **kwargs)[source]
Configure client via configuration file. The configuration has to be under the 'influx2' section.
- Parameters:
config_file – Path to configuration file
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.
- Key config_name:
Name of the configuration section of the configuration file
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. By default there is no retry strategy.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- The supported formats: ini, toml and json (see the examples below)
- Configuration options:
url
org
token
timeout,
verify_ssl
ssl_ca_cert
cert_file
cert_key_file
cert_key_password
connection_pool_maxsize
auth_basic
profilers
proxy
config.ini example:
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
connection_pool_maxsize=25
auth_basic=false
profilers=query,operator
proxy=http://proxy.domain.org:8080

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}
config.toml example:
[influx2]
url = "http://localhost:8086"
token = "my-token"
org = "my-org"
timeout = 6000
connection_pool_maxsize = 25
auth_basic = false
profilers = "query, operator"
proxy = "http://proxy.domain.org:8080"

[tags]
id = "132-987-655"
customer = "California Miner"
data_center = "${env.data_center}"
config.json example:
{ "url": "http://localhost:8086", "token": "my-token", "org": "my-org", "active": true, "timeout": 6000, "connection_pool_maxsize": 55, "auth_basic": false, "profilers": "query, operator", "tags": { "id": "132-987-655", "customer": "California Miner", "data_center": "${env.data_center}" } }
- classmethod from_env_properties(debug=None, enable_gzip=False, **kwargs)[source]
Configure client via environment properties.
- Parameters:
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.
- Key str proxy:
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. By default there is no retry strategy.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- Supported environment properties:
INFLUXDB_V2_URL
INFLUXDB_V2_ORG
INFLUXDB_V2_TOKEN
INFLUXDB_V2_TIMEOUT
INFLUXDB_V2_VERIFY_SSL
INFLUXDB_V2_SSL_CA_CERT
INFLUXDB_V2_CERT_FILE
INFLUXDB_V2_CERT_KEY_FILE
INFLUXDB_V2_CERT_KEY_PASSWORD
INFLUXDB_V2_CONNECTION_POOL_MAXSIZE
INFLUXDB_V2_AUTH_BASIC
INFLUXDB_V2_PROFILERS
INFLUXDB_V2_TAG
- async ping() bool [source]
Return the status of InfluxDB instance.
- Returns:
The status of InfluxDB.
- query_api(query_options: ~influxdb_client.client.query_api.QueryOptions = <influxdb_client.client.query_api.QueryOptions object>) QueryApiAsync [source]
Create an asynchronous Query API instance.
- Parameters:
query_options – optional query api configuration
- Returns:
Query api instance
- async version() str [source]
Return the version of the connected InfluxDB Server.
- Returns:
The version of InfluxDB.
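- Example (a minimal sketch that checks server status with the asynchronous client; requires the async extra, and the printed values are illustrative):

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        print(await client.ping())     # True when the instance is reachable
        print(await client.version())  # e.g. "2.7.1"
        print(await client.build())    # e.g. "OSS"


asyncio.run(main())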
- write_api(point_settings=<influxdb_client.client.write_api.PointSettings object>) WriteApiAsync [source]
Create an asynchronous Write API instance.
- Example:
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

# Initialize async/await instance of Write API
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api()
- Parameters:
point_settings – settings to store default tags
- Returns:
write api instance
QueryApiAsync
- class influxdb_client.client.query_api_async.QueryApiAsync(influxdb_client, query_options=<influxdb_client.client.query_api.QueryOptions object>)[source]
Asynchronous implementation for ‘/api/v2/query’ endpoint.
Initialize query client.
- Parameters:
influxdb_client – influxdb client
- async query(query: str, org=None, params: Optional[dict] = None) TableList [source]
Execute an asynchronous Flux query and return the result as a FluxTable list.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
params – bind parameters
- Returns:
- Return type:
Serializing the query results to a flattened list of values via to_values():

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = tables.to_values(columns=['location', '_time', '_value'])
    print(output)
[
    ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
    ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
    ...
]
Serializing the query results to JSON via to_json():

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    # Query: using Table structure
    tables = await client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to JSON
    output = tables.to_json(indent=5)
    print(output)
[ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ]
- async query_data_frame(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)[source]
Execute an asynchronous Flux query and return a DataFrame.
Note
If the query returns tables with differing schemas, the client generates a DataFrame for each of them.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
data_frame_index – the list of columns that are used as DataFrame index
params – bind parameters
- Returns:
DataFrame or List[DataFrame]
Warning
For the optimal processing of the query results use the pivot() function, which aligns results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- async query_data_frame_stream(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)[source]
Execute an asynchronous Flux query and return a stream of DataFrame as an AsyncGenerator[DataFrame].
Note
If the query returns tables with differing schemas, the client generates a DataFrame for each of them.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
data_frame_index – the list of columns that are used as DataFrame index
params – bind parameters
- Returns:
AsyncGenerator[DataFrame]
Warning
For the optimal processing of the query results use the pivot() function, which aligns results as a table.

from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- async query_raw(query: str, org=None, dialect={'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None)[source]
Execute an asynchronous Flux query and return the raw unprocessed result as a str.
- Parameters:
query – a Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
dialect – csv dialect format
params – bind parameters
- Returns:
- async query_stream(query: str, org=None, params: Optional[dict] = None) AsyncGenerator[FluxRecord, None] [source]
Execute an asynchronous Flux query and return a stream of FluxRecord as an AsyncGenerator[FluxRecord].
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
params – bind parameters
- Returns:
AsyncGenerator[FluxRecord]
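- Example (a minimal sketch of consuming the stream; the bucket and field access below are illustrative):

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # query_stream is a coroutine that yields an AsyncGenerator of FluxRecord
        records = await client.query_api().query_stream('from(bucket:"my-bucket") |> range(start: -10m)')
        async for record in records:
            print(record["_value"])


asyncio.run(main())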
WriteApiAsync
- class influxdb_client.client.write_api_async.WriteApiAsync(influxdb_client, point_settings: ~influxdb_client.client.write_api.PointSettings = <influxdb_client.client.write_api.PointSettings object>)[source]
Implementation for ‘/api/v2/write’ endpoint.
- Example:
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

# Initialize async/await instance of Write API
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api()
Initialize defaults.
- Parameters:
influxdb_client – with default settings (organization)
point_settings – settings to store default tags.
- async write(bucket: str, org: Optional[str] = None, record: Optional[Union[str, Iterable[str], Point, Iterable[Point], dict, Iterable[dict], bytes, Iterable[bytes], NamedTuple, Iterable[NamedTuple], dataclass, Iterable[dataclass]]] = None, write_precision: WritePrecision = 'ns', **kwargs) bool [source]
Write time-series data into InfluxDB.
- Parameters:
bucket (str) – specifies the destination bucket for writes (required)
org (str, Organization) – specifies the destination organization for writes; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
write_precision (WritePrecision) – specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point takes precedence and is used for the write.
record – Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame
- Key data_frame_measurement_name:
name of measurement for writing Pandas DataFrame -
DataFrame
- Key data_frame_tag_columns:
list of DataFrame columns which are tags, rest columns will be fields -
DataFrame
- Key data_frame_timestamp_column:
name of DataFrame column which contains a timestamp. The column can be defined as a
str
value formatted as 2018-10-26, 2018-10-26 12:00, 2018-10-26 12:00:00-05:00 or other formats and types supported by pandas.to_datetime -DataFrame
- Key data_frame_timestamp_timezone:
name of the timezone which is used for timestamp column -
DataFrame
- Key record_measurement_key:
key of record with specified measurement -
dictionary
,NamedTuple
,dataclass
- Key record_measurement_name:
static measurement name -
dictionary
,NamedTuple
,dataclass
- Key record_time_key:
key of record with specified timestamp -
dictionary
,NamedTuple
,dataclass
- Key record_tag_keys:
list of record keys to use as a tag -
dictionary
,NamedTuple
,dataclass
- Key record_field_keys:
list of record keys to use as a field -
dictionary
,NamedTuple
,dataclass
- Returns:
True for successfully accepted data, otherwise an exception is raised
- Example:
# Record as Line Protocol
await write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1")

# Record as Dictionary
dictionary = {
    "measurement": "h2o_feet",
    "tags": {"location": "us-west"},
    "fields": {"level": 125},
    "time": 1
}
await write_api.write("my-bucket", "my-org", dictionary)

# Record as Point
from influxdb_client import Point
point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1)
await write_api.write("my-bucket", "my-org", point)
- DataFrame:
If the data_frame_timestamp_column is not specified, the index of the Pandas DataFrame is used as the timestamp for the written data. The index can be a PeriodIndex or it must be transformable to datetime by pandas.to_datetime.
If you would like to transform a column to PeriodIndex, you can use something like:

import pandas as pd

# DataFrame
data_frame = ...
# Set column as Index
data_frame.set_index('column_name', inplace=True)
# Transform index to PeriodIndex
data_frame.index = pd.to_datetime(data_frame.index, unit='s')
DeleteApiAsync
- class influxdb_client.client.delete_api_async.DeleteApiAsync(influxdb_client)[source]
Async implementation for ‘/api/v2/delete’ endpoint.
Initialize defaults.
- async delete(start: Union[str, datetime], stop: Union[str, datetime], predicate: str, bucket: str, org: Optional[Union[str, Organization]] = None) bool [source]
Delete Time series data from InfluxDB.
- Parameters:
start (str, datetime.datetime) – start time
stop (str, datetime.datetime) – stop time
predicate (str) – predicate
bucket (str) – bucket id or name from which data will be deleted
org (str, Organization) – specifies the organization to delete data from; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClientAsync.org is used.
- Returns:
True for successfully deleted data, otherwise an exception is raised
Migration Guide
This guide is meant to help you migrate your Python code from influxdb-python to influxdb-client-python by providing code examples that cover common usages.
If there is something missing, please feel free to create a new request for a guide enhancement.
Before You Start
Please take a moment to review the following client docs:
Content
Initializing Client
influxdb-python
from influxdb import InfluxDBClient
client = InfluxDBClient(host='127.0.0.1', port=8086, username='root', password='root', database='dbname')
influxdb-client-python
from influxdb_client import InfluxDBClient
with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:
pass
Creating Database/Bucket
influxdb-python
from influxdb import InfluxDBClient
client = InfluxDBClient(host='127.0.0.1', port=8086, username='root', password='root', database='dbname')
dbname = 'example'
client.create_database(dbname)
client.create_retention_policy('awesome_policy', '60m', 3, database=dbname, default=True)
influxdb-client-python
from influxdb_client import InfluxDBClient, BucketRetentionRules
org = 'my-org'
with InfluxDBClient(url='http://localhost:8086', token='my-token', org=org) as client:
buckets_api = client.buckets_api()
# Create Bucket with retention policy set to 3600 seconds and name "bucket-by-python"
retention_rules = BucketRetentionRules(type="expire", every_seconds=3600)
created_bucket = buckets_api.create_bucket(bucket_name="bucket-by-python",
retention_rules=retention_rules,
org=org)
Dropping Database/Bucket
influxdb-python
from influxdb import InfluxDBClient
client = InfluxDBClient(host='127.0.0.1', port=8086, username='root', password='root', database='dbname')
dbname = 'example'
client.drop_database(dbname)
influxdb-client-python
from influxdb_client import InfluxDBClient
with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:
buckets_api = client.buckets_api()
bucket = buckets_api.find_bucket_by_name("my-bucket")
buckets_api.delete_bucket(bucket)
Writing LineProtocol
influxdb-python
from influxdb import InfluxDBClient
client = InfluxDBClient(host='127.0.0.1', port=8086, username='root', password='root', database='dbname')
client.write('h2o_feet,location=coyote_creek water_level=1.0 1', protocol='line')
influxdb-client-python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
write_api.write(bucket='my-bucket', record='h2o_feet,location=coyote_creek water_level=1.0 1')
Writing Dictionary-style object
influxdb-python
from influxdb import InfluxDBClient
record = [
{
"measurement": "cpu_load_short",
"tags": {
"host": "server01",
"region": "us-west"
},
"time": "2009-11-10T23:00:00Z",
"fields": {
"Float_value": 0.64,
"Int_value": 3,
"String_value": "Text",
"Bool_value": True
}
}
]
client = InfluxDBClient(host='127.0.0.1', port=8086, username='root', password='root', database='dbname')
client.write_points(record)
influxdb-client-python
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
record = [
{
"measurement": "cpu_load_short",
"tags": {
"host": "server01",
"region": "us-west"
},
"time": "2009-11-10T23:00:00Z",
"fields": {
"Float_value": 0.64,
"Int_value": 3,
"String_value": "Text",
"Bool_value": True
}
}
]
write_api.write(bucket='my-bucket', record=record)
Writing Structured Data
influxdb-python
from influxdb import InfluxDBClient
from influxdb import SeriesHelper
my_client = InfluxDBClient(host='127.0.0.1', port=8086, username='root', password='root', database='dbname')
class MySeriesHelper(SeriesHelper):
class Meta:
client = my_client
series_name = 'events.stats.{server_name}'
fields = ['some_stat', 'other_stat']
tags = ['server_name']
bulk_size = 5
autocommit = True
MySeriesHelper(server_name='us.east-1', some_stat=159, other_stat=10)
MySeriesHelper(server_name='us.east-1', some_stat=158, other_stat=20)
MySeriesHelper.commit()
The influxdb-client-python doesn't have an equivalent implementation for MySeriesHelper, but you can use Python Data Classes instead:
influxdb-client-python
from dataclasses import dataclass
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
@dataclass
class Car:
"""
DataClass structure - Car
"""
engine: str
type: str
speed: float
with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
car = Car('12V-BT', 'sport-cars', 125.25)
write_api.write(bucket="my-bucket",
record=car,
record_measurement_name="performance",
record_tag_keys=["engine", "type"],
record_field_keys=["speed"])
Writing Pandas DataFrame
influxdb-python
import pandas as pd
from influxdb import InfluxDBClient
df = pd.DataFrame(data=list(range(30)),
index=pd.date_range(start='2014-11-16', periods=30, freq='H'),
columns=['0'])
client = InfluxDBClient(host='127.0.0.1', port=8086, username='root', password='root', database='dbname')
client.write_points(df, 'demo', protocol='line')
influxdb-client-python
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
df = pd.DataFrame(data=list(range(30)),
index=pd.date_range(start='2014-11-16', periods=30, freq='H'),
columns=['0'])
write_api.write(bucket='my-bucket', record=df, data_frame_measurement_name='demo')
Querying
influxdb-python
from influxdb import InfluxDBClient
client = InfluxDBClient(host='127.0.0.1', port=8086, username='root', password='root', database='dbname')
points = client.query('SELECT * from cpu').get_points()
for point in points:
print(point)
influxdb-client-python
from influxdb_client import InfluxDBClient
with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', debug=True) as client:
query = '''from(bucket: "my-bucket")
|> range(start: -10000d)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
tables = client.query_api().query(query)
for record in [record for table in tables for record in table.records]:
print(record.values)
If you would like to omit boilerplate columns such as _result, _table, _start, … you can filter the record values with the following expression:
print({k: v for k, v in record.values.items() if k not in ['result', 'table', '_start', '_stop', '_measurement']})
For more info see Flux Response Format.
Development
The following document covers how to develop the InfluxDB client library locally, including how to run tests and build the docs.
tl;dr
# from your forked repo, create and activate a virtualenv
python -m virtualenv venv
. venv/bin/activate
# install the library as editable with all dependencies
make install
# make edits
# run lint and tests
make lint test
Getting Started
Install Python
Most distributions include Python by default, so before going too far, try running python --version to see if it already exists. You may also have to specify python3 --version, for example, on Ubuntu.
Fork and clone the repo
The rest of this guide assumes you have cloned your fork of the upstream client library and are in the directory of the forked repo.
Set up a virtual environment.
Python virtual environments let you install specific versioned dependencies in a contained manner. This way, you do not pollute or have conflicts on your system with different versions.
python -m virtualenv venv
. venv/bin/activate
Having a shell prompt change via starship or something similar is nice, as it will let you know when you are in a virtual environment and which one it is.
To exit the virtual environment, run deactivate.
Install the client library
To install the local version of the client library run:
make install
This will install the library as editable with all dependencies. This includes all dependencies that are used for all possible features as well as testing requirements.
Make changes and test
At this point, a user can make the required changes necessary and run any tests or scripts they have.
Before putting up a PR, the user should attempt to run the lint and tests locally. Lint will ensure the formatting of the code, while tests will run integration tests against an InfluxDB instance. For details on that set up see the next section.
make lint test
Linting
The library uses flake8 to do linting and can be run with:
make lint
Testing
The built-in tests assume that there is an instance of InfluxDB 2.x up
and running. This can be accomplished by running the
scripts/influxdb-restart.sh
script. It will launch an InfluxDB 2.x instance
with Docker and make it available locally on port 8086.
Once InfluxDB is available, run all the tests with:
make test
Code Coverage
After running the tests, an HTML report of the tests is available in the
htmlcov
directory. Users can open htmlcov/index.html
file in a browser
and see a full report for code coverage across the whole project. Clicking
on a specific file will show a line-by-line report of what lines were or
were not covered.
Documentation
The docs are built using Sphinx. To build all the docs run:
make docs
This will build and produce a sample version of the web docs at
docs/_build/html/index.html
. From there the user can view the entire site
and ensure changes are rendered correctly.
This repository contains the Python client library for use with InfluxDB 2.x and Flux. InfluxDB 3.x users should instead use the lightweight v3 client library. InfluxDB 1.x users should use the v1 client library.
For ease of migration and a consistent query and write experience, v2 users should consider using InfluxQL and the v1 client library.
The API of influxdb-client-python is not backwards-compatible with the old one - influxdb-python.
Documentation
This section contains links to the client library documentation.
InfluxDB 2.0 client features
- Querying data using the Flux language into csv, raw data, flux_table structure, Pandas DataFrame
- Writing data using RxPY Observable
- InfluxDB 2.0 API client for management
the client is generated from the swagger by using the openapi-generator
organizations & users management
buckets management
tasks management
authorizations
health check
…
Installation
InfluxDB python library uses RxPY - The Reactive Extensions for Python (RxPY).
Python 3.7 or later is required.
Note
It is recommended to use ciso8601 with the client for parsing dates. ciso8601 is much faster than the built-in Python datetime. Since it's written as a C module, the best way is to build it from source:
Windows:
You have to install Visual C++ Build Tools 2015 to build ciso8601 by pip.
conda:
Install from sources: conda install -c conda-forge/label/cf202003 ciso8601.
pip install
The Python package is hosted on PyPI; you can install the latest version directly:
pip install 'influxdb-client[ciso]'
Then import the package:
import influxdb_client
If your application uses async/await in Python you can install with the async
extra:
$ pip install influxdb-client[async]
For more info see How to use Asyncio.
Setuptools
Install via Setuptools.
python setup.py install --user
(or sudo python setup.py install to install the package for all users)
Getting Started
Please follow the Installation and then run the following:
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "my-bucket"
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
write_api.write(bucket=bucket, record=p)
## using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
for table in tables:
print(table)
for row in table.records:
print(row.values)
## using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
for cell in row:
val_count += 1
Client configuration
Via File
A client can be configured via a *.ini file in the influx2 segment.
The following options are supported:
url - the url to connect to InfluxDB
org - default destination organization for writes and queries
token - the token to use for the authorization
timeout - socket timeout in ms (default value is 10000)
verify_ssl - set this to false to skip verifying SSL certificate when calling API from https server
ssl_ca_cert - set this to customize the certificate file to verify the peer
cert_file - path to the certificate that will be used for mTLS authentication
cert_key_file - path to the file containing the private key for the mTLS certificate
cert_key_password - string or function which returns password for decrypting the mTLS private key
connection_pool_maxsize - set the number of connections to save that can be reused by urllib3
auth_basic - enable http basic authentication when talking to an InfluxDB 1.8.x without authentication but accessed via a reverse proxy with basic authentication (defaults to false)
profilers - set the list of enabled Flux profilers
client = InfluxDBClient.from_config_file("config.ini")
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
verify_ssl=False
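A minimal usage sketch, assuming the config.ini shown above sits next to the script; url, org, and token are picked up from the file, so no connection arguments are needed:

from influxdb_client import InfluxDBClient

# Connection settings come from the [influx2] segment of config.ini
with InfluxDBClient.from_config_file("config.ini") as client:
    tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')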
Via Environment Properties
A client can be configured via environment properties.
Supported properties are:
- INFLUXDB_V2_URL - the url to connect to InfluxDB
- INFLUXDB_V2_ORG - default destination organization for writes and queries
- INFLUXDB_V2_TOKEN - the token to use for the authorization
- INFLUXDB_V2_TIMEOUT - socket timeout in ms (default value is 10000)
- INFLUXDB_V2_VERIFY_SSL - set this to false to skip verifying the SSL certificate when calling the API from an https server
- INFLUXDB_V2_SSL_CA_CERT - set this to customize the certificate file to verify the peer
- INFLUXDB_V2_CERT_FILE - path to the certificate that will be used for mTLS authentication
- INFLUXDB_V2_CERT_KEY_FILE - path to the file containing the private key for the mTLS certificate
- INFLUXDB_V2_CERT_KEY_PASSWORD - string or function which returns the password for decrypting the mTLS private key
- INFLUXDB_V2_CONNECTION_POOL_MAXSIZE - set the number of connections to save that can be reused by urllib3
- INFLUXDB_V2_AUTH_BASIC - enable HTTP basic authentication when talking to an InfluxDB 1.8.x that has no authentication itself but is accessed via a reverse proxy with basic authentication (defaults to false)
- INFLUXDB_V2_PROFILERS - set the list of enabled Flux profilers
client = InfluxDBClient.from_env_properties()
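A minimal sketch; the variables would normally be set in the shell or deployment environment, and the inline os.environ assignments below are only for illustration:

import os

from influxdb_client import InfluxDBClient

# Illustrative only - these would normally be set outside the process
os.environ["INFLUXDB_V2_URL"] = "http://localhost:8086"
os.environ["INFLUXDB_V2_ORG"] = "my-org"
os.environ["INFLUXDB_V2_TOKEN"] = "my-token"

with InfluxDBClient.from_env_properties() as client:
    print(client.ping())  # True if the server is reachable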
Profile query
The Flux Profiler package provides performance profiling tools for Flux queries and operations.
You can enable printing profiler information of the Flux query in the client library by:
- setting QueryOptions.profilers in QueryApi,
- setting the INFLUXDB_V2_PROFILERS environment variable,
- setting the profilers option in the configuration file.
When the profiler is enabled, the result of the Flux query contains additional "profiler/*" tables. In order to behave consistently whether the profiler is enabled or disabled, FluxCSVParser excludes "profiler/*" measurements from the result.
Example of how to enable profilers using the API:
from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryOptions

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

q = '''
from(bucket: stringParam)
|> range(start: -5m, stop: now())
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
|> aggregateWindow(every: 1m, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
p = {
    "stringParam": "my-bucket",
}
query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
tables = query_api.query(query=q, params=p)
Example of a profiler output:
===============
Profiler: query
===============
from(bucket: stringParam)
|> range(start: -5m, stop: now())
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
|> aggregateWindow(every: 1m, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
========================
Profiler: profiler/query
========================
result : _profiler
table : 0
_measurement : profiler/query
TotalDuration : 8924700
CompileDuration : 350900
QueueDuration : 33800
PlanDuration : 0
RequeueDuration : 0
ExecuteDuration : 8486500
Concurrency : 0
MaxAllocated : 2072
TotalAllocated : 0
flux/query-plan :
digraph {
ReadWindowAggregateByTime11
// every = 1m, aggregates = [mean], createEmpty = true, timeColumn = "_stop"
pivot8
generated_yield
ReadWindowAggregateByTime11 -> pivot8
pivot8 -> generated_yield
}
influxdb/scanned-bytes: 0
influxdb/scanned-values: 0
===========================
Profiler: profiler/operator
===========================
result : _profiler
table : 1
_measurement : profiler/operator
Type : *universe.pivotTransformation
Label : pivot8
Count : 3
MinDuration : 32600
MaxDuration : 126200
DurationSum : 193400
MeanDuration : 64466.666666666664
===========================
Profiler: profiler/operator
===========================
result : _profiler
table : 1
_measurement : profiler/operator
Type : *influxdb.readWindowAggregateSource
Label : ReadWindowAggregateByTime11
Count : 1
MinDuration : 940500
MaxDuration : 940500
DurationSum : 940500
MeanDuration : 940500.0
You can also use a callback function to get the profiler output. The value passed to this callback is of type FluxRecord.
Example of how to use profilers with a callback:
class ProfilersCallback(object):
    def __init__(self):
        self.records = []

    def __call__(self, flux_record):
        self.records.append(flux_record.values)


callback = ProfilersCallback()

query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for profiler in callback.records:
    print(f'Custom processing of profiler result: {profiler}')
Example output of this callback:
Custom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\r\n ReadRange2\r\n generated_yield\r\n\r\n ReadRange2 -> generated_yield\r\n}\r\n\r\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0}
Custom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0}