API Reference
InfluxDBClient
- class influxdb_client.InfluxDBClient(url, token: Optional[str] = None, debug=None, timeout=10000, enable_gzip=False, org: Optional[str] = None, default_tags: Optional[dict] = None, **kwargs)[source]
InfluxDBClient is client for InfluxDB v2.
Initialize defaults.
- Parameters
url – InfluxDB server API url (ex. http://localhost:8086).
token –
token
to authenticate to the InfluxDB APIdebug – enable verbose logging of http requests
timeout – HTTP client timeout setting for a request specified in milliseconds. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts.
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.
org – organization name (used as a default in Query, Write and Delete API)
- Key bool verify_ssl
Set this to false to skip verifying SSL certificate when calling API from https server.
- Key str ssl_ca_cert
Set this to customize the certificate file to verify the peer.
- Key str cert_file
Path to the certificate that will be used for mTLS authentication.
- Key str cert_key_file
Path to the file contains private key for mTLS certificate.
- Key str cert_key_password
String or function which returns password for decrypting the mTLS private key.
- Key ssl.SSLContext ssl_context
Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- Key str proxy
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key int connection_pool_maxsize
Number of connections to save that can be reused by urllib3. Defaults to “multiprocessing.cpu_count() * 5”.
- Key urllib3.util.retry.Retry retries
Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.
- Key bool auth_basic
Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (defaults to false, don’t set to true when talking to InfluxDB 2)
- Key str username
username
to authenticate via username and password credentials to the InfluxDB 2.x- Key str password
password
to authenticate via username and password credentials to the InfluxDB 2.x- Key list[str] profilers
list of enabled Flux profilers
- authorizations_api() AuthorizationsApi [source]
Create the Authorizations API instance.
- Returns
authorizations api
- buckets_api() BucketsApi [source]
Create the Bucket API instance.
- Returns
buckets api
- build() str [source]
Return the build type of the connected InfluxDB Server.
- Returns
The type of InfluxDB build.
- classmethod from_config_file(config_file: str = 'config.ini', debug=None, enable_gzip=False, **kwargs)[source]
Configure client via configuration file. The configuration has to be under ‘influx’ section.
- Parameters
config_file – Path to configuration file
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.
- Key config_name
Name of the configuration section of the configuration file
- Key str proxy_headers
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries
Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.
- Key ssl.SSLContext ssl_context
Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- The supported formats:
- Configuration options:
url
org
token
timeout,
verify_ssl
ssl_ca_cert
cert_file
cert_key_file
cert_key_password
connection_pool_maxsize
auth_basic
profilers
proxy
config.ini example:
[influx2] url=http://localhost:8086 org=my-org token=my-token timeout=6000 connection_pool_maxsize=25 auth_basic=false profilers=query,operator proxy=http:proxy.domain.org:8080 [tags] id = 132-987-655 customer = California Miner data_center = ${env.data_center}
config.toml example:
[influx2] url = "http://localhost:8086" token = "my-token" org = "my-org" timeout = 6000 connection_pool_maxsize = 25 auth_basic = false profilers="query, operator" proxy = "http://proxy.domain.org:8080" [tags] id = "132-987-655" customer = "California Miner" data_center = "${env.data_center}"
config.json example:
{ "url": "http://localhost:8086", "token": "my-token", "org": "my-org", "active": true, "timeout": 6000, "connection_pool_maxsize": 55, "auth_basic": false, "profilers": "query, operator", "tags": { "id": "132-987-655", "customer": "California Miner", "data_center": "${env.data_center}" } }
- classmethod from_env_properties(debug=None, enable_gzip=False, **kwargs)[source]
Configure client via environment properties.
- Parameters
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for http requests. Currently, only the “Write” and “Query” endpoints supports the Gzip compression.
- Key str proxy
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries
Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy.
- Key ssl.SSLContext ssl_context
Specify a custom Python SSL Context for the TLS/ mTLS handshake. Be aware that only delivered certificate/ key files or an SSL Context are possible.
- Supported environment properties:
INFLUXDB_V2_URL
INFLUXDB_V2_ORG
INFLUXDB_V2_TOKEN
INFLUXDB_V2_TIMEOUT
INFLUXDB_V2_VERIFY_SSL
INFLUXDB_V2_SSL_CA_CERT
INFLUXDB_V2_CERT_FILE
INFLUXDB_V2_CERT_KEY_FILE
INFLUXDB_V2_CERT_KEY_PASSWORD
INFLUXDB_V2_CONNECTION_POOL_MAXSIZE
INFLUXDB_V2_AUTH_BASIC
INFLUXDB_V2_PROFILERS
INFLUXDB_V2_TAG
- invokable_scripts_api() InvokableScriptsApi [source]
Create an InvokableScripts API instance.
- Returns
InvokableScripts API instance
- organizations_api() OrganizationsApi [source]
Create the Organizations API instance.
- Returns
organizations api
- query_api(query_options: ~influxdb_client.client.query_api.QueryOptions = <influxdb_client.client.query_api.QueryOptions object>) QueryApi [source]
Create an Query API instance.
- Parameters
query_options – optional query api configuration
- Returns
Query api instance
- version() str [source]
Return the version of the connected InfluxDB Server.
- Returns
The version of InfluxDB.
- write_api(write_options=<influxdb_client.client.write_api.WriteOptions object>, point_settings=<influxdb_client.client.write_api.PointSettings object>, **kwargs) WriteApi [source]
Create Write API instance.
- Example:
from influxdb_client import InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS # Initialize SYNCHRONOUS instance of WriteApi with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: write_api = client.write_api(write_options=SYNCHRONOUS)
If you would like to use a background batching, you have to configure client like this:
from influxdb_client import InfluxDBClient # Initialize background batching instance of WriteApi with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: with client.write_api() as write_api: pass
There is also possibility to use callbacks to notify about state of background batches:
from influxdb_client import InfluxDBClient from influxdb_client.client.exceptions import InfluxDBError class BatchingCallback(object): def success(self, conf: (str, str, str), data: str): print(f"Written batch: {conf}, data: {data}") def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Cannot write batch: {conf}, data: {data} due: {exception}") def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: callback = BatchingCallback() with client.write_api(success_callback=callback.success, error_callback=callback.error, retry_callback=callback.retry) as write_api: pass
- Parameters
write_options – Write API configuration
point_settings – settings to store default tags
- Key success_callback
The callable
callback
to run after successfully writen a batch.- The callable must accept two arguments:
Tuple:
(bucket, organization, precision)
str: written data
[batching mode]
- Key error_callback
The callable
callback
to run after unsuccessfully writen a batch.- The callable must accept three arguments:
Tuple:
(bucket, organization, precision)
str: written data
Exception: an occurred error
[batching mode]
- Key retry_callback
The callable
callback
to run after retryable error occurred.- The callable must accept three arguments:
Tuple:
(bucket, organization, precision)
str: written data
Exception: an retryable error
[batching mode]
- Returns
write api instance
QueryApi
- class influxdb_client.QueryApi(influxdb_client, query_options=<influxdb_client.client.query_api.QueryOptions object>)[source]
Implementation for ‘/api/v2/query’ endpoint.
Initialize query client.
- Parameters
influxdb_client – influxdb client
- query(query: str, org=None, params: Optional[dict] = None) TableList [source]
Execute synchronous Flux query and return result as a
FluxTable
list.- Parameters
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used.params – bind parameters
- Returns
- Return type
Serialization the query results to flattened list of values via
to_values()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to values output = tables.to_values(columns=['location', '_time', '_value']) print(output)
[ ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3], ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3], ... ]
Serialization the query results to JSON via
to_json()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to JSON output = tables.to_json(indent=5) print(output)
[ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ]
- query_csv(query: str, org=None, dialect: Dialect = {'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None) CSVIterator [source]
Execute the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file.
- Parameters
query – a Flux query
org (str, Organization) – specifies the organization for executing the query; Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used.dialect – csv dialect format
params – bind parameters
- Returns
Iterator[List[str]]
wrapped intoCSVIterator
- Return type
Serialization the query results to flattened list of values via
to_values()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using CSV iterator csv_iterator = client.query_api().query_csv('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to values output = csv_iterator.to_values() print(output)
[ ['#datatype', 'string', 'long', 'dateTime:RFC3339', 'dateTime:RFC3339', 'dateTime:RFC3339', 'double', 'string', 'string', 'string'] ['#group', 'false', 'false', 'true', 'true', 'false', 'false', 'true', 'true', 'true'] ['#default', '_result', '', '', '', '', '', '', '', ''] ['', 'result', 'table', '_start', '_stop', '_time', '_value', '_field', '_measurement', 'location'] ['', '', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York'] ['', '', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague'] ... ]
If you would like to turn off Annotated CSV header’s you can use following code:
from influxdb_client import InfluxDBClient, Dialect with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using CSV iterator csv_iterator = client.query_api().query_csv('from(bucket:"my-bucket") |> range(start: -10m)', dialect=Dialect(header=False, annotations=[])) for csv_line in csv_iterator: print(csv_line)
[ ['', '_result', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York'] ['', '_result', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague'] ... ]
- query_data_frame(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)[source]
Execute synchronous Flux query and return Pandas DataFrame.
Note
If the
query
returns tables with differing schemas than the client generates aDataFrame
for each of them.- Parameters
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used.data_frame_index – the list of columns that are used as DataFrame index
params – bind parameters
- Returns
DataFrame
orList[DataFrame]
Warning
For the optimal processing of the query results use the
pivot() function
which align results as a table.from(bucket:"my-bucket") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r._measurement == "mem") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- query_data_frame_stream(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)[source]
Execute synchronous Flux query and return stream of Pandas DataFrame as a
Generator[DataFrame]
.Note
If the
query
returns tables with differing schemas than the client generates aDataFrame
for each of them.- Parameters
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used.data_frame_index – the list of columns that are used as DataFrame index
params – bind parameters
- Returns
Generator[DataFrame]
Warning
For the optimal processing of the query results use the
pivot() function
which align results as a table.from(bucket:"my-bucket") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r._measurement == "mem") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- query_raw(query: str, org=None, dialect={'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None)[source]
Execute synchronous Flux query and return result as raw unprocessed result as a str.
- Parameters
query – a Flux query
org (str, Organization) – specifies the organization for executing the query; Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used.dialect – csv dialect format
params – bind parameters
- Returns
str
- query_stream(query: str, org=None, params: Optional[dict] = None) Generator[FluxRecord, Any, None] [source]
Execute synchronous Flux query and return stream of FluxRecord as a Generator[‘FluxRecord’].
- Parameters
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used.params – bind parameters
- Returns
Generator[‘FluxRecord’]
- class influxdb_client.client.flux_table.FluxTable[source]
A table is set of records with a common set of columns and a group key.
The table can be serialized into JSON by:
import json from influxdb_client.client.flux_table import FluxStructureEncoder output = json.dumps(tables, cls=FluxStructureEncoder, indent=2) print(output)
Initialize defaults.
- class influxdb_client.client.flux_table.FluxRecord(table, values=None)[source]
A record is a tuple of named values and is represented using an object type.
Initialize defaults.
- class influxdb_client.client.flux_table.TableList(iterable=(), /)[source]
FluxTable
list with additionally functional to better handle of query result.- to_json(columns: Optional[List[str]] = None, **kwargs) str [source]
Serialize query results to a JSON formatted
str
.- Parameters
columns – if not
None
then only specified columns are presented in results- Returns
The query results is flattened to array:
[ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ]
The JSON format could be configured via
**kwargs
arguments:from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to JSON output = tables.to_json(indent=5) print(output)
For all available options see - json.dump.
- to_values(columns: Optional[List[str]] = None) List[List[object]] [source]
Serialize query results to a flattened list of values.
- Parameters
columns – if not
None
then only specified columns are presented in results- Returns
list
of values
Output example:
[ ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3], ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3], ... ]
Configure required columns:
from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)') # Serialize to values output = tables.to_values(columns=['location', '_time', '_value']) print(output)
- class influxdb_client.client.flux_table.CSVIterator(response: HTTPResponse)[source]
Iterator[List[str]]
with additionally functional to better handle of query result.Initialize
csv.reader
.
WriteApi
- class influxdb_client.WriteApi(influxdb_client, write_options: ~influxdb_client.client.write_api.WriteOptions = <influxdb_client.client.write_api.WriteOptions object>, point_settings: ~influxdb_client.client.write_api.PointSettings = <influxdb_client.client.write_api.PointSettings object>, **kwargs)[source]
Implementation for ‘/api/v2/write’ endpoint.
- Example:
from influxdb_client import InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS # Initialize SYNCHRONOUS instance of WriteApi with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: write_api = client.write_api(write_options=SYNCHRONOUS)
Initialize defaults.
- Parameters
influxdb_client – with default settings (organization)
write_options – write api configuration
point_settings – settings to store default tags.
- Key success_callback
The callable
callback
to run after successfully writen a batch.- The callable must accept two arguments:
Tuple:
(bucket, organization, precision)
str: written data
[batching mode]
- Key error_callback
The callable
callback
to run after unsuccessfully writen a batch.- The callable must accept three arguments:
Tuple:
(bucket, organization, precision)
str: written data
Exception: an occurred error
[batching mode]
- Key retry_callback
The callable
callback
to run after retryable error occurred.- The callable must accept three arguments:
Tuple:
(bucket, organization, precision)
str: written data
Exception: an retryable error
[batching mode]
- write(bucket: str, org: Optional[str] = None, record: Optional[Union[str, Iterable[str], Point, Iterable[Point], dict, Iterable[dict], bytes, Iterable[bytes], Observable, NamedTuple, Iterable[NamedTuple], dataclass, Iterable[dataclass]]] = None, write_precision: WritePrecision = 'ns', **kwargs) Any [source]
Write time-series data into InfluxDB.
- Parameters
bucket (str) – specifies the destination bucket for writes (required)
org (str, Organization) – specifies the destination organization for writes; take the ID, Name or Organization. If not specified the default value from
InfluxDBClient.org
is used.write_precision (WritePrecision) – specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point has precedes and is use for write.
record – Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame or RxPY Observable to write
- Key data_frame_measurement_name
name of measurement for writing Pandas DataFrame -
DataFrame
- Key data_frame_tag_columns
list of DataFrame columns which are tags, rest columns will be fields -
DataFrame
- Key data_frame_timestamp_column
name of DataFrame column which contains a timestamp. The column can be defined as a
str
value formatted as 2018-10-26, 2018-10-26 12:00, 2018-10-26 12:00:00-05:00 or other formats and types supported by pandas.to_datetime -DataFrame
- Key data_frame_timestamp_timezone
name of the timezone which is used for timestamp column -
DataFrame
- Key record_measurement_key
key of record with specified measurement -
dictionary
,NamedTuple
,dataclass
- Key record_measurement_name
static measurement name -
dictionary
,NamedTuple
,dataclass
- Key record_time_key
key of record with specified timestamp -
dictionary
,NamedTuple
,dataclass
- Key record_tag_keys
list of record keys to use as a tag -
dictionary
,NamedTuple
,dataclass
- Key record_field_keys
list of record keys to use as a field -
dictionary
,NamedTuple
,dataclass
- Example:
# Record as Line Protocol write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1") # Record as Dictionary dictionary = { "measurement": "h2o_feet", "tags": {"location": "us-west"}, "fields": {"level": 125}, "time": 1 } write_api.write("my-bucket", "my-org", dictionary) # Record as Point from influxdb_client import Point point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1) write_api.write("my-bucket", "my-org", point)
- DataFrame:
If the
data_frame_timestamp_column
is not specified the index of Pandas DataFrame is used as atimestamp
for written data. The index can be PeriodIndex or its must be transformable todatetime
by pandas.to_datetime.If you would like to transform a column to
PeriodIndex
, you can use something like:import pandas as pd # DataFrame data_frame = ... # Set column as Index data_frame.set_index('column_name', inplace=True) # Transform index to PeriodIndex data_frame.index = pd.to_datetime(data_frame.index, unit='s')
- class influxdb_client.client.write.point.Point(measurement_name)[source]
Point defines the values that will be written to the database.
Ref: https://docs.influxdata.com/influxdb/latest/reference/key-concepts/data-elements/#point
Initialize defaults.
- static from_dict(dictionary: dict, write_precision: WritePrecision = 'ns', **kwargs)[source]
Initialize point from ‘dict’ structure.
- The expected dict structure is:
measurement
tags
fields
time
- Example:
# Use default dictionary structure dict_structure = { "measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, "fields": {"water_level": 1.0}, "time": 1 } point = Point.from_dict(dict_structure, WritePrecision.NS)
- Example:
# Use custom dictionary structure dictionary = { "name": "sensor_pt859", "location": "warehouse_125", "version": "2021.06.05.5874", "pressure": 125, "temperature": 10, "created": 1632208639, } point = Point.from_dict(dictionary, write_precision=WritePrecision.S, record_measurement_key="name", record_time_key="created", record_tag_keys=["location", "version"], record_field_keys=["pressure", "temperature"])
- Int Types:
The following example shows how to configure the types of integers fields. It is useful when you want to serialize integers always as
float
to avoidfield type conflict
or useunsigned 64-bit integer
as the type for serialization.# Use custom dictionary structure dict_structure = { "measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, "fields": { "water_level": 1.0, "some_counter": 108913123234 }, "time": 1 } point = Point.from_dict(dict_structure, field_types={"some_counter": "uint"})
- Parameters
dictionary – dictionary for serialize into data Point
write_precision – sets the precision for the supplied time values
- Key record_measurement_key
key of dictionary with specified measurement
- Key record_measurement_name
static measurement name for data Point
- Key record_time_key
key of dictionary with specified timestamp
- Key record_tag_keys
list of dictionary keys to use as a tag
- Key record_field_keys
list of dictionary keys to use as a field
- Key field_types
optional dictionary to specify types of serialized fields. Currently, is supported customization for integer types. Possible integers types:
int
- serialize integers as “Signed 64-bit integers” -9223372036854775807i
(default behaviour)uint
- serialize integers as “Unsigned 64-bit integers” -9223372036854775807u
float
- serialize integers as “IEEE-754 64-bit floating-point numbers”. Useful for unify number types in your pipeline to avoid field type conflict -9223372036854775807
The
field_types
can be also specified as part of incoming dictionary. For more info see an example above.- Returns
new data point
- time(time, write_precision='ns')[source]
Specify timestamp for DataPoint with declared precision.
If time doesn’t have specified timezone we assume that timezone is UTC.
- Examples::
Point.measurement(“h2o”).field(“val”, 1).time(“2009-11-10T23:00:00.123456Z”) Point.measurement(“h2o”).field(“val”, 1).time(1257894000123456000) Point.measurement(“h2o”).field(“val”, 1).time(datetime(2009, 11, 10, 23, 0, 0, 123456)) Point.measurement(“h2o”).field(“val”, 1).time(1257894000123456000, write_precision=WritePrecision.NS)
- Parameters
time – the timestamp for your data
write_precision – sets the precision for the supplied time values
- Returns
this point
- to_line_protocol(precision=None)[source]
Create LineProtocol.
- param precision
required precision of LineProtocol. If it’s not set then use the precision from
Point
.
- property write_precision
Get precision.
- class influxdb_client.domain.write_precision.WritePrecision[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
WritePrecision - a model defined in OpenAPI.
- NS = 'ns'
- Attributes:
- openapi_types (dict): The key is attribute name
and the value is attribute type.
- attribute_map (dict): The key is attribute name
and the value is json key in definition.
BucketsApi
- class influxdb_client.BucketsApi(influxdb_client)[source]
Implementation for ‘/api/v2/buckets’ endpoint.
Initialize defaults.
- create_bucket(bucket=None, bucket_name=None, org_id=None, retention_rules=None, description=None, org=None) Bucket [source]
Create a bucket.
- Parameters
bucket (Bucket|PostBucketRequest) – bucket to create
bucket_name – bucket name
description – bucket description
org_id – org_id
bucket_name – bucket name
retention_rules – retention rules array or single BucketRetentionRules
org (str, Organization) – specifies the organization for create the bucket; Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used.
- Returns
Bucket If the method is called asynchronously, returns the request thread.
- delete_bucket(bucket)[source]
Delete a bucket.
- Parameters
bucket – bucket id or Bucket
- Returns
Bucket
- find_bucket_by_name(bucket_name)[source]
Find bucket by name.
- Parameters
bucket_name – bucket name
- Returns
Bucket
- find_buckets(**kwargs)[source]
List buckets.
- Key int offset
Offset for pagination
- Key int limit
Limit for pagination
- Key str after
The last resource ID from which to seek from (but not including). This is to be used instead of offset.
- Key str org
The organization name.
- Key str org_id
The organization ID.
- Key str name
Only returns buckets with a specific name.
- Returns
Buckets
- class influxdb_client.domain.Bucket(links=None, id=None, type='user', name=None, description=None, org_id=None, rp=None, schema_type=None, created_at=None, updated_at=None, retention_rules=None, labels=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Bucket - a model defined in OpenAPI.
- property created_at
Get the created_at of this Bucket.
- Returns
The created_at of this Bucket.
- Return type
datetime
- property description
Get the description of this Bucket.
- Returns
The description of this Bucket.
- Return type
- property labels
Get the labels of this Bucket.
- Returns
The labels of this Bucket.
- Return type
list[Label]
- property links
Get the links of this Bucket.
- Returns
The links of this Bucket.
- Return type
BucketLinks
- property retention_rules
Get the retention_rules of this Bucket.
Retention rules to expire or retain data. The InfluxDB /api/v2 API uses RetentionRules to configure the [retention period](https://docs.influxdata.com/influxdb/latest/reference/glossary/#retention-period). #### InfluxDB Cloud - retentionRules is required. #### InfluxDB OSS - retentionRules isn’t required.
- Returns
The retention_rules of this Bucket.
- Return type
list[BucketRetentionRules]
- property schema_type
Get the schema_type of this Bucket.
- Returns
The schema_type of this Bucket.
- Return type
SchemaType
- property updated_at
Get the updated_at of this Bucket.
- Returns
The updated_at of this Bucket.
- Return type
datetime
LabelsApi
- class influxdb_client.LabelsApi(influxdb_client)[source]
Implementation for ‘/api/v2/labels’ endpoint.
Initialize defaults.
- clone_label(cloned_name: str, label: Label) Label [source]
Create the new instance of the label as a copy existing label.
- Parameters
cloned_name – new label name
label – existing label
- Returns
clonned Label
- create_label(name: str, org_id: str, properties: Optional[Dict[str, str]] = None) Label [source]
Create a new label.
- Parameters
name – label name
org_id – organization id
properties – optional label properties
- Returns
created label
- delete_label(label: Union[str, Label])[source]
Delete the label.
- Parameters
label – label id or Label
- find_label_by_id(label_id: str)[source]
Retrieve the label by id.
- Parameters
label_id –
- Returns
Label
- find_label_by_org(org_id) List[Label] [source]
Get the list of all labels for given organization.
- Parameters
org_id – organization id
- Returns
list of labels
OrganizationsApi
- class influxdb_client.OrganizationsApi(influxdb_client)[source]
Implementation for ‘/api/v2/orgs’ endpoint.
Initialize defaults.
- create_organization(name: Optional[str] = None, organization: Optional[Organization] = None) Organization [source]
Create an organization.
- find_organizations(**kwargs)[source]
List all organizations.
- Key int offset
Offset for pagination
- Key int limit
Limit for pagination
- Key bool descending
- Key str org
Filter organizations to a specific organization name.
- Key str org_id
Filter organizations to a specific organization ID.
- Key str user_id
Filter organizations to a specific user ID.
- update_organization(organization: Organization) Organization [source]
Update an organization.
- Parameters
organization – Organization update to apply (required)
- Returns
Organization
- class influxdb_client.domain.Organization(links=None, id=None, name=None, default_storage_type=None, description=None, created_at=None, updated_at=None, status='active')[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Organization - a model defined in OpenAPI.
- property created_at
Get the created_at of this Organization.
- Returns
The created_at of this Organization.
- Return type
datetime
- property default_storage_type
Get the default_storage_type of this Organization.
Discloses whether the organization uses TSM or IOx.
- Returns
The default_storage_type of this Organization.
- Return type
- property description
Get the description of this Organization.
- Returns
The description of this Organization.
- Return type
- property links
Get the links of this Organization.
- Returns
The links of this Organization.
- Return type
OrganizationLinks
- property name
Get the name of this Organization.
- Returns
The name of this Organization.
- Return type
- property status
Get the status of this Organization.
If inactive, the organization is inactive.
- Returns
The status of this Organization.
- Return type
- property updated_at
Get the updated_at of this Organization.
- Returns
The updated_at of this Organization.
- Return type
datetime
UsersApi
- class influxdb_client.UsersApi(influxdb_client)[source]
Implementation for ‘/api/v2/users’ endpoint.
Initialize defaults.
- delete_user(user: Union[str, User, UserResponse]) None [source]
Delete a user.
- Parameters
user – user id or User
- Returns
None
- find_users(**kwargs) Users [source]
List all users.
- Key int offset
The offset for pagination. The number of records to skip.
- Key int limit
Limits the number of records returned. Default is 20.
- Key str after
The last resource ID from which to seek from (but not including). This is to be used instead of offset.
- Key str name
The user name.
- Key str id
The user ID.
- Returns
Buckets
- class influxdb_client.domain.User(id=None, name=None, status='active')[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
User - a model defined in OpenAPI.
- property name
Get the name of this User.
The user name.
- Returns
The name of this User.
- Return type
- property status
Get the status of this User.
If inactive, the user is inactive. Default is active.
- Returns
The status of this User.
- Return type
TasksApi
- class influxdb_client.TasksApi(influxdb_client)[source]
Implementation for ‘/api/v2/tasks’ endpoint.
Initialize defaults.
- cancel_run(task_id: str, run_id: str)[source]
Cancel a currently running run.
- Parameters
task_id –
run_id –
- create_task(task: Optional[Task] = None, task_create_request: Optional[TaskCreateRequest] = None) Task [source]
Create a new task.
- create_task_cron(name: str, flux: str, cron: str, org_id: str) Task [source]
Create a new task with cron repetition schedule.
- create_task_every(name, flux, every, organization) Task [source]
Create a new task with every repetition schedule.
- find_tasks(**kwargs)[source]
List all tasks.
- Key str name
only returns tasks with the specified name
- Key str after
returns tasks after specified ID
- Key str user
filter tasks to a specific user ID
- Key str org
filter tasks to a specific organization name
- Key str org_id
filter tasks to a specific organization ID
- Key int limit
the number of tasks to return
- Returns
Tasks
- get_logs(task_id: str) List[LogEvent] [source]
Retrieve all logs for a task.
- Parameters
task_id – task id
- get_run(task_id: str, run_id: str) Run [source]
Get run record for specific task and run id.
- Parameters
task_id – task id
run_id – run id
- Returns
Run for specified task and run id
- get_runs(task_id, **kwargs) List[Run] [source]
Retrieve list of run records for a task.
- Parameters
task_id – task id
- Key str after
returns runs after specified ID
- Key int limit
the number of runs to return
- Key datetime after_time
filter runs to those scheduled after this time, RFC3339
- Key datetime before_time
filter runs to those scheduled before this time, RFC3339
- retry_run(task_id: str, run_id: str)[source]
Retry a task run.
- Parameters
task_id – task id
run_id – run id
- class influxdb_client.domain.Task(id=None, org_id=None, org=None, name=None, owner_id=None, description=None, status=None, labels=None, authorization_id=None, flux=None, every=None, cron=None, offset=None, latest_completed=None, last_run_status=None, last_run_error=None, created_at=None, updated_at=None, links=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Task - a model defined in OpenAPI.
- property authorization_id
Get the authorization_id of this Task.
An authorization ID. Specifies the authorization used when the task communicates with the query engine. To find an authorization ID, use the [GET /api/v2/authorizations endpoint](#operation/GetAuthorizations) to list authorizations.
- Returns
The authorization_id of this Task.
- Return type
- property created_at
Get the created_at of this Task.
- Returns
The created_at of this Task.
- Return type
datetime
- property cron
Get the cron of this Task.
A [Cron expression](https://en.wikipedia.org/wiki/Cron#Overview) that defines the schedule on which the task runs. InfluxDB uses the system time when evaluating Cron expressions.
- Returns
The cron of this Task.
- Return type
- property description
Get the description of this Task.
A description of the task.
- Returns
The description of this Task.
- Return type
- property every
Get the every of this Task.
The interval ([duration literal](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)) at which the task runs. every also determines when the task first runs, depending on the specified time.
- Returns
The every of this Task.
- Return type
- property flux
Get the flux of this Task.
The Flux script that the task executes.
- Returns
The flux of this Task.
- Return type
- property labels
Get the labels of this Task.
- Returns
The labels of this Task.
- Return type
list[Label]
- property last_run_error
Get the last_run_error of this Task.
- Returns
The last_run_error of this Task.
- Return type
- property last_run_status
Get the last_run_status of this Task.
- Returns
The last_run_status of this Task.
- Return type
- property latest_completed
Get the latest_completed of this Task.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)) of the latest scheduled and completed run.
- Returns
The latest_completed of this Task.
- Return type
datetime
- property links
Get the links of this Task.
- Returns
The links of this Task.
- Return type
TaskLinks
- property name
Get the name of this Task.
The name of the task.
- Returns
The name of this Task.
- Return type
- property offset
Get the offset of this Task.
A [duration](https://docs.influxdata.com/flux/v0.x/spec/lexical-elements/#duration-literals) to delay execution of the task after the scheduled time has elapsed. 0 removes the offset.
- Returns
The offset of this Task.
- Return type
- property org
Get the org of this Task.
An [organization](https://docs.influxdata.com/influxdb/latest/reference/glossary/#organization) name. Specifies the organization that owns the task.
- Returns
The org of this Task.
- Return type
- property org_id
Get the org_id of this Task.
An [organization](https://docs.influxdata.com/influxdb/latest/reference/glossary/#organization) ID. Specifies the organization that owns the task.
- Returns
The org_id of this Task.
- Return type
- property owner_id
Get the owner_id of this Task.
A [user](https://docs.influxdata.com/influxdb/latest/reference/glossary/#user) ID. Specifies the owner of the task. To find a user ID, you can use the [GET /api/v2/users endpoint](#operation/GetUsers) to list users.
- Returns
The owner_id of this Task.
- Return type
- property status
Get the status of this Task.
- Returns
The status of this Task.
- Return type
TaskStatusType
- property updated_at
Get the updated_at of this Task.
- Returns
The updated_at of this Task.
- Return type
datetime
InvokableScriptsApi
- class influxdb_client.InvokableScriptsApi(influxdb_client)[source]
Use API invokable scripts to create custom InfluxDB API endpoints that query, process, and shape data.
Initialize defaults.
- create_script(create_request: ScriptCreateRequest) Script [source]
Create a script.
- Parameters
create_request (ScriptCreateRequest) – The script to create. (required)
- Returns
The created script.
- delete_script(script_id: str) None [source]
Delete a script.
- Parameters
script_id (str) – The ID of the script to delete. (required)
- Returns
None
- invoke_script(script_id: str, params: Optional[dict] = None) TableList [source]
Invoke synchronously a script and return result as a TableList.
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
- Parameters
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns
- Return type
Serialization the query results to flattened list of values via
to_values()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.invokable_scripts_api().invoke_script(script_id="script-id") # Serialize to values output = tables.to_values(columns=['location', '_time', '_value']) print(output)
[ ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3], ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3], ... ]
Serialization the query results to JSON via
to_json()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", token="my-token", org="my-org") as client: # Query: using Table structure tables = client.invokable_scripts_api().invoke_script(script_id="script-id") # Serialize to JSON output = tables.to_json(indent=5) print(output)
[ { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:00.897825+00:00", "region": "north", "_field": "usage", "_value": 15 }, { "_measurement": "mem", "_start": "2021-06-23T06:50:11.897825+00:00", "_stop": "2021-06-25T06:50:11.897825+00:00", "_time": "2020-02-27T16:20:01.897825+00:00", "region": "west", "_field": "usage", "_value": 10 }, ... ]
- invoke_script_csv(script_id: str, params: Optional[dict] = None) CSVIterator [source]
Invoke synchronously a script and return result as a CSV iterator. Each iteration returns a row of the CSV file.
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
- Parameters
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns
Iterator[List[str]]
wrapped intoCSVIterator
- Return type
Serialization the query results to flattened list of values via
to_values()
:from influxdb_client import InfluxDBClient with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: # Query: using CSV iterator csv_iterator = client.invokable_scripts_api().invoke_script_csv(script_id="script-id") # Serialize to values output = csv_iterator.to_values() print(output)
[ ['', 'result', 'table', '_start', '_stop', '_time', '_value', '_field', '_measurement', 'location'] ['', '', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York'] ['', '', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague'] ... ]
- invoke_script_data_frame(script_id: str, params: Optional[dict] = None, data_frame_index: Optional[List[str]] = None)[source]
Invoke synchronously a script and return Pandas DataFrame.
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
Note
If the
script
returns tables with differing schemas than the client generates aDataFrame
for each of them.- Parameters
- Returns
DataFrame
orList[DataFrame]
Warning
For the optimal processing of the query results use the
pivot() function
which align results as a table.from(bucket:"my-bucket") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r._measurement == "mem") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- invoke_script_data_frame_stream(script_id: str, params: Optional[dict] = None, data_frame_index: Optional[List[str]] = None)[source]
Invoke synchronously a script and return stream of Pandas DataFrame as a Generator[‘pd.DataFrame’].
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
Note
If the
script
returns tables with differing schemas than the client generates aDataFrame
for each of them.- Parameters
- Returns
Generator[DataFrame]
Warning
For the optimal processing of the query results use the
pivot() function
which align results as a table.from(bucket:"my-bucket") |> range(start: -5m, stop: now()) |> filter(fn: (r) => r._measurement == "mem") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- invoke_script_raw(script_id: str, params: Optional[dict] = None) Iterator[List[str]] [source]
Invoke synchronously a script and return result as raw unprocessed result as a str.
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
- Parameters
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns
Result as a str.
- invoke_script_stream(script_id: str, params: Optional[dict] = None) Generator[FluxRecord, Any, None] [source]
Invoke synchronously a script and return result as a Generator[‘FluxRecord’].
The bind parameters referenced in the script are substitutes with params key-values sent in the request body.
- Parameters
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns
Stream of FluxRecord.
- Return type
Generator[‘FluxRecord’]
- class influxdb_client.domain.Script(id=None, name=None, description=None, org_id=None, script=None, language=None, url=None, created_at=None, updated_at=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Script - a model defined in OpenAPI.
- property created_at
Get the created_at of this Script.
- Returns
The created_at of this Script.
- Return type
datetime
- property description
Get the description of this Script.
- Returns
The description of this Script.
- Return type
- property language
Get the language of this Script.
- Returns
The language of this Script.
- Return type
ScriptLanguage
- property script
Get the script of this Script.
The script to execute.
- Returns
The script of this Script.
- Return type
- property updated_at
Get the updated_at of this Script.
- Returns
The updated_at of this Script.
- Return type
datetime
- class influxdb_client.domain.ScriptCreateRequest(name=None, description=None, script=None, language=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
ScriptCreateRequest - a model defined in OpenAPI.
- property description
Get the description of this ScriptCreateRequest.
Script description. A description of the script.
- Returns
The description of this ScriptCreateRequest.
- Return type
- property language
Get the language of this ScriptCreateRequest.
- Returns
The language of this ScriptCreateRequest.
- Return type
ScriptLanguage
- property name
Get the name of this ScriptCreateRequest.
Script name. The name must be unique within the organization.
- Returns
The name of this ScriptCreateRequest.
- Return type
- property script
Get the script of this ScriptCreateRequest.
The script to execute.
- Returns
The script of this ScriptCreateRequest.
- Return type
DeleteApi
- class influxdb_client.DeleteApi(influxdb_client)[source]
Implementation for ‘/api/v2/delete’ endpoint.
Initialize defaults.
- delete(start: Union[str, datetime], stop: Union[str, datetime], predicate: str, bucket: str, org: Optional[Union[str, Organization]] = None) None [source]
Delete Time series data from InfluxDB.
- Parameters
start (str, datetime.datetime) – start time
stop (str, datetime.datetime) – stop time
predicate (str) – predicate
bucket (str) – bucket id or name from which data will be deleted
org (str, Organization) – specifies the organization to delete data from. Take the
ID
,Name
orOrganization
. If not specified the default value fromInfluxDBClient.org
is used.
- Returns
- class influxdb_client.domain.DeletePredicateRequest(start=None, stop=None, predicate=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
DeletePredicateRequest - a model defined in OpenAPI.
- property predicate
Get the predicate of this DeletePredicateRequest.
An expression in [delete predicate syntax](https://docs.influxdata.com/influxdb/latest/reference/syntax/delete-predicate/).
- Returns
The predicate of this DeletePredicateRequest.
- Return type
- property start
Get the start of this DeletePredicateRequest.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)). The earliest time to delete from.
- Returns
The start of this DeletePredicateRequest.
- Return type
datetime
- property stop
Get the stop of this DeletePredicateRequest.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)). The latest time to delete from.
- Returns
The stop of this DeletePredicateRequest.
- Return type
datetime
Helpers
- class influxdb_client.client.util.date_utils.DateHelper(timezone: tzinfo = datetime.timezone.utc)[source]
DateHelper to groups different implementations of date operations.
If you would like to serialize the query results to custom timezone, you can use following code:
from influxdb_client.client.util import date_utils from influxdb_client.client.util.date_utils import DateHelper import dateutil.parser from dateutil import tz def parse_date(date_string: str): return dateutil.parser.parse(date_string).astimezone(tz.gettz('ETC/GMT+2')) date_utils.date_helper = DateHelper() date_utils.date_helper.parse_date = parse_date
Initialize defaults.
- Parameters
timezone – Default timezone used for serialization “datetime” without “tzinfo”. Default value is “UTC”.
- parse_date(date_string: str)[source]
Parse string into Date or Timestamp.
- Returns
Returns a
datetime.datetime
object or compliant implementation likeclass 'pandas._libs.tslibs.timestamps.Timestamp
- to_nanoseconds(delta)[source]
Get number of nanoseconds in timedelta.
Solution comes from v1 client. Thx. https://github.com/influxdata/influxdb-python/pull/811
- class influxdb_client.client.util.date_utils_pandas.PandasDateTimeHelper(timezone: tzinfo = datetime.timezone.utc)[source]
DateHelper that use Pandas library with nanosecond precision.
Initialize defaults.
- Parameters
timezone – Default timezone used for serialization “datetime” without “tzinfo”. Default value is “UTC”.
- class influxdb_client.client.util.multiprocessing_helper.MultiprocessingWriter(**kwargs)[source]
The Helper class to write data into InfluxDB in independent OS process.
- Example:
from influxdb_client import WriteOptions from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter def main(): writer = MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org", write_options=WriteOptions(batch_size=100)) writer.start() for x in range(1, 1000): writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}") writer.__del__() if __name__ == '__main__': main()
- How to use with context_manager:
from influxdb_client import WriteOptions from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter def main(): with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org", write_options=WriteOptions(batch_size=100)) as writer: for x in range(1, 1000): writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}") if __name__ == '__main__': main()
- How to handle batch events:
from influxdb_client import WriteOptions from influxdb_client.client.exceptions import InfluxDBError from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter class BatchingCallback(object): def success(self, conf: (str, str, str), data: str): print(f"Written batch: {conf}, data: {data}") def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Cannot write batch: {conf}, data: {data} due: {exception}") def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") def main(): callback = BatchingCallback() with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org", success_callback=callback.success, error_callback=callback.error, retry_callback=callback.retry) as writer: for x in range(1, 1000): writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}") if __name__ == '__main__': main()
Initialize defaults.
For more information how to initialize the writer see the examples above.
- Parameters
kwargs – arguments are passed into
__init__
function ofInfluxDBClient
andwrite_api
.