API Reference
InfluxDBClient
- class influxdb_client.InfluxDBClient(url, token: Optional[str] = None, debug=None, timeout=10000, enable_gzip=False, org: Optional[str] = None, default_tags: Optional[dict] = None, **kwargs)[source]
InfluxDBClient is a client for InfluxDB v2.
Initialize defaults.
- Parameters:
url – InfluxDB server API url (ex. http://localhost:8086).
token – token to authenticate to the InfluxDB API
debug – enable verbose logging of HTTP requests
timeout – HTTP client timeout setting for a request specified in milliseconds. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts.
enable_gzip – Enable Gzip compression for HTTP requests. Currently, only the “Write” and “Query” endpoints support Gzip compression.
org – organization name (used as a default in Query, Write and Delete API)
- Key bool verify_ssl:
Set this to false to skip verifying SSL certificate when calling API from https server.
- Key str ssl_ca_cert:
Set this to customize the certificate file to verify the peer.
- Key str cert_file:
Path to the certificate that will be used for mTLS authentication.
- Key str cert_key_file:
Path to the file that contains the private key for the mTLS certificate.
- Key str cert_key_password:
String or function which returns password for decrypting the mTLS private key.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/mTLS handshake. Be aware that you can provide either certificate/key files or an SSL Context, not both.
- Key str proxy:
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key int connection_pool_maxsize:
Number of connections to save that can be reused by urllib3. Defaults to “multiprocessing.cpu_count() * 5”.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. By default, no retry strategy is set.
- Key bool auth_basic:
Set this to true to enable basic authentication when talking to an InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (Defaults to false; don’t set to true when talking to InfluxDB 2.)
- Key str username:
username to authenticate via username and password credentials to the InfluxDB 2.x
- Key str password:
password to authenticate via username and password credentials to the InfluxDB 2.x
- Key list[str] profilers:
list of enabled Flux profilers
- authorizations_api() AuthorizationsApi[source]
Create the Authorizations API instance.
- Returns:
authorizations api
- buckets_api() BucketsApi[source]
Create the Bucket API instance.
- Returns:
buckets api
- build() str[source]
Return the build type of the connected InfluxDB Server.
- Returns:
The type of InfluxDB build.
- classmethod from_config_file(config_file: str = 'config.ini', debug=None, enable_gzip=False, **kwargs)[source]
Configure client via configuration file. By default, the configuration has to be under the ‘influx2’ section, as in the examples that follow; use config_name to select a different section.
- Parameters:
config_file – Path to configuration file
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for HTTP requests. Currently, only the “Write” and “Query” endpoints support Gzip compression.
- Key config_name:
Name of the configuration section of the configuration file
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. By default, no retry strategy is set.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/mTLS handshake. Be aware that you can provide either certificate/key files or an SSL Context, not both.
- The supported formats: INI, TOML and JSON
- Configuration options:
url
org
token
timeout
verify_ssl
ssl_ca_cert
cert_file
cert_key_file
cert_key_password
connection_pool_maxsize
auth_basic
profilers
proxy
config.ini example:
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
connection_pool_maxsize=25
auth_basic=false
profilers=query,operator
proxy=http://proxy.domain.org:8080

[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}

config.toml example:
[influx2]
url = "http://localhost:8086"
token = "my-token"
org = "my-org"
timeout = 6000
connection_pool_maxsize = 25
auth_basic = false
profilers="query, operator"
proxy = "http://proxy.domain.org:8080"

[tags]
id = "132-987-655"
customer = "California Miner"
data_center = "${env.data_center}"

config.json example:
{
    "url": "http://localhost:8086",
    "token": "my-token",
    "org": "my-org",
    "active": true,
    "timeout": 6000,
    "connection_pool_maxsize": 55,
    "auth_basic": false,
    "profilers": "query, operator",
    "tags": {
        "id": "132-987-655",
        "customer": "California Miner",
        "data_center": "${env.data_center}"
    }
}
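As a rough sketch of the INI layout, the snippet below parses an equivalent configuration string with the standard-library configparser and reads back the [influx2] and [tags] sections. It does not call InfluxDBClient.from_config_file; the variable names and the typed conversions are assumptions made for illustration, not the client's own parsing logic.

```python
import configparser

# Same layout as the config.ini example; all values are placeholders.
CONFIG_INI = """
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
connection_pool_maxsize=25
auth_basic=false
profilers=query,operator

[tags]
id = 132-987-655
customer = California Miner
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_INI)

influx = parser["influx2"]
timeout_ms = influx.getint("timeout")          # timeout is given in milliseconds
auth_basic = influx.getboolean("auth_basic")   # "false" is parsed as a boolean
profilers = [p.strip() for p in influx["profilers"].split(",")]
default_tags = dict(parser["tags"])

print(influx["url"], timeout_ms, auth_basic, profilers, default_tags)
```

Checking a file this way before handing its path to from_config_file can help catch a misspelled section name or a malformed value early.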
- classmethod from_env_properties(debug=None, enable_gzip=False, **kwargs)[source]
Configure client via environment properties.
- Parameters:
debug – Enable verbose logging of http requests
enable_gzip – Enable Gzip compression for HTTP requests. Currently, only the “Write” and “Query” endpoints support Gzip compression.
- Key str proxy:
Set this to configure the http proxy to be used (ex. http://localhost:3128)
- Key str proxy_headers:
A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
- Key urllib3.util.retry.Retry retries:
Set the default retry strategy that is used for all HTTP requests except batching writes. By default, no retry strategy is set.
- Key ssl.SSLContext ssl_context:
Specify a custom Python SSL Context for the TLS/mTLS handshake. Be aware that you can provide either certificate/key files or an SSL Context, not both.
- Supported environment properties:
INFLUXDB_V2_URL
INFLUXDB_V2_ORG
INFLUXDB_V2_TOKEN
INFLUXDB_V2_TIMEOUT
INFLUXDB_V2_VERIFY_SSL
INFLUXDB_V2_SSL_CA_CERT
INFLUXDB_V2_CERT_FILE
INFLUXDB_V2_CERT_KEY_FILE
INFLUXDB_V2_CERT_KEY_PASSWORD
INFLUXDB_V2_CONNECTION_POOL_MAXSIZE
INFLUXDB_V2_AUTH_BASIC
INFLUXDB_V2_PROFILERS
INFLUXDB_V2_TAG
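A minimal sketch for inspecting which of these properties are set before calling from_env_properties. The helper configured_properties and the masking of secrets are hypothetical and not part of the client; INFLUXDB_V2_TAG is left out because the list does not say how tag names are attached to it.

```python
import os

# Property names copied from the list of supported environment properties.
SUPPORTED = [
    "INFLUXDB_V2_URL",
    "INFLUXDB_V2_ORG",
    "INFLUXDB_V2_TOKEN",
    "INFLUXDB_V2_TIMEOUT",
    "INFLUXDB_V2_VERIFY_SSL",
    "INFLUXDB_V2_SSL_CA_CERT",
    "INFLUXDB_V2_CERT_FILE",
    "INFLUXDB_V2_CERT_KEY_FILE",
    "INFLUXDB_V2_CERT_KEY_PASSWORD",
    "INFLUXDB_V2_CONNECTION_POOL_MAXSIZE",
    "INFLUXDB_V2_AUTH_BASIC",
    "INFLUXDB_V2_PROFILERS",
]
SECRET = {"INFLUXDB_V2_TOKEN", "INFLUXDB_V2_CERT_KEY_PASSWORD"}


def configured_properties(environ=os.environ):
    """Return the supported properties that are set, with secrets masked."""
    found = {}
    for name in SUPPORTED:
        if name in environ:
            found[name] = "***" if name in SECRET else environ[name]
    return found


# Hypothetical environment used only for demonstration.
example_env = {
    "INFLUXDB_V2_URL": "http://localhost:8086",
    "INFLUXDB_V2_ORG": "my-org",
    "INFLUXDB_V2_TOKEN": "my-token",
    "UNRELATED": "ignored",
}
print(configured_properties(example_env))
```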
- invokable_scripts_api() InvokableScriptsApi[source]
Create an InvokableScripts API instance.
- Returns:
InvokableScripts API instance
- organizations_api() OrganizationsApi[source]
Create the Organizations API instance.
- Returns:
organizations api
- query_api(query_options: ~influxdb_client.client.query_api.QueryOptions = <influxdb_client.client.query_api.QueryOptions object>) QueryApi[source]
Create a Query API instance.
- Parameters:
query_options – optional query api configuration
- Returns:
Query api instance
- version() str[source]
Return the version of the connected InfluxDB Server.
- Returns:
The version of InfluxDB.
- write_api(write_options=<influxdb_client.client.write_api.WriteOptions object>, point_settings=<influxdb_client.client.write_api.PointSettings object>, **kwargs) WriteApi[source]
Create Write API instance.
- Example:
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# Initialize SYNCHRONOUS instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)

If you would like to use background batching, configure the client like this:
from influxdb_client import InfluxDBClient

# Initialize background batching instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    with client.write_api() as write_api:
        pass

It is also possible to use callbacks to be notified about the state of background batches:
from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError


class BatchingCallback(object):

    def success(self, conf: (str, str, str), data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    callback = BatchingCallback()
    with client.write_api(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry) as write_api:
        pass
- Parameters:
write_options – Write API configuration
point_settings – settings to store default tags
- Key success_callback:
The callable callback to run after a batch has been written successfully.
- The callable must accept two arguments:
Tuple: (bucket, organization, precision)
str: written data
[batching mode]
- Key error_callback:
The callable callback to run after a batch has failed to be written.
- The callable must accept three arguments:
Tuple: (bucket, organization, precision)
str: written data
Exception: the error that occurred
[batching mode]
- Key retry_callback:
The callable callback to run after a retryable error has occurred.
- The callable must accept three arguments:
Tuple: (bucket, organization, precision)
str: written data
Exception: the retryable error
[batching mode]
- Returns:
write api instance
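The callback signatures described for write_api can be exercised without a server. The sketch below uses a hypothetical CollectingCallback class that stores outcomes instead of printing them, and calls it by hand with made-up values to show the shape of each argument; in real use the methods would be passed as success_callback, error_callback and retry_callback.

```python
class CollectingCallback:
    """Hypothetical callback holder that records batch outcomes in lists."""

    def __init__(self):
        self.written = []
        self.failed = []
        self.retried = []

    def success(self, conf, data):
        # conf is the (bucket, organization, precision) tuple, data the written payload
        self.written.append((conf, data))

    def error(self, conf, data, exception):
        self.failed.append((conf, data, exception))

    def retry(self, conf, data, exception):
        self.retried.append((conf, data, exception))


callback = CollectingCallback()

# Simulated invocations with made-up values; the client is not involved here.
conf = ("my-bucket", "my-org", "ns")
callback.success(conf, "h2o_feet,location=us-west level=125i 1")
callback.error(conf, "h2o_feet level=1i 2", RuntimeError("simulated failure"))

bucket, org, precision = callback.written[0][0]
print(bucket, org, precision, len(callback.failed), len(callback.retried))
```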
QueryApi
- class influxdb_client.QueryApi(influxdb_client, query_options=<influxdb_client.client.query_api.QueryOptions object>)[source]
Implementation for ‘/api/v2/query’ endpoint.
Initialize query client.
- Parameters:
influxdb_client – influxdb client
- query(query: str, org=None, params: Optional[dict] = None) TableList[source]
Execute synchronous Flux query and return result as a FluxTable list.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
params – bind parameters
- Returns:
FluxTable list wrapped into TableList
- Return type:
TableList
Serialize the query results to a flattened list of values via to_values():
from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:

    # Query: using Table structure
    tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = tables.to_values(columns=['location', '_time', '_value'])
    print(output)

[
    ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
    ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
    ...
]

Serialize the query results to JSON via to_json():
from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:

    # Query: using Table structure
    tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to JSON
    output = tables.to_json(indent=5)
    print(output)

[
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:00.897825+00:00",
        "region": "north",
        "_field": "usage",
        "_value": 15
    },
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:01.897825+00:00",
        "region": "west",
        "_field": "usage",
        "_value": 10
    },
    ...
]
- query_csv(query: str, org=None, dialect: Dialect = {'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None) CSVIterator[source]
Execute the Flux query and return results as a CSV iterator. Each iteration returns a row of the CSV file.
- Parameters:
query – a Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
dialect – csv dialect format
params – bind parameters
- Returns:
Iterator[List[str]] wrapped into CSVIterator
- Return type:
CSVIterator
Serialize the query results to a flattened list of values via to_values():
from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:

    # Query: using CSV iterator
    csv_iterator = client.query_api().query_csv('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = csv_iterator.to_values()
    print(output)

[
    ['#datatype', 'string', 'long', 'dateTime:RFC3339', 'dateTime:RFC3339', 'dateTime:RFC3339', 'double', 'string', 'string', 'string'],
    ['#group', 'false', 'false', 'true', 'true', 'false', 'false', 'true', 'true', 'true'],
    ['#default', '_result', '', '', '', '', '', '', '', ''],
    ['', 'result', 'table', '_start', '_stop', '_time', '_value', '_field', '_measurement', 'location'],
    ['', '', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York'],
    ['', '', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague'],
    ...
]

To turn off the Annotated CSV headers, you can use the following code:
from influxdb_client import InfluxDBClient, Dialect

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:

    # Query: using CSV iterator
    csv_iterator = client.query_api().query_csv('from(bucket:"my-bucket") |> range(start: -10m)',
                                                dialect=Dialect(header=False, annotations=[]))

    for csv_line in csv_iterator:
        print(csv_line)

[
    ['', '_result', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York']
    ['', '_result', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague']
    ...
]
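To make the row layout returned by query_csv more concrete, here is a small sketch that parses a hand-written annotated CSV string with the standard csv module and separates annotation rows (those starting with the '#' comment prefix) from the header and data rows. The sample text and variable names are made up for illustration; no query is executed.

```python
import csv
import io

# Hand-written sample in the annotated CSV layout; not real query output.
ANNOTATED_CSV = """#datatype,string,long,dateTime:RFC3339,double,string
#group,false,false,false,false,true
#default,_result,,,,
,result,table,_time,_value,location
,,0,2022-06-16T00:00:00Z,24.3,New York
,,1,2022-06-16T00:00:00Z,25.3,Prague
"""

annotations, rows = [], []
for line in csv.reader(io.StringIO(ANNOTATED_CSV)):
    if not line:
        continue  # skip blank separator lines
    if line[0].startswith("#"):
        annotations.append(line)  # '#datatype', '#group', '#default'
    else:
        rows.append(line)

header, data = rows[0], rows[1:]
# Drop the empty leading column and pair each value with its column name.
records = [dict(zip(header[1:], row[1:])) for row in data]
print(records)
```

Every value arrives as a string, so numeric columns such as _value need an explicit conversion if they are to be used as numbers.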
- query_data_frame(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)[source]
Execute synchronous Flux query and return Pandas DataFrame.
Note
If the query returns tables with differing schemas, then the client generates a DataFrame for each of them.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
data_frame_index – the list of columns that are used as DataFrame index
params – bind parameters
- Returns:
DataFrame or List[DataFrame]
Warning
For optimal processing of the query results, use the pivot() function, which aligns the results as a table.
from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
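What the pivot step does to the result shape can be sketched in plain Python. The function below is a simplified stand-in, not Flux and not part of the client: it groups rows by the row key and turns each distinct _field into its own column. The sample rows are invented.

```python
from collections import defaultdict

# Unpivoted rows: one row per (_time, _field) pair; values are made up.
rows = [
    {"_time": "2024-01-01T00:00:00Z", "_field": "used", "_value": 10},
    {"_time": "2024-01-01T00:00:00Z", "_field": "free", "_value": 90},
    {"_time": "2024-01-01T00:01:00Z", "_field": "used", "_value": 12},
    {"_time": "2024-01-01T00:01:00Z", "_field": "free", "_value": 88},
]


def pivot(rows, row_key="_time", column_key="_field", value_column="_value"):
    """Group rows by row_key and spread column_key values into columns."""
    table = defaultdict(dict)
    for row in rows:
        table[row[row_key]][row[column_key]] = row[value_column]
    return [{row_key: key, **table[key]} for key in sorted(table)]


pivoted = pivot(rows)
for row in pivoted:
    print(row)
```

After pivoting there is one row per timestamp with a column per field, which is the layout that maps naturally onto a single DataFrame.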
- query_data_frame_stream(query: str, org=None, data_frame_index: Optional[List[str]] = None, params: Optional[dict] = None)[source]
Execute synchronous Flux query and return stream of Pandas DataFrame as a Generator[DataFrame].
Note
If the query returns tables with differing schemas, then the client generates a DataFrame for each of them.
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
data_frame_index – the list of columns that are used as DataFrame index
params – bind parameters
- Returns:
Generator[DataFrame]
Warning
For optimal processing of the query results, use the pivot() function, which aligns the results as a table.
from(bucket:"my-bucket")
    |> range(start: -5m, stop: now())
    |> filter(fn: (r) => r._measurement == "mem")
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- query_raw(query: str, org=None, dialect={'annotations': ['datatype', 'group', 'default'], 'comment_prefix': '#', 'date_time_format': 'RFC3339', 'delimiter': ',', 'header': True}, params: Optional[dict] = None)[source]
Execute synchronous Flux query and return the raw, unprocessed result as a str.
- Parameters:
query – a Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
dialect – csv dialect format
params – bind parameters
- Returns:
str
- query_stream(query: str, org=None, params: Optional[dict] = None) Generator[FluxRecord, Any, None][source]
Execute synchronous Flux query and return stream of FluxRecord as a Generator[‘FluxRecord’].
- Parameters:
query – the Flux query
org (str, Organization) – specifies the organization for executing the query; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
params – bind parameters
- Returns:
Generator[‘FluxRecord’]
- class influxdb_client.client.flux_table.FluxTable[source]
A table is a set of records with a common set of columns and a group key.
The table can be serialized into JSON by:
import json
from influxdb_client.client.flux_table import FluxStructureEncoder

output = json.dumps(tables, cls=FluxStructureEncoder, indent=2)
print(output)
Initialize defaults.
- class influxdb_client.client.flux_table.FluxRecord(table, values=None)[source]
A record is a tuple of named values and is represented using an object type.
Initialize defaults.
- class influxdb_client.client.flux_table.TableList(iterable=(), /)[source]
FluxTable list with additional functionality for handling query results.
- to_json(columns: Optional[List[str]] = None, **kwargs) str[source]
Serialize query results to a JSON formatted str.
- Parameters:
columns – if not None then only specified columns are presented in results
- Returns:
The query results are flattened to an array:
[
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:00.897825+00:00",
        "region": "north",
        "_field": "usage",
        "_value": 15
    },
    {
        "_measurement": "mem",
        "_start": "2021-06-23T06:50:11.897825+00:00",
        "_stop": "2021-06-25T06:50:11.897825+00:00",
        "_time": "2020-02-27T16:20:01.897825+00:00",
        "region": "west",
        "_field": "usage",
        "_value": 10
    },
    ...
]
The JSON format can be configured via **kwargs arguments:
from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:

    # Query: using Table structure
    tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to JSON
    output = tables.to_json(indent=5)
    print(output)
For all available options see - json.dump.
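The effect of such formatting options can be previewed with the standard json module alone. This sketch assumes, as the reference to json.dump suggests, that the keyword arguments are forwarded to the standard serializer; the records list is a hand-written stand-in for flattened query results.

```python
import json

# Stand-in for flattened query results; not produced by a real query.
records = [
    {"_measurement": "mem", "region": "north", "_field": "usage", "_value": 15},
    {"_measurement": "mem", "region": "west", "_field": "usage", "_value": 10},
]

# indent controls pretty-printing, as in to_json(indent=5).
pretty = json.dumps(records, indent=5)

# Other standard options: compact separators and sorted keys.
compact = json.dumps(records, separators=(",", ":"), sort_keys=True)

print(pretty)
print(compact)
```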
- to_values(columns: Optional[List[str]] = None) List[List[object]][source]
Serialize query results to a flattened list of values.
- Parameters:
columns – if not None then only specified columns are presented in results
- Returns:
list of values
Output example:
[
    ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
    ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
    ...
]
Configure required columns:
from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:

    # Query: using Table structure
    tables = client.query_api().query('from(bucket:"my-bucket") |> range(start: -10m)')

    # Serialize to values
    output = tables.to_values(columns=['location', '_time', '_value'])
    print(output)
- class influxdb_client.client.flux_table.CSVIterator(response: HTTPResponse)[source]
Iterator[List[str]] with additional functionality for handling query results.
Initialize csv.reader.
WriteApi
- class influxdb_client.WriteApi(influxdb_client, write_options: ~influxdb_client.client.write_api.WriteOptions = <influxdb_client.client.write_api.WriteOptions object>, point_settings: ~influxdb_client.client.write_api.PointSettings = <influxdb_client.client.write_api.PointSettings object>, **kwargs)[source]
Implementation for ‘/api/v2/write’ endpoint.
- Example:
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# Initialize SYNCHRONOUS instance of WriteApi
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
Initialize defaults.
- Parameters:
influxdb_client – with default settings (organization)
write_options – write api configuration
point_settings – settings to store default tags.
- Key success_callback:
The callable callback to run after a batch has been written successfully.
- The callable must accept two arguments:
Tuple: (bucket, organization, precision)
str: written data
[batching mode]
- Key error_callback:
The callable callback to run after a batch has failed to be written.
- The callable must accept three arguments:
Tuple: (bucket, organization, precision)
str: written data
Exception: the error that occurred
[batching mode]
- Key retry_callback:
The callable callback to run after a retryable error has occurred.
- The callable must accept three arguments:
Tuple: (bucket, organization, precision)
str: written data
Exception: the retryable error
[batching mode]
- write(bucket: str, org: Optional[str] = None, record: Optional[Union[str, Iterable[str], Point, Iterable[Point], dict, Iterable[dict], bytes, Iterable[bytes], Observable, NamedTuple, Iterable[NamedTuple], dataclass, Iterable[dataclass]]] = None, write_precision: WritePrecision = 'ns', **kwargs) Any[source]
Write time-series data into InfluxDB.
- Parameters:
bucket (str) – specifies the destination bucket for writes (required)
org (str, Organization) – specifies the destination organization for writes; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
write_precision (WritePrecision) – specifies the precision for the unix timestamps within the body line-protocol. The precision specified on a Point takes precedence and is used for the write.
record – Point, Line Protocol, Dictionary, NamedTuple, Data Classes, Pandas DataFrame or RxPY Observable to write
- Key data_frame_measurement_name:
name of measurement for writing Pandas DataFrame - DataFrame
- Key data_frame_tag_columns:
list of DataFrame columns which are tags; the remaining columns will be fields - DataFrame
- Key data_frame_timestamp_column:
name of DataFrame column which contains a timestamp. The column can be defined as a str value formatted as 2018-10-26, 2018-10-26 12:00, 2018-10-26 12:00:00-05:00 or other formats and types supported by pandas.to_datetime - DataFrame
- Key data_frame_timestamp_timezone:
name of the timezone which is used for the timestamp column - DataFrame
- Key record_measurement_key:
key of record with specified measurement - dictionary, NamedTuple, dataclass
- Key record_measurement_name:
static measurement name - dictionary, NamedTuple, dataclass
- Key record_time_key:
key of record with specified timestamp - dictionary, NamedTuple, dataclass
- Key record_tag_keys:
list of record keys to use as a tag - dictionary, NamedTuple, dataclass
- Key record_field_keys:
list of record keys to use as a field - dictionary, NamedTuple, dataclass
- Example:
# Record as Line Protocol
write_api.write("my-bucket", "my-org", "h2o_feet,location=us-west level=125i 1")

# Record as Dictionary
dictionary = {
    "measurement": "h2o_feet",
    "tags": {"location": "us-west"},
    "fields": {"level": 125},
    "time": 1
}
write_api.write("my-bucket", "my-org", dictionary)

# Record as Point
from influxdb_client import Point
point = Point("h2o_feet").tag("location", "us-west").field("level", 125).time(1)
write_api.write("my-bucket", "my-org", point)
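The three record forms in this example describe the same data point. As a rough illustration of how the dictionary form corresponds to the Line Protocol string, here is a deliberately simplified converter. The function to_line_protocol below is hypothetical and is not the client's serializer: it performs no escaping and handles only bool, int, float and str field values.

```python
def to_line_protocol(record):
    """Simplified sketch: measurement[,tag=value...] field=value[,...] [time]."""

    def fmt(value):
        if isinstance(value, bool):      # check bool before int (bool is an int subclass)
            return "true" if value else "false"
        if isinstance(value, int):
            return f"{value}i"           # integers carry an 'i' suffix
        if isinstance(value, float):
            return repr(value)
        return '"' + str(value) + '"'    # strings are quoted (no escaping here)

    tags = "".join(f",{k}={v}" for k, v in sorted(record.get("tags", {}).items()))
    fields = ",".join(f"{k}={fmt(v)}" for k, v in sorted(record["fields"].items()))
    line = f"{record['measurement']}{tags} {fields}"
    if record.get("time") is not None:
        line += f" {record['time']}"
    return line


dictionary = {
    "measurement": "h2o_feet",
    "tags": {"location": "us-west"},
    "fields": {"level": 125},
    "time": 1,
}
print(to_line_protocol(dictionary))
```

For real writes the record should be passed to write_api.write directly, since the client handles escaping and type details that this sketch omits.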
- DataFrame:
If the data_frame_timestamp_column is not specified, the index of the Pandas DataFrame is used as the timestamp for written data. The index can be a PeriodIndex, or it must be transformable to datetime by pandas.to_datetime.
If you would like to use a column as the index and convert it to datetime, you can use something like:
import pandas as pd

# DataFrame
data_frame = ...

# Set column as Index
data_frame.set_index('column_name', inplace=True)

# Transform index to datetime (pd.to_datetime returns a DatetimeIndex)
data_frame.index = pd.to_datetime(data_frame.index, unit='s')
- class influxdb_client.client.write.point.Point(measurement_name)[source]
Point defines the values that will be written to the database.
Ref: https://docs.influxdata.com/influxdb/latest/reference/key-concepts/data-elements/#point
Initialize defaults.
- static from_dict(dictionary: dict, write_precision: WritePrecision = 'ns', **kwargs)[source]
Initialize point from ‘dict’ structure.
- The expected dict structure is:
measurement
tags
fields
time
- Example:
from influxdb_client import Point, WritePrecision

# Use default dictionary structure
dict_structure = {
    "measurement": "h2o_feet",
    "tags": {"location": "coyote_creek"},
    "fields": {"water_level": 1.0},
    "time": 1
}
point = Point.from_dict(dict_structure, WritePrecision.NS)
- Example:
# Use custom dictionary structure
dictionary = {
    "name": "sensor_pt859",
    "location": "warehouse_125",
    "version": "2021.06.05.5874",
    "pressure": 125,
    "temperature": 10,
    "created": 1632208639,
}
point = Point.from_dict(dictionary,
                        write_precision=WritePrecision.S,
                        record_measurement_key="name",
                        record_time_key="created",
                        record_tag_keys=["location", "version"],
                        record_field_keys=["pressure", "temperature"])
- Int Types:
The following example shows how to configure the types of integer fields. It is useful when you want to always serialize integers as float to avoid a field type conflict, or to use unsigned 64-bit integer as the type for serialization.
# Use custom dictionary structure
dict_structure = {
    "measurement": "h2o_feet",
    "tags": {"location": "coyote_creek"},
    "fields": {
        "water_level": 1.0,
        "some_counter": 108913123234
    },
    "time": 1
}
point = Point.from_dict(dict_structure, field_types={"some_counter": "uint"})
- Parameters:
dictionary – dictionary for serialize into data Point
write_precision – sets the precision for the supplied time values
- Key record_measurement_key:
key of dictionary with specified measurement
- Key record_measurement_name:
static measurement name for data Point
- Key record_time_key:
key of dictionary with specified timestamp
- Key record_tag_keys:
list of dictionary keys to use as a tag
- Key record_field_keys:
list of dictionary keys to use as a field
- Key field_types:
optional dictionary to specify types of serialized fields. Currently, customization is supported for integer types. Possible integer types:
int - serialize integers as “Signed 64-bit integers” - 9223372036854775807i (default behaviour)
uint - serialize integers as “Unsigned 64-bit integers” - 9223372036854775807u
float - serialize integers as “IEEE-754 64-bit floating-point numbers”. Useful for unifying number types in your pipeline to avoid a field type conflict - 9223372036854775807
The field_types can also be specified as part of the incoming dictionary. For more info see the example above.
- Returns:
new data point
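The three integer options for field_types differ only in how the value is rendered. A minimal sketch of that mapping follows; serialize_integer is a hypothetical helper, not the client's implementation, and the exact text the client emits for the float case may differ from what Python's repr produces here.

```python
def serialize_integer(value, field_type="int"):
    """Render an integer field value for the given hypothetical type option."""
    if field_type == "int":
        return f"{value}i"          # signed 64-bit integer suffix
    if field_type == "uint":
        return f"{value}u"          # unsigned 64-bit integer suffix
    if field_type == "float":
        return repr(float(value))   # no suffix; formatting is Python's, an assumption
    raise ValueError(f"unsupported field type: {field_type}")


counter = 108913123234
print(serialize_integer(counter))
print(serialize_integer(counter, "uint"))
print(serialize_integer(counter, "float"))
```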
- time(time, write_precision='ns')[source]
Specify timestamp for DataPoint with declared precision.
If time doesn’t have specified timezone we assume that timezone is UTC.
- Examples:
Point.measurement("h2o").field("val", 1).time("2009-11-10T23:00:00.123456Z")
Point.measurement("h2o").field("val", 1).time(1257894000123456000)
Point.measurement("h2o").field("val", 1).time(datetime(2009, 11, 10, 23, 0, 0, 123456))
Point.measurement("h2o").field("val", 1).time(1257894000123456000, write_precision=WritePrecision.NS)
- Parameters:
time – the timestamp for your data
write_precision – sets the precision for the supplied time values
- Returns:
this point
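The relationship between a datetime value, the UTC assumption for naive datetimes, and an integer timestamp at a given precision can be worked through with the standard library. The helper to_epoch and the precision table are hypothetical and only mirror the behaviour described for time(); they are not the client's code.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
UNITS_PER_SECOND = {"s": 1, "ms": 10**3, "us": 10**6, "ns": 10**9}


def to_epoch(dt, precision="ns"):
    """Convert a datetime to an integer epoch timestamp in the given precision."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)   # naive datetimes are treated as UTC
    delta = dt - EPOCH
    # Integer arithmetic avoids floating-point rounding of large timestamps.
    micros = (delta.days * 86400 + delta.seconds) * 10**6 + delta.microseconds
    return micros * UNITS_PER_SECOND[precision] // 10**6


naive = datetime(2009, 11, 10, 23, 0, 0, 123456)
print(to_epoch(naive))          # nanoseconds
print(to_epoch(naive, "ms"))    # milliseconds
print(to_epoch(naive, "s"))     # seconds
```

The nanosecond result matches the integer used in the examples for time(), which is why the string, integer and datetime forms there denote the same instant.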
- to_line_protocol(precision=None)[source]
Create LineProtocol.
- Parameters:
precision – required precision of LineProtocol. If it’s not set, the precision from the Point is used.
- property write_precision
Get precision.
- class influxdb_client.domain.write_precision.WritePrecision[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
WritePrecision - a model defined in OpenAPI.
- NS = 'ns'
- Attributes:
- openapi_types (dict): The key is attribute name
and the value is attribute type.
- attribute_map (dict): The key is attribute name
and the value is json key in definition.
BucketsApi
- class influxdb_client.BucketsApi(influxdb_client)[source]
Implementation for ‘/api/v2/buckets’ endpoint.
Initialize defaults.
- create_bucket(bucket=None, bucket_name=None, org_id=None, retention_rules=None, description=None, org=None) Bucket[source]
Create a bucket.
- Parameters:
bucket (Bucket|PostBucketRequest) – bucket to create
bucket_name – bucket name
description – bucket description
org_id – org_id
retention_rules – retention rules array or single BucketRetentionRules
org (str, Organization) – specifies the organization for creating the bucket; takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
- Returns:
Bucket. If the method is called asynchronously, returns the request thread.
- delete_bucket(bucket)[source]
Delete a bucket.
- Parameters:
bucket – bucket id or Bucket
- Returns:
Bucket
- find_bucket_by_name(bucket_name)[source]
Find bucket by name.
- Parameters:
bucket_name – bucket name
- Returns:
Bucket
- find_buckets(**kwargs)[source]
List buckets.
- Key int offset:
Offset for pagination
- Key int limit:
Limit for pagination
- Key str after:
The last resource ID after which to seek (that resource itself is not included). This is to be used instead of offset.
- Key str org:
The organization name.
- Key str org_id:
The organization ID.
- Key str name:
Only returns buckets with a specific name.
- Returns:
Buckets
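The offset and limit keys lend themselves to a simple paging loop. The generator below is a sketch under assumptions: iter_pages and fake_fetch are hypothetical names, and the fetch function is a stand-in over an in-memory list so the loop can be run without a server.

```python
def iter_pages(fetch_page, limit=20):
    """Yield items page by page until a short or empty page is returned."""
    offset = 0
    while True:
        page = fetch_page(offset=offset, limit=limit)
        if not page:
            break
        yield from page
        if len(page) < limit:
            break  # a short page means there is nothing left
        offset += limit


# Stand-in data source; a real one would call find_buckets(offset=..., limit=...).
FAKE_BUCKETS = [f"bucket-{i}" for i in range(45)]


def fake_fetch(offset, limit):
    return FAKE_BUCKETS[offset:offset + limit]


names = list(iter_pages(fake_fetch, limit=20))
print(len(names), names[0], names[-1])
```

With the real client, fetch_page would wrap find_buckets and return the list of buckets from its result; how that list is exposed on the returned Buckets object is not shown in this reference, so it is left out here.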
- class influxdb_client.domain.Bucket(links=None, id=None, type='user', name=None, description=None, org_id=None, rp=None, schema_type=None, created_at=None, updated_at=None, retention_rules=None, labels=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Bucket - a model defined in OpenAPI.
- property created_at
Get the created_at of this Bucket.
- Returns:
The created_at of this Bucket.
- Return type:
datetime
- property description
Get the description of this Bucket.
- Returns:
The description of this Bucket.
- Return type:
- property labels
Get the labels of this Bucket.
- Returns:
The labels of this Bucket.
- Return type:
list[Label]
- property links
Get the links of this Bucket.
- Returns:
The links of this Bucket.
- Return type:
BucketLinks
- property org_id
Get the org_id of this Bucket.
- Returns:
The org_id of this Bucket.
- Return type:
- property retention_rules
Get the retention_rules of this Bucket.
Retention rules to expire or retain data. The InfluxDB /api/v2 API uses RetentionRules to configure the retention period (https://docs.influxdata.com/influxdb/latest/reference/glossary/#retention-period).
InfluxDB Cloud: retentionRules is required.
InfluxDB OSS: retentionRules isn’t required.
- Returns:
The retention_rules of this Bucket.
- Return type:
list[BucketRetentionRules]
- property schema_type
Get the schema_type of this Bucket.
- Returns:
The schema_type of this Bucket.
- Return type:
SchemaType
- property updated_at
Get the updated_at of this Bucket.
- Returns:
The updated_at of this Bucket.
- Return type:
datetime
LabelsApi
- class influxdb_client.LabelsApi(influxdb_client)[source]
Implementation for ‘/api/v2/labels’ endpoint.
Initialize defaults.
- clone_label(cloned_name: str, label: Label) Label[source]
Create a new instance of the label as a copy of an existing label.
- Parameters:
cloned_name – new label name
label – existing label
- Returns:
cloned Label
- create_label(name: str, org_id: str, properties: Optional[Dict[str, str]] = None) Label[source]
Create a new label.
- Parameters:
name – label name
org_id – organization id
properties – optional label properties
- Returns:
created label
- delete_label(label: Union[str, Label])[source]
Delete the label.
- Parameters:
label – label id or Label
- find_label_by_id(label_id: str)[source]
Retrieve the label by id.
- Parameters:
label_id –
- Returns:
Label
- find_label_by_org(org_id) List[Label][source]
Get the list of all labels for the given organization.
- Parameters:
org_id – organization id
- Returns:
list of labels
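The LabelsApi methods above can be combined into a small "get or create" helper. This is a minimal sketch, assuming `labels_api` is the object returned by `client.labels_api()` and that each returned label exposes a `name` attribute; the helper name `ensure_label` is hypothetical and not part of the library.

```python
def ensure_label(labels_api, org_id, name, properties=None):
    """Return the label called `name`, creating it when the organization has none.

    `labels_api` is assumed to behave like influxdb_client.LabelsApi.
    """
    # find_label_by_org returns the list of all labels of the organization
    for label in labels_api.find_label_by_org(org_id):
        if label.name == name:
            return label
    # No label with that name yet: create it with the optional properties
    return labels_api.create_label(name, org_id, properties)
```

Calling the helper twice with the same name should issue only one `create_label` request, provided the server lists the label on the second call.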
OrganizationsApi
- class influxdb_client.OrganizationsApi(influxdb_client)[source]
Implementation for ‘/api/v2/orgs’ endpoint.
Initialize defaults.
- create_organization(name: Optional[str] = None, organization: Optional[Organization] = None) Organization[source]
Create an organization.
- find_organizations(**kwargs)[source]
List all organizations.
- Key int offset:
Offset for pagination
- Key int limit:
Limit for pagination
- Key bool descending:
- Key str org:
Filter organizations to a specific organization name.
- Key str org_id:
Filter organizations to a specific organization ID.
- Key str user_id:
Filter organizations to a specific user ID.
- update_organization(organization: Organization) Organization[source]
Update an organization.
- Parameters:
organization – Organization update to apply (required)
- Returns:
Organization
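The `offset` and `limit` keys of `find_organizations` can be used to page through a long list of organizations. The sketch below assumes that `find_organizations` returns a plain list of organizations and that `orgs_api` is the object returned by `client.organizations_api()`; `iter_organizations` is a hypothetical helper name.

```python
def iter_organizations(orgs_api, page_size=20):
    """Yield every organization, fetching `page_size` records per request.

    Assumes find_organizations(offset=..., limit=...) returns a list.
    """
    offset = 0
    while True:
        page = orgs_api.find_organizations(offset=offset, limit=page_size)
        if not page:
            return
        yield from page
        # A short page means there is nothing left to fetch
        if len(page) < page_size:
            return
        offset += page_size
```

The generator stops as soon as a page comes back empty or shorter than `page_size`, so it never requests more pages than necessary.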
- class influxdb_client.domain.Organization(links=None, id=None, name=None, default_storage_type=None, description=None, created_at=None, updated_at=None, status='active')[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Organization - a model defined in OpenAPI.
- property created_at
Get the created_at of this Organization.
- Returns:
The created_at of this Organization.
- Return type:
datetime
- property default_storage_type
Get the default_storage_type of this Organization.
Discloses whether the organization uses TSM or IOx.
- Returns:
The default_storage_type of this Organization.
- Return type:
- property description
Get the description of this Organization.
- Returns:
The description of this Organization.
- Return type:
- property id
Get the id of this Organization.
- Returns:
The id of this Organization.
- Return type:
- property links
Get the links of this Organization.
- Returns:
The links of this Organization.
- Return type:
OrganizationLinks
- property name
Get the name of this Organization.
- Returns:
The name of this Organization.
- Return type:
- property status
Get the status of this Organization.
If inactive, the organization is inactive.
- Returns:
The status of this Organization.
- Return type:
- property updated_at
Get the updated_at of this Organization.
- Returns:
The updated_at of this Organization.
- Return type:
datetime
UsersApi
- class influxdb_client.UsersApi(influxdb_client)[source]
Implementation for ‘/api/v2/users’ endpoint.
Initialize defaults.
- delete_user(user: Union[str, User, UserResponse]) None[source]
Delete a user.
- Parameters:
user – user id or User
- Returns:
None
- find_users(**kwargs) Users[source]
List all users.
- Key int offset:
The offset for pagination. The number of records to skip.
- Key int limit:
Limits the number of records returned. Default is 20.
- Key str after:
The last resource ID from which to seek from (but not including). This is to be used instead of offset.
- Key str name:
The user name.
- Key str id:
The user ID.
- Returns:
Users
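A common use of `find_users` is to resolve a user name to its ID. The following is a minimal sketch, assuming the returned object exposes the matching users through a `users` attribute and that each user has an `id`; `find_user_id` is a hypothetical helper, not a library function.

```python
def find_user_id(users_api, name):
    """Return the ID of the user called `name`, or None when no user matches.

    `users_api` is assumed to behave like influxdb_client.UsersApi.
    """
    result = users_api.find_users(name=name, limit=1)
    # Assumption: the result object carries the matches in `users`
    users = result.users or []
    return users[0].id if users else None
```

The returned ID can then be passed to `delete_user`, which accepts either a user ID or a user object.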
- class influxdb_client.domain.User(id=None, name=None, status='active')[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
User - a model defined in OpenAPI.
- property name
Get the name of this User.
The user name.
- Returns:
The name of this User.
- Return type:
- property status
Get the status of this User.
If inactive, the user is inactive. Default is active.
- Returns:
The status of this User.
- Return type:
TasksApi
- class influxdb_client.TasksApi(influxdb_client)[source]
Implementation for ‘/api/v2/tasks’ endpoint.
Initialize defaults.
- cancel_run(task_id: str, run_id: str)[source]
Cancel a currently running run.
- Parameters:
task_id – task id
run_id – run id
- create_task(task: Optional[Task] = None, task_create_request: Optional[TaskCreateRequest] = None) Task[source]
Create a new task.
- create_task_cron(name: str, flux: str, cron: str, org_id: str) Task[source]
Create a new task with cron repetition schedule.
- create_task_every(name, flux, every, organization) Task[source]
Create a new task with every repetition schedule.
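As an illustration of `create_task_every`, the sketch below schedules a downsampling script. It assumes `tasks_api` is the object returned by `client.tasks_api()` and that `organization` is an Organization object (for example one returned by the OrganizationsApi). The task name, bucket names and Flux script are hypothetical placeholders.

```python
# Hypothetical Flux script: averages "mem" points into 5 minute windows
DOWNSAMPLE_FLUX = '''
from(bucket: "my-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "mem")
    |> aggregateWindow(every: 5m, fn: mean)
    |> to(bucket: "my-downsampled-bucket")
'''


def schedule_downsampling(tasks_api, organization, every="1h"):
    """Create a task that runs DOWNSAMPLE_FLUX once per `every` interval."""
    return tasks_api.create_task_every("downsample-mem", DOWNSAMPLE_FLUX, every, organization)
```

For a cron-based schedule, `create_task_cron` takes a cron expression and an organization ID instead of an interval and an organization.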
- find_tasks(**kwargs)[source]
List all tasks up to set limit (max 500).
- Key str name:
only returns tasks with the specified name
- Key str after:
returns tasks after specified ID
- Key str user:
filter tasks to a specific user ID
- Key str org:
filter tasks to a specific organization name
- Key str org_id:
filter tasks to a specific organization ID
- Key int limit:
the number of tasks to return
- Returns:
Tasks
- find_tasks_iter(**kwargs)[source]
Iterate over all tasks with pagination.
- Key str name:
only returns tasks with the specified name
- Key str after:
returns tasks after specified ID
- Key str user:
filter tasks to a specific user ID
- Key str org:
filter tasks to a specific organization name
- Key str org_id:
filter tasks to a specific organization ID
- Key int limit:
the number of tasks in one page
- Returns:
Tasks iterator
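Because `find_tasks_iter` handles pagination itself, it can be consumed like any other iterable. The sketch below groups task names by status; it assumes each task exposes `name` and `status` attributes and that `tasks_api` is the object returned by `client.tasks_api()`. The helper name is hypothetical.

```python
from collections import defaultdict


def task_names_by_status(tasks_api, org_id, page_size=100):
    """Return a dict mapping each task status to the names of tasks in that status."""
    grouped = defaultdict(list)
    # `limit` is the page size here; the iterator fetches further pages as needed
    for task in tasks_api.find_tasks_iter(org_id=org_id, limit=page_size):
        grouped[task.status].append(task.name)
    return dict(grouped)
```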
- get_logs(task_id: str) List[LogEvent][source]
Retrieve all logs for a task.
- Parameters:
task_id – task id
- get_run(task_id: str, run_id: str) Run[source]
Get run record for specific task and run id.
- Parameters:
task_id – task id
run_id – run id
- Returns:
Run for specified task and run id
- get_runs(task_id, **kwargs) List[Run][source]
Retrieve list of run records for a task.
- Parameters:
task_id – task id
- Key str after:
returns runs after specified ID
- Key int limit:
the number of runs to return
- Key datetime after_time:
filter runs to those scheduled after this time, RFC3339
- Key datetime before_time:
filter runs to those scheduled before this time, RFC3339
- retry_run(task_id: str, run_id: str)[source]
Retry a task run.
- Parameters:
task_id – task id
run_id – run id
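`get_runs` and `retry_run` can be combined to re-queue failed runs of a task. This is a sketch under the assumption that each run exposes `id` and `status` attributes and that a failed run reports the status `"failed"`; `retry_failed_runs` is a hypothetical helper name.

```python
def retry_failed_runs(tasks_api, task_id, limit=10):
    """Retry every failed run among the `limit` runs returned for `task_id`.

    Returns the IDs of the runs that were retried.
    """
    retried = []
    for run in tasks_api.get_runs(task_id, limit=limit):
        # Assumption: failed runs carry the status string "failed"
        if run.status == "failed":
            tasks_api.retry_run(task_id, run.id)
            retried.append(run.id)
    return retried
```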
- class influxdb_client.domain.Task(id=None, org_id=None, org=None, name=None, owner_id=None, description=None, status=None, labels=None, authorization_id=None, flux=None, every=None, cron=None, offset=None, latest_completed=None, last_run_status=None, last_run_error=None, created_at=None, updated_at=None, links=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Task - a model defined in OpenAPI.
- property authorization_id
Get the authorization_id of this Task.
An authorization ID. Specifies the authorization used when the task communicates with the query engine. To find an authorization ID, use the [GET /api/v2/authorizations endpoint](#operation/GetAuthorizations) to list authorizations.
- Returns:
The authorization_id of this Task.
- Return type:
- property created_at
Get the created_at of this Task.
- Returns:
The created_at of this Task.
- Return type:
datetime
- property cron
Get the cron of this Task.
A [Cron expression](https://en.wikipedia.org/wiki/Cron#Overview) that defines the schedule on which the task runs. InfluxDB uses the system time when evaluating Cron expressions.
- Returns:
The cron of this Task.
- Return type:
- property description
Get the description of this Task.
A description of the task.
- Returns:
The description of this Task.
- Return type:
- property every
Get the every of this Task.
The interval ([duration literal](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)) at which the task runs. every also determines when the task first runs, depending on the specified time.
- Returns:
The every of this Task.
- Return type:
- property flux
Get the flux of this Task.
The Flux script that the task executes.
- Returns:
The flux of this Task.
- Return type:
- property labels
Get the labels of this Task.
- Returns:
The labels of this Task.
- Return type:
list[Label]
- property last_run_error
Get the last_run_error of this Task.
- Returns:
The last_run_error of this Task.
- Return type:
- property last_run_status
Get the last_run_status of this Task.
- Returns:
The last_run_status of this Task.
- Return type:
- property latest_completed
Get the latest_completed of this Task.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)) of the latest scheduled and completed run.
- Returns:
The latest_completed of this Task.
- Return type:
datetime
- property links
Get the links of this Task.
- Returns:
The links of this Task.
- Return type:
TaskLinks
- property name
Get the name of this Task.
The name of the task.
- Returns:
The name of this Task.
- Return type:
- property offset
Get the offset of this Task.
A [duration](https://docs.influxdata.com/flux/v0.x/spec/lexical-elements/#duration-literals) to delay execution of the task after the scheduled time has elapsed. 0 removes the offset.
- Returns:
The offset of this Task.
- Return type:
- property org
Get the org of this Task.
An [organization](https://docs.influxdata.com/influxdb/latest/reference/glossary/#organization) name. Specifies the organization that owns the task.
- Returns:
The org of this Task.
- Return type:
- property org_id
Get the org_id of this Task.
An [organization](https://docs.influxdata.com/influxdb/latest/reference/glossary/#organization) ID. Specifies the organization that owns the task.
- Returns:
The org_id of this Task.
- Return type:
- property owner_id
Get the owner_id of this Task.
A [user](https://docs.influxdata.com/influxdb/latest/reference/glossary/#user) ID. Specifies the owner of the task. To find a user ID, you can use the [GET /api/v2/users endpoint](#operation/GetUsers) to list users.
- Returns:
The owner_id of this Task.
- Return type:
- property status
Get the status of this Task.
- Returns:
The status of this Task.
- Return type:
TaskStatusType
- property updated_at
Get the updated_at of this Task.
- Returns:
The updated_at of this Task.
- Return type:
datetime
InvokableScriptsApi
- class influxdb_client.InvokableScriptsApi(influxdb_client)[source]
Use API invokable scripts to create custom InfluxDB API endpoints that query, process, and shape data.
Initialize defaults.
- create_script(create_request: ScriptCreateRequest) Script[source]
Create a script.
- Parameters:
create_request (ScriptCreateRequest) – The script to create. (required)
- Returns:
The created script.
- delete_script(script_id: str) None[source]
Delete a script.
- Parameters:
script_id (str) – The ID of the script to delete. (required)
- Returns:
None
- invoke_script(script_id: str, params: Optional[dict] = None) TableList[source]
Synchronously invoke a script and return the result as a TableList.
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
- Parameters:
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns:
FluxTable list wrapped into TableList
- Return type:
TableList
Serialize the query results to a flattened list of values via to_values():
    from influxdb_client import InfluxDBClient

    with InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", token="my-token", org="my-org") as client:

        # Query: using Table structure
        tables = client.invokable_scripts_api().invoke_script(script_id="script-id")

        # Serialize to values
        output = tables.to_values(columns=['location', '_time', '_value'])
        print(output)

    [
        ['New York', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 24.3],
        ['Prague', datetime.datetime(2022, 6, 7, 11, 3, 22, 917593, tzinfo=tzutc()), 25.3],
        ...
    ]
Serialize the query results to JSON via to_json():
    from influxdb_client import InfluxDBClient

    with InfluxDBClient(url="https://us-west-2-1.aws.cloud2.influxdata.com", token="my-token", org="my-org") as client:

        # Query: using Table structure
        tables = client.invokable_scripts_api().invoke_script(script_id="script-id")

        # Serialize to JSON
        output = tables.to_json(indent=5)
        print(output)

    [
         {
              "_measurement": "mem",
              "_start": "2021-06-23T06:50:11.897825+00:00",
              "_stop": "2021-06-25T06:50:11.897825+00:00",
              "_time": "2020-02-27T16:20:00.897825+00:00",
              "region": "north",
              "_field": "usage",
              "_value": 15
         },
         {
              "_measurement": "mem",
              "_start": "2021-06-23T06:50:11.897825+00:00",
              "_stop": "2021-06-25T06:50:11.897825+00:00",
              "_time": "2020-02-27T16:20:01.897825+00:00",
              "region": "west",
              "_field": "usage",
              "_value": 10
         },
         ...
    ]
- invoke_script_csv(script_id: str, params: Optional[dict] = None) CSVIterator[source]
Synchronously invoke a script and return the result as a CSV iterator. Each iteration returns a row of the CSV file.
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
- Parameters:
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns:
Iterator[List[str]] wrapped into CSVIterator
- Return type:
CSVIterator
Serialize the query results to a flattened list of values via to_values():
    from influxdb_client import InfluxDBClient

    with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:

        # Query: using CSV iterator
        csv_iterator = client.invokable_scripts_api().invoke_script_csv(script_id="script-id")

        # Serialize to values
        output = csv_iterator.to_values()
        print(output)

    [
        ['', 'result', 'table', '_start', '_stop', '_time', '_value', '_field', '_measurement', 'location'],
        ['', '', '0', '2022-06-16', '2022-06-16', '2022-06-16', '24.3', 'temperature', 'my_measurement', 'New York'],
        ['', '', '1', '2022-06-16', '2022-06-16', '2022-06-16', '25.3', 'temperature', 'my_measurement', 'Prague'],
        ...
    ]
- invoke_script_data_frame(script_id: str, params: Optional[dict] = None, data_frame_index: Optional[List[str]] = None)[source]
Synchronously invoke a script and return the result as a Pandas DataFrame.
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
Note
If the script returns tables with differing schemas, then the client generates a DataFrame for each of them.
- Parameters:
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
data_frame_index – the list of columns that are used as the DataFrame index
- Returns:
DataFrame or List[DataFrame]
Warning
For optimal processing of the query results, use the pivot() function, which aligns the results as a table.
    from(bucket:"my-bucket")
        |> range(start: -5m, stop: now())
        |> filter(fn: (r) => r._measurement == "mem")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- invoke_script_data_frame_stream(script_id: str, params: Optional[dict] = None, data_frame_index: Optional[List[str]] = None)[source]
Synchronously invoke a script and return a stream of Pandas DataFrames as a Generator[‘pd.DataFrame’].
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
Note
If the script returns tables with differing schemas, then the client generates a DataFrame for each of them.
- Parameters:
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
data_frame_index – the list of columns that are used as the DataFrame index
- Returns:
Generator[DataFrame]
Warning
For optimal processing of the query results, use the pivot() function, which aligns the results as a table.
    from(bucket:"my-bucket")
        |> range(start: -5m, stop: now())
        |> filter(fn: (r) => r._measurement == "mem")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- invoke_script_raw(script_id: str, params: Optional[dict] = None) Iterator[List[str]][source]
Synchronously invoke a script and return the raw, unprocessed result as a str.
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
- Parameters:
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns:
Result as a str.
- invoke_script_stream(script_id: str, params: Optional[dict] = None) Generator[FluxRecord, Any, None][source]
Synchronously invoke a script and return the result as a Generator[‘FluxRecord’].
The bind parameters referenced in the script are substituted with the params key-values sent in the request body.
- Parameters:
script_id (str) – The ID of the script to invoke. (required)
params – bind parameters
- Returns:
Stream of FluxRecord.
- Return type:
Generator[‘FluxRecord’]
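The sketch below shows how bind parameters and the record stream might be used together. It assumes the stored script references a hypothetical parameter `params.bucket_name`, that the result contains a `location` column, and that each record supports `record["column"]` access and `get_value()`. `latest_values` is an illustrative helper, not a library function.

```python
def latest_values(scripts_api, script_id, bucket):
    """Map each location to the value of the last record streamed for it.

    `scripts_api` is assumed to behave like influxdb_client.InvokableScriptsApi.
    """
    # "bucket_name" is a hypothetical parameter the stored script must reference
    records = scripts_api.invoke_script_stream(script_id=script_id,
                                               params={"bucket_name": bucket})
    # Later records for the same location overwrite earlier ones
    return {record["location"]: record.get_value() for record in records}
```

Since the records arrive as a generator, they are processed one at a time and the full result set is never held in memory.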
- class influxdb_client.domain.Script(id=None, name=None, description=None, org_id=None, script=None, language=None, url=None, created_at=None, updated_at=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
Script - a model defined in OpenAPI.
- property created_at
Get the created_at of this Script.
- Returns:
The created_at of this Script.
- Return type:
datetime
- property description
Get the description of this Script.
- Returns:
The description of this Script.
- Return type:
- property language
Get the language of this Script.
- Returns:
The language of this Script.
- Return type:
ScriptLanguage
- property org_id
Get the org_id of this Script.
- Returns:
The org_id of this Script.
- Return type:
- property script
Get the script of this Script.
The script to execute.
- Returns:
The script of this Script.
- Return type:
- property updated_at
Get the updated_at of this Script.
- Returns:
The updated_at of this Script.
- Return type:
datetime
- class influxdb_client.domain.ScriptCreateRequest(name=None, description=None, script=None, language=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
ScriptCreateRequest - a model defined in OpenAPI.
- property description
Get the description of this ScriptCreateRequest.
Script description. A description of the script.
- Returns:
The description of this ScriptCreateRequest.
- Return type:
- property language
Get the language of this ScriptCreateRequest.
- Returns:
The language of this ScriptCreateRequest.
- Return type:
ScriptLanguage
- property name
Get the name of this ScriptCreateRequest.
Script name. The name must be unique within the organization.
- Returns:
The name of this ScriptCreateRequest.
- Return type:
- property script
Get the script of this ScriptCreateRequest.
The script to execute.
- Returns:
The script of this ScriptCreateRequest.
- Return type:
DeleteApi
- class influxdb_client.DeleteApi(influxdb_client)[source]
Implementation for ‘/api/v2/delete’ endpoint.
Initialize defaults.
- delete(start: Union[str, datetime], stop: Union[str, datetime], predicate: str, bucket: str, org: Optional[Union[str, Organization]] = None) None[source]
Delete time series data from InfluxDB.
- Parameters:
start (str, datetime.datetime) – start time
stop (str, datetime.datetime) – stop time
predicate (str) – predicate
bucket (str) – bucket id or name from which data will be deleted
org (str, Organization) – specifies the organization to delete data from. Takes the ID, Name or Organization. If not specified, the default value from InfluxDBClient.org is used.
- Returns:
None
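As a usage sketch for `delete`, the helper below removes the most recent `days` of data for one measurement. It assumes `delete_api` is the object returned by `client.delete_api()`; the helper name, and the optional `now` argument used to make the window reproducible, are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def delete_measurement_window(delete_api, bucket, measurement, days, now=None):
    """Delete the last `days` days of `measurement` from `bucket`.

    Returns the (start, stop) window that was passed to the API.
    """
    stop = now or datetime.now(timezone.utc)
    start = stop - timedelta(days=days)
    # Delete predicate syntax: match on the measurement name
    predicate = f'_measurement="{measurement}"'
    delete_api.delete(start, stop, predicate, bucket=bucket)
    return start, stop
```

Omitting `org` means the default organization configured on the client is used, as described for the `org` parameter above.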
- class influxdb_client.domain.DeletePredicateRequest(start=None, stop=None, predicate=None)[source]
NOTE: This class is auto generated by OpenAPI Generator.
Ref: https://openapi-generator.tech
Do not edit the class manually.
DeletePredicateRequest - a model defined in OpenAPI.
- property predicate
Get the predicate of this DeletePredicateRequest.
An expression in [delete predicate syntax](https://docs.influxdata.com/influxdb/latest/reference/syntax/delete-predicate/).
- Returns:
The predicate of this DeletePredicateRequest.
- Return type:
- property start
Get the start of this DeletePredicateRequest.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)). The earliest time to delete from.
- Returns:
The start of this DeletePredicateRequest.
- Return type:
datetime
- property stop
Get the stop of this DeletePredicateRequest.
A timestamp ([RFC3339 date/time format](https://docs.influxdata.com/influxdb/latest/reference/glossary/#rfc3339-timestamp)). The latest time to delete from.
- Returns:
The stop of this DeletePredicateRequest.
- Return type:
datetime
Helpers
- class influxdb_client.client.util.date_utils.DateHelper(timezone: tzinfo = datetime.timezone.utc)[source]
DateHelper groups different implementations of date operations.
If you would like to serialize the query results to a custom timezone, you can use the following code:
    from influxdb_client.client.util import date_utils
    from influxdb_client.client.util.date_utils import DateHelper
    import dateutil.parser
    from dateutil import tz

    def parse_date(date_string: str):
        # Parse the timestamp and convert it to the custom timezone
        return dateutil.parser.parse(date_string).astimezone(tz.gettz('Etc/GMT+2'))

    date_utils.date_helper = DateHelper()
    date_utils.date_helper.parse_date = parse_date
Initialize defaults.
- Parameters:
timezone – Default timezone used for serialization “datetime” without “tzinfo”. Default value is “UTC”.
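The effect of a default timezone on a "datetime" without "tzinfo" can be illustrated with the standard library alone. The function below is a sketch of the idea, not the library's implementation; `with_default_timezone` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone


def with_default_timezone(value: datetime, default_tz=timezone.utc) -> datetime:
    """Attach `default_tz` to naive datetimes and leave aware ones untouched."""
    if value.tzinfo is None:
        # A naive value is interpreted as wall-clock time in the default timezone
        return value.replace(tzinfo=default_tz)
    return value
```

With the default of UTC, a naive `datetime(2022, 6, 7, 12)` is treated as 12:00 UTC; with a `+02:00` default, the same value is treated as 12:00 in that offset.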
- parse_date(date_string: str)[source]
Parse string into Date or Timestamp.
- Returns:
Returns a datetime.datetime object or a compliant implementation such as class 'pandas._libs.tslibs.timestamps.Timestamp'
- to_nanoseconds(delta)[source]
Get number of nanoseconds in timedelta.
Solution comes from v1 client. Thx. https://github.com/influxdata/influxdb-python/pull/811
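The arithmetic behind such a conversion can be sketched with the standard library. This is an illustration of the calculation and not necessarily the library's exact code; `timedelta_to_nanoseconds` is a hypothetical name.

```python
from datetime import timedelta


def timedelta_to_nanoseconds(delta: timedelta) -> int:
    """Convert a timedelta to a whole number of nanoseconds using integer math."""
    # timedelta stores days, seconds and microseconds as separate integers,
    # so summing the scaled parts avoids floating-point rounding
    return (delta.days * 86_400 * 10**9
            + delta.seconds * 10**9
            + delta.microseconds * 10**3)
```

Integer arithmetic matters here: going through `delta.total_seconds()` would produce a float, which cannot represent every nanosecond count exactly for large intervals.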
- class influxdb_client.client.util.date_utils_pandas.PandasDateTimeHelper(timezone: tzinfo = datetime.timezone.utc)[source]
DateHelper that uses the Pandas library with nanosecond precision.
Initialize defaults.
- Parameters:
timezone – Default timezone used for serialization “datetime” without “tzinfo”. Default value is “UTC”.
- class influxdb_client.client.util.multiprocessing_helper.MultiprocessingWriter(**kwargs)[source]
Helper class to write data into InfluxDB in an independent OS process.
- Example:
    from influxdb_client import WriteOptions
    from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


    def main():
        writer = MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                       write_options=WriteOptions(batch_size=100))
        writer.start()

        for x in range(1, 1000):
            writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")

        writer.__del__()


    if __name__ == '__main__':
        main()
- How to use with context_manager:
    from influxdb_client import WriteOptions
    from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


    def main():
        with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                   write_options=WriteOptions(batch_size=100)) as writer:
            for x in range(1, 1000):
                writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")


    if __name__ == '__main__':
        main()
- How to handle batch events:
    from influxdb_client import WriteOptions
    from influxdb_client.client.exceptions import InfluxDBError
    from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


    class BatchingCallback(object):

        def success(self, conf: (str, str, str), data: str):
            print(f"Written batch: {conf}, data: {data}")

        def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
            print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

        def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
            print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


    def main():
        callback = BatchingCallback()
        with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                   success_callback=callback.success,
                                   error_callback=callback.error,
                                   retry_callback=callback.retry) as writer:
            for x in range(1, 1000):
                writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")


    if __name__ == '__main__':
        main()
Initialize defaults.
For more information on how to initialize the writer, see the examples above.
- Parameters:
kwargs – arguments are passed into the __init__ function of InfluxDBClient and write_api.