Source code for influxdb_client.client.influxdb_client

"""InfluxDBClient is client for API defined in https://github.com/influxdata/influxdb/blob/master/http/swagger.yml."""

from __future__ import absolute_import

import configparser
import logging
import os
import base64
import warnings

from influxdb_client import Configuration, ApiClient, HealthCheck, HealthService, Ready, ReadyService, PingService
from influxdb_client.client.authorizations_api import AuthorizationsApi
from influxdb_client.client.bucket_api import BucketsApi
from influxdb_client.client.delete_api import DeleteApi
from influxdb_client.client.labels_api import LabelsApi
from influxdb_client.client.organizations_api import OrganizationsApi
from influxdb_client.client.query_api import QueryApi, QueryOptions
from influxdb_client.client.tasks_api import TasksApi
from influxdb_client.client.users_api import UsersApi
from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings


logger = logging.getLogger(__name__)


[docs]class InfluxDBClient(object): """InfluxDBClient is client for InfluxDB v2.""" def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, org: str = None, default_tags: dict = None, **kwargs) -> None: """ Initialize defaults. :param url: InfluxDB server API url (ex. http://localhost:8086). :param token: auth token :param debug: enable verbose logging of http requests :param timeout: HTTP client timeout setting for a request specified in milliseconds. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param enable_gzip: Enable Gzip compression for http requests. Currently only the "Write" and "Query" endpoints supports the Gzip compression. :param org: organization name (used as a default in query and write API) :key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server. :key str ssl_ca_cert: Set this to customize the certificate file to verify the peer. :key str proxy: Set this to configure the http proxy to be used (ex. http://localhost:3128) :key str proxy_headers: A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication. :key int connection_pool_maxsize: Number of connections to save that can be reused by urllib3. Defaults to "multiprocessing.cpu_count() * 5". :key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests except batching writes. As a default there is no one retry strategy. :key bool auth_basic: Set this to true to enable basic authentication when talking to a InfluxDB 1.8.x that does not use auth-enabled but is protected by a reverse proxy with basic authentication. (defaults to false, don't set to true when talking to InfluxDB 2) :key list[str] profilers: list of enabled Flux profilers """ self.url = url self.token = token self.org = org self.default_tags = default_tags conf = _Configuration() if self.url.endswith("/"): conf.host = self.url[:-1] else: conf.host = self.url conf.enable_gzip = enable_gzip conf.debug = debug conf.verify_ssl = kwargs.get('verify_ssl', True) conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None) conf.proxy = kwargs.get('proxy', None) conf.proxy_headers = kwargs.get('proxy_headers', None) conf.connection_pool_maxsize = kwargs.get('connection_pool_maxsize', conf.connection_pool_maxsize) conf.timeout = timeout auth_token = self.token auth_header_name = "Authorization" auth_header_value = "Token " + auth_token auth_basic = kwargs.get('auth_basic', False) if auth_basic: auth_header_value = "Basic " + base64.b64encode(token.encode()).decode() retries = kwargs.get('retries', False) self.profilers = kwargs.get('profilers', None) self.api_client = ApiClient(configuration=conf, header_name=auth_header_name, header_value=auth_header_value, retries=retries) def __enter__(self): """ Enter the runtime context related to this object. It will bind this method’s return value to the target(s) specified in the `as` clause of the statement. return: self instance """ return self def __exit__(self, exc_type, exc_value, traceback): """Exit the runtime context related to this object and close the client.""" self.close()
[docs] @classmethod def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False): """ Configure client via configuration file. The configuration has to be under 'influx' section. The supported formats: - https://docs.python.org/3/library/configparser.html - https://toml.io/en/ Configuration options: - url - org - token - timeout, - verify_ssl - ssl_ca_cert - connection_pool_maxsize - auth_basic - profilers - proxy config.ini example:: [influx2] url=http://localhost:8086 org=my-org token=my-token timeout=6000 connection_pool_maxsize=25 auth_basic=false profilers=query,operator proxy=http:proxy.domain.org:8080 [tags] id = 132-987-655 customer = California Miner data_center = ${env.data_center} config.toml example:: [influx2] url = "http://localhost:8086" token = "my-token" org = "my-org" timeout = 6000 connection_pool_maxsize = 25 auth_basic = false profilers="query, operator" proxy = "http://proxy.domain.org:8080" [tags] id = "132-987-655" customer = "California Miner" data_center = "${env.data_center}" """ config = configparser.ConfigParser() config.read(config_file) def config_value(key: str): return config['influx2'][key].strip('"') url = config_value('url') token = config_value('token') timeout = None if config.has_option('influx2', 'timeout'): timeout = config_value('timeout') org = None if config.has_option('influx2', 'org'): org = config_value('org') verify_ssl = True if config.has_option('influx2', 'verify_ssl'): verify_ssl = config_value('verify_ssl') ssl_ca_cert = None if config.has_option('influx2', 'ssl_ca_cert'): ssl_ca_cert = config_value('ssl_ca_cert') connection_pool_maxsize = None if config.has_option('influx2', 'connection_pool_maxsize'): connection_pool_maxsize = config_value('connection_pool_maxsize') auth_basic = False if config.has_option('influx2', 'auth_basic'): auth_basic = config_value('auth_basic') default_tags = None if config.has_section('tags'): tags = {k: v.strip('"') for k, v in config.items('tags')} default_tags = dict(tags) profilers = None if config.has_option('influx2', 'profilers'): profilers = [x.strip() for x in config_value('profilers').split(',')] proxy = None if config.has_option('influx2', 'proxy'): proxy = config_value('proxy') return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags, enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert, connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic), profilers=profilers, proxy=proxy)
[docs] @classmethod def from_env_properties(cls, debug=None, enable_gzip=False): """ Configure client via environment properties. Supported environment properties: - INFLUXDB_V2_URL - INFLUXDB_V2_ORG - INFLUXDB_V2_TOKEN - INFLUXDB_V2_TIMEOUT - INFLUXDB_V2_VERIFY_SSL - INFLUXDB_V2_SSL_CA_CERT - INFLUXDB_V2_CONNECTION_POOL_MAXSIZE - INFLUXDB_V2_AUTH_BASIC """ url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086") token = os.getenv('INFLUXDB_V2_TOKEN', "my-token") timeout = os.getenv('INFLUXDB_V2_TIMEOUT', "10000") org = os.getenv('INFLUXDB_V2_ORG', "my-org") verify_ssl = os.getenv('INFLUXDB_V2_VERIFY_SSL', "True") ssl_ca_cert = os.getenv('INFLUXDB_V2_SSL_CA_CERT', None) connection_pool_maxsize = os.getenv('INFLUXDB_V2_CONNECTION_POOL_MAXSIZE', None) auth_basic = os.getenv('INFLUXDB_V2_AUTH_BASIC', "False") prof = os.getenv("INFLUXDB_V2_PROFILERS", None) profilers = None if prof is not None: profilers = [x.strip() for x in prof.split(',')] default_tags = dict() for key, value in os.environ.items(): if key.startswith("INFLUXDB_V2_TAG_"): default_tags[key[16:].lower()] = value return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags, enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert, connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic), profilers=profilers)
[docs] def write_api(self, write_options=WriteOptions(), point_settings=PointSettings(), **kwargs) -> WriteApi: """ Create a Write API instance. Example: .. code-block:: python from influxdb_client import InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS # Initialize SYNCHRONOUS instance of WriteApi with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: write_api = client.write_api(write_options=SYNCHRONOUS) If you would like to use a **background batching**, you have to configure client like this: .. code-block:: python from influxdb_client import InfluxDBClient # Initialize background batching instance of WriteApi with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: with client.write_api() as write_api: pass There is also possibility to use callbacks to notify about state of background batches: .. code-block:: python from influxdb_client import InfluxDBClient from influxdb_client.client.exceptions import InfluxDBError class BatchingCallback(object): def success(self, conf: (str, str, str), data: str): print(f"Written batch: {conf}, data: {data}") def error(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Cannot write batch: {conf}, data: {data} due: {exception}") def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError): print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}") with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client: callback = BatchingCallback() with client.write_api(success_callback=callback.success, error_callback=callback.error, retry_callback=callback.retry) as write_api: pass :param write_options: Write API configuration :param point_settings: settings to store default tags :key success_callback: The callable ``callback`` to run after successfully writen a batch. The callable must accept two arguments: - `Tuple`: ``(bucket, organization, precision)`` - `str`: written data **[batching mode]** :key error_callback: The callable ``callback`` to run after unsuccessfully writen a batch. The callable must accept three arguments: - `Tuple`: ``(bucket, organization, precision)`` - `str`: written data - `Exception`: an occurred error **[batching mode]** :key retry_callback: The callable ``callback`` to run after retryable error occurred. The callable must accept three arguments: - `Tuple`: ``(bucket, organization, precision)`` - `str`: written data - `Exception`: an retryable error **[batching mode]** :return: write api instance """ return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings, **kwargs)
[docs] def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi: """ Create a Query API instance. :param query_options: optional query api configuration :return: Query api instance """ return QueryApi(self, query_options)
[docs] def close(self): """Shutdown the client.""" self.__del__()
def __del__(self): """Shutdown the client.""" if self.api_client: self.api_client.__del__() self.api_client = None
[docs] def buckets_api(self) -> BucketsApi: """ Create the Bucket API instance. :return: buckets api """ return BucketsApi(self)
[docs] def authorizations_api(self) -> AuthorizationsApi: """ Create the Authorizations API instance. :return: authorizations api """ return AuthorizationsApi(self)
[docs] def users_api(self) -> UsersApi: """ Create the Users API instance. :return: users api """ return UsersApi(self)
[docs] def organizations_api(self) -> OrganizationsApi: """ Create the Organizations API instance. :return: organizations api """ return OrganizationsApi(self)
[docs] def tasks_api(self) -> TasksApi: """ Create the Tasks API instance. :return: tasks api """ return TasksApi(self)
[docs] def labels_api(self) -> LabelsApi: """ Create the Labels API instance. :return: labels api """ return LabelsApi(self)
[docs] def health(self) -> HealthCheck: """ Get the health of an instance. :return: HealthCheck """ warnings.warn("This method is deprecated. Call 'ping()' instead.", DeprecationWarning) health_service = HealthService(self.api_client) try: health = health_service.get_health() return health except Exception as e: return HealthCheck(name="influxdb", message=str(e), status="fail")
[docs] def ping(self) -> bool: """ Return the the status of InfluxDB instance. :return: The status of InfluxDB. """ ping_service = PingService(self.api_client) try: ping_service.get_ping() return True except Exception as ex: logger.error("Unexpected error during /ping: %s", ex) return False
[docs] def version(self) -> str: """ Return the version of the connected InfluxDB Server. :return: The version of InfluxDB. """ ping_service = PingService(self.api_client) response = ping_service.get_ping_with_http_info(_return_http_data_only=False) if response is not None and len(response) >= 3: if 'X-Influxdb-Version' in response[2]: return response[2]['X-Influxdb-Version'] return "unknown"
[docs] def ready(self) -> Ready: """ Get The readiness of the InfluxDB 2.0. :return: Ready """ ready_service = ReadyService(self.api_client) return ready_service.get_ready()
[docs] def delete_api(self) -> DeleteApi: """ Get the delete metrics API instance. :return: delete api """ return DeleteApi(self)
class _Configuration(Configuration): def __init__(self): Configuration.__init__(self) self.enable_gzip = False def update_request_header_params(self, path: str, params: dict): super().update_request_header_params(path, params) if self.enable_gzip: # GZIP Request if path == '/api/v2/write': params["Content-Encoding"] = "gzip" params["Accept-Encoding"] = "identity" pass # GZIP Response if path == '/api/v2/query': # params["Content-Encoding"] = "gzip" params["Accept-Encoding"] = "gzip" pass pass pass def update_request_body(self, path: str, body): _body = super().update_request_body(path, body) if self.enable_gzip: # GZIP Request if path == '/api/v2/write': import gzip if isinstance(_body, bytes): return gzip.compress(data=_body) else: return gzip.compress(bytes(_body, "utf-8")) return _body def _to_bool(bool_value): return str(bool_value).lower() in ("yes", "true") def _to_int(int_value): return int(int_value) if int_value is not None else None