Source code for influxdb_client.client.influxdb_client

from __future__ import absolute_import

import configparser
import os

from influxdb_client import Configuration, ApiClient, HealthCheck, HealthService, Ready, ReadyService
from influxdb_client.client.authorizations_api import AuthorizationsApi
from influxdb_client.client.bucket_api import BucketsApi
from influxdb_client.client.delete_api import DeleteApi
from influxdb_client.client.labels_api import LabelsApi
from influxdb_client.client.organizations_api import OrganizationsApi
from influxdb_client.client.query_api import QueryApi
from influxdb_client.client.tasks_api import TasksApi
from influxdb_client.client.users_api import UsersApi
from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings


class InfluxDBClient(object):
    """Client for the InfluxDB 2.0 HTTP API and factory for the per-resource API wrappers."""

    def __init__(self, url, token, debug=None, timeout=10000, enable_gzip=False, org: str = None,
                 default_tags: dict = None) -> None:
        """
        :class:`influxdb_client.InfluxDBClient` is client for HTTP API defined
        in https://github.com/influxdata/influxdb/blob/master/http/swagger.yml.

        :param url: InfluxDB server API url (ex. http://localhost:9999).
        :param token: auth token
        :param debug: enable verbose logging of http requests
        :param timeout: default http client timeout
        :param enable_gzip: Enable Gzip compression for http requests. Currently only the "Write" and "Query" endpoints
                            support the Gzip compression.
        :param org: organization name (used as a default in query and write API)
        :param default_tags: dictionary of tags stored on the client as ``default_tags``
                             (presumably applied to written points by the write API -- confirm against WriteApi)
        """
        self.url = url
        self.token = token
        self.timeout = timeout
        self.org = org
        self.default_tags = default_tags

        conf = _Configuration()
        conf.host = self.url
        conf.enable_gzip = enable_gzip
        conf.debug = debug

        # Every request carries the token in an "Authorization: Token <token>" header.
        self.api_client = ApiClient(configuration=conf, header_name="Authorization",
                                    header_value="Token " + self.token)

    @classmethod
    def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False):
        """
        Create a client from an INI file.

        The ``[influx2]`` section must define ``url`` and ``token``; ``timeout`` and ``org`` are optional.
        When a ``[tags]`` section exists, its items become the client's ``default_tags``.

        :param config_file: path to the INI file
        :param debug: enable verbose logging of http requests
        :param enable_gzip: Enable Gzip compression for http requests
        :return: configured client instance
        :raises KeyError: if the ``[influx2]`` section, ``url`` or ``token`` is missing
                          (an unreadable or missing file is skipped by ``ConfigParser.read`` and ends up here)
        """
        config = configparser.ConfigParser()
        config.read(config_file)

        url = config['influx2']['url']
        token = config['influx2']['token']
        timeout = config.get('influx2', 'timeout', fallback=None)
        org = config.get('influx2', 'org', fallback=None)
        default_tags = dict(config.items('tags')) if config.has_section('tags') else None

        options = dict(debug=debug, org=org, default_tags=default_tags, enable_gzip=enable_gzip)
        # A missing or empty timeout leaves the constructor default in place.
        if timeout:
            options['timeout'] = int(timeout)
        return cls(url, token, **options)

    @classmethod
    def from_env_properties(cls, debug=None, enable_gzip=False):
        """
        Create a client from environment variables.

        Reads ``INFLUXDB_V2_URL`` (default ``http://localhost:9999``), ``INFLUXDB_V2_TOKEN`` (default ``my-token``),
        ``INFLUXDB_V2_TIMEOUT`` (default ``10000``) and ``INFLUXDB_V2_ORG`` (default ``my-org``).
        Every ``INFLUXDB_V2_TAG_<NAME>`` variable becomes a default tag named ``<name>`` (lower-cased).

        :param debug: enable verbose logging of http requests
        :param enable_gzip: Enable Gzip compression for http requests
        :return: configured client instance
        """
        url = os.getenv('INFLUXDB_V2_URL', "http://localhost:9999")
        token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")
        timeout = os.getenv('INFLUXDB_V2_TIMEOUT', "10000")
        org = os.getenv('INFLUXDB_V2_ORG', "my-org")

        tag_prefix = "INFLUXDB_V2_TAG_"
        default_tags = {
            key[len(tag_prefix):].lower(): value
            for key, value in os.environ.items()
            if key.startswith(tag_prefix)
        }

        return cls(url, token, debug=debug, timeout=int(timeout), org=org, default_tags=default_tags,
                   enable_gzip=enable_gzip)

    def write_api(self, write_options=None, point_settings=None) -> WriteApi:
        """
        Creates a Write API instance

        :param write_options: write api configuration; a fresh ``WriteOptions()`` is used when omitted
        :param point_settings: point settings; a fresh ``PointSettings()`` is used when omitted
        :return: write api instance
        """
        # Build the defaults per call: instances created in the signature would be shared
        # by every caller, so any mutation of them would leak between write APIs.
        if write_options is None:
            write_options = WriteOptions()
        if point_settings is None:
            point_settings = PointSettings()
        return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings)

    def query_api(self) -> QueryApi:
        """
        Creates a Query API instance

        :return: Query api instance
        """
        return QueryApi(self)

    def close(self):
        """
        Shuts down the client by releasing the underlying ``ApiClient``.
        """
        self.__del__()

    def __del__(self):
        """
        Releases the underlying ``ApiClient`` once; later calls are no-ops.
        """
        # __init__ may have raised before api_client was assigned, and __del__ still runs
        # on such a half-built instance -- hence getattr instead of plain attribute access.
        api_client = getattr(self, 'api_client', None)
        if api_client:
            api_client.__del__()
            self.api_client = None

    def buckets_api(self) -> BucketsApi:
        """
        Creates the Bucket API instance.

        :return: buckets api
        """
        return BucketsApi(self)

    def authorizations_api(self) -> AuthorizationsApi:
        """
        Creates the Authorizations API instance.

        :return: authorizations api
        """
        return AuthorizationsApi(self)

    def users_api(self) -> UsersApi:
        """
        Creates the Users API instance.

        :return: users api
        """
        return UsersApi(self)

    def organizations_api(self) -> OrganizationsApi:
        """
        Creates the Organizations API instance.

        :return: organizations api
        """
        return OrganizationsApi(self)

    def tasks_api(self) -> TasksApi:
        """
        Creates the Tasks API instance.

        :return: tasks api
        """
        return TasksApi(self)

    def labels_api(self) -> LabelsApi:
        """
        Creates the Labels API instance.

        :return: labels api
        """
        return LabelsApi(self)

    def health(self) -> HealthCheck:
        """
        Get the health of an instance.

        Any exception raised by the health request is converted into a ``HealthCheck`` with
        ``status="fail"`` whose ``message`` is the exception text, so this method does not raise
        for request failures.

        :return: HealthCheck
        """
        health_service = HealthService(self.api_client)

        try:
            return health_service.get_health()
        except Exception as e:
            # The error text is returned to the caller; a library should not write to stdout.
            return HealthCheck(name="influxdb", message=str(e), status="fail")

    def ready(self) -> Ready:
        """
        Gets the readiness of the InfluxDB 2.0.

        :return: Ready
        """
        ready_service = ReadyService(self.api_client)
        return ready_service.get_ready()

    def delete_api(self) -> DeleteApi:
        """
        Gets the delete metrics API instance

        :return: delete api
        """
        return DeleteApi(self)
class _Configuration(Configuration):
    """Configuration that can gzip write request bodies and ask for gzipped query responses."""

    def __init__(self):
        """Initialise the base configuration with gzip handling switched off."""
        super().__init__()
        self.enable_gzip = False

    def update_request_header_params(self, path: str, params: dict):
        """
        Let the base class update the headers, then add the gzip headers when gzip is enabled.

        :param path: request path
        :param params: header dictionary, modified in place
        """
        super().update_request_header_params(path, params)
        if not self.enable_gzip:
            return

        if path == '/api/v2/write':
            # The request body is sent compressed; the response is requested uncompressed.
            params["Content-Encoding"] = "gzip"
            params["Accept-Encoding"] = "identity"

        if path == '/api/v2/query':
            # Only the response is compressed for queries.
            params["Accept-Encoding"] = "gzip"

    def update_request_body(self, path: str, body):
        """
        Let the base class process the body, then gzip it for the write endpoint when gzip is enabled.

        :param path: request path
        :param body: request body
        :return: gzip-compressed bytes for ``/api/v2/write`` with gzip enabled, otherwise the body
                 as returned by the base class
        """
        payload = super().update_request_body(path, body)
        if not (self.enable_gzip and path == '/api/v2/write'):
            return payload

        import gzip

        # Non-bytes payloads are encoded as UTF-8 before compression.
        raw = payload if isinstance(payload, bytes) else bytes(payload, "utf-8")
        return gzip.compress(data=raw)