# Source code for influxdb_client.client.util.multiprocessing_helper

"""
Helpers classes to make easier use the client in multiprocessing environment.

For more information how the multiprocessing works see Python's
`reference docs <https://docs.python.org/3/library/multiprocessing.html>`_.
"""
import logging
import multiprocessing

from influxdb_client import InfluxDBClient, WriteOptions
from influxdb_client.client.exceptions import InfluxDBError

logger = logging.getLogger(__name__)


def _success_callback(conf: (str, str, str), data: str):
    """Log a successfully written batch.

    :param conf: the (bucket, org, precision) configuration of the batch
    :param data: the line-protocol payload that was written
    """
    # Lazy %-style args: the message is only formatted if DEBUG is enabled.
    logger.debug("Written batch: %s, data: %s", conf, data)


def _error_callback(conf: (str, str, str), data: str, exception: InfluxDBError):
    """Log a batch that could not be written.

    :param conf: the (bucket, org, precision) configuration of the batch
    :param data: the line-protocol payload that failed to write
    :param exception: the error raised while writing the batch
    """
    # Lazy %-style args: the message is only formatted if DEBUG is enabled.
    logger.debug("Cannot write batch: %s, data: %s due: %s", conf, data, exception)


def _retry_callback(conf: (str, str, str), data: str, exception: InfluxDBError):
    """Log a retryable error for a batch.

    :param conf: the (bucket, org, precision) configuration of the batch
    :param data: the line-protocol payload whose write will be retried
    :param exception: the retryable error that occurred
    """
    # Lazy %-style args: the message is only formatted if DEBUG is enabled.
    logger.debug("Retryable error occurs for batch: %s, data: %s retry: %s", conf, data, exception)


class _PoisonPill:
    """To notify process to terminate."""

    pass


class MultiprocessingWriter(multiprocessing.Process):
    """
    The Helper class to write data into InfluxDB in an independent OS process.

    Example:
        .. code-block:: python

            from influxdb_client import WriteOptions
            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


            def main():
                writer = MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                               write_options=WriteOptions(batch_size=100))
                writer.start()

                for x in range(1, 1000):
                    writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")

                writer.__del__()


            if __name__ == '__main__':
                main()


    How to use with context_manager:
        .. code-block:: python

            from influxdb_client import WriteOptions
            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


            def main():
                with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                           write_options=WriteOptions(batch_size=100)) as writer:
                    for x in range(1, 1000):
                        writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")


            if __name__ == '__main__':
                main()


    How to handle batch events:
        .. code-block:: python

            from influxdb_client import WriteOptions
            from influxdb_client.client.exceptions import InfluxDBError
            from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter


            class BatchingCallback(object):

                def success(self, conf: (str, str, str), data: str):
                    print(f"Written batch: {conf}, data: {data}")

                def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
                    print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

                def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
                    print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


            def main():
                callback = BatchingCallback()
                with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
                                           success_callback=callback.success,
                                           error_callback=callback.error,
                                           retry_callback=callback.retry) as writer:
                    for x in range(1, 1000):
                        writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")


            if __name__ == '__main__':
                main()

    """

    # Lifecycle flags. Class-level defaults; each instance rebinds them on
    # start()/__del__(). Names with both leading and trailing double
    # underscores are NOT name-mangled by Python.
    __started__ = False
    __disposed__ = False

    def __init__(self, **kwargs) -> None:
        """
        Initialize defaults.

        For more information how to initialize the writer see the examples above.

        :param kwargs: arguments are passed into ``__init__`` function of ``InfluxDBClient``
                       and ``write_api``.
        """
        multiprocessing.Process.__init__(self)
        # Client/WriteApi creation is deferred to run() so they are built
        # inside the child process, not pickled across the fork/spawn.
        self.kwargs = kwargs
        self.client = None
        self.write_api = None
        # Manager-backed queue so that both the parent (write()) and the
        # child process (run()) can share it; it also supports
        # task_done()/join() used for the shutdown handshake in __del__.
        self.queue_ = multiprocessing.Manager().Queue()

    def write(self, **kwargs) -> None:
        """
        Append time-series data into underlying queue.

        For more information how to pass arguments see the examples above.

        :param kwargs: arguments are passed into ``write`` function of ``WriteApi``
        :return: None
        """
        # NOTE(review): these asserts are stripped under `python -O`;
        # misuse would then surface later as a queue/runtime error.
        assert self.__disposed__ is False, 'Cannot write data: the writer is closed.'
        assert self.__started__ is True, 'Cannot write data: the writer is not started.'
        # Runs in the parent process; the child consumes the item in run().
        self.queue_.put(kwargs)

    def run(self):
        """Initialize ``InfluxDBClient`` and waits for data to writes into InfluxDB."""
        # Executed in the child process only (multiprocessing.Process.run).
        # Initialize Client and Write API
        self.client = InfluxDBClient(**self.kwargs)
        # Any kwarg not overridden by the caller falls back to the module-level
        # logging callbacks and a default WriteOptions().
        self.write_api = self.client.write_api(write_options=self.kwargs.get('write_options', WriteOptions()),
                                               success_callback=self.kwargs.get('success_callback',
                                                                                _success_callback),
                                               error_callback=self.kwargs.get('error_callback', _error_callback),
                                               retry_callback=self.kwargs.get('retry_callback', _retry_callback))
        # Infinite loop - until poison pill
        while True:
            next_record = self.queue_.get()
            if type(next_record) is _PoisonPill:
                # Poison pill means break the loop
                self.terminate()
                self.queue_.task_done()
                break

            self.write_api.write(**next_record)
            self.queue_.task_done()

    def start(self) -> None:
        """Start independent process for writing data into InfluxDB."""
        super().start()
        self.__started__ = True

    def terminate(self) -> None:
        """
        Cleanup resources in independent process.

        This function **cannot be used** to terminate the ``MultiprocessingWriter``.
        If you want to finish your writes please call: ``__del__``.
        """
        # Deliberately shadows multiprocessing.Process.terminate(): instead of
        # killing the process it flushes and disposes the client resources.
        # Called from inside the child process (see run()).
        if self.write_api:
            logger.info("flushing data...")
            self.write_api.__del__()
            self.write_api = None

        if self.client:
            self.client.__del__()
            self.client = None

        logger.info("closed")

    def __enter__(self):
        """Enter the runtime context related to this object."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context related to this object."""
        self.__del__()

    def __del__(self):
        """Dispose the client and write_api."""
        if self.__started__:
            # Shutdown handshake: enqueue the poison pill, wait until the
            # child marks every queued item done, then join the process.
            self.queue_.put(_PoisonPill())
            self.queue_.join()
            self.join()
            self.queue_ = None
        self.__started__ = False
        self.__disposed__ = True