Sprockets InfluxDB API

To use the InfluxDB client, install it into the runtime environment by calling install(). Metrics are then added by creating instances of the Measurement class and adding them to the write buffer with add_measurement(). In the following example, a measurement named measurement-name is added to the example InfluxDB database. Once the IOLoop is started, the stop callback is invoked, which calls shutdown(). shutdown() ensures that all of the buffered metrics are written before the IOLoop is stopped.

Measurements will be sent in batches to InfluxDB when there are INFLUXDB_TRIGGER_SIZE measurements in the buffer or after INFLUXDB_INTERVAL milliseconds have passed since the last measurement was added, whichever occurs first.

The timeout timer for submitting a buffer of fewer than INFLUXDB_TRIGGER_SIZE measurements is started only when a measurement is added to the buffer while no timer is active and no batch is currently being written.

import logging

import sprockets_influxdb as influxdb
from tornado import ioloop

logging.basicConfig(level=logging.INFO)

io_loop = ioloop.IOLoop.current()
influxdb.install(io_loop=io_loop)

measurement = influxdb.Measurement('example', 'measurement-name')
measurement.set_tag('foo', 'bar')
measurement.set_field('baz', 1.05)

influxdb.add_measurement(measurement)

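def stop():
    # shutdown() returns a future; stop the IOLoop only after it completes.
    io_loop.add_future(influxdb.shutdown(), lambda _: io_loop.stop())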

io_loop.add_callback(stop)
io_loop.start()
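
The batching rules described before the example can be modelled in a few lines. This is a simplified sketch of the documented behaviour, not the library's implementation: BufferState, should_submit() and should_start_timer() are hypothetical names, and elapsed_ms stands for whatever the timeout timer has counted so far.

```python
from dataclasses import dataclass


@dataclass
class BufferState:
    """Hypothetical snapshot of the write buffer (not a library type)."""
    pending: int        # measurements currently buffered
    timer_active: bool  # is a timeout timer already running?
    writing: bool       # is a batch currently being written?


def should_submit(state, trigger_size, elapsed_ms, interval_ms):
    """Submit when the buffer is large enough or the interval has elapsed."""
    if state.pending == 0:
        return False
    return state.pending >= trigger_size or elapsed_ms >= interval_ms


def should_start_timer(state):
    """Evaluated right after a measurement is added to the buffer."""
    return not state.timer_active and not state.writing


state = BufferState(pending=10, timer_active=False, writing=False)
print(should_start_timer(state))                 # True
print(should_submit(state, 5000, 1000, 60000))   # False
print(should_submit(state, 5000, 60000, 60000))  # True
```

The values 5000 and 60000 are the documented defaults for trigger_size and submission_interval.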

If you are writing a Tornado web application, you can automatically instrument all of your Request Handlers by adding the InfluxDBMixin:

import logging
import os

import sprockets_influxdb as influxdb
from tornado import ioloop, web


class RequestHandler(influxdb.InfluxDBMixin,
                     web.RequestHandler):

    def get(self, *args, **kwargs):
        self.write({'hello': 'world'})


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    os.environ['ENVIRONMENT'] = 'development'
    os.environ['SERVICE'] = 'example'

    io_loop = ioloop.IOLoop.current()

    application = web.Application([
        (r"/", RequestHandler),
    ], **{influxdb.REQUEST_DATABASE: 'example'})
    application.listen(8888)
    influxdb.install(io_loop=io_loop)
    try:
        io_loop.start()
    except KeyboardInterrupt:
        logging.info('Stopping')
        influxdb.shutdown()
        io_loop.stop()
        logging.info('Stopped')

Core Methods

sprockets_influxdb.install(url=None, auth_username=None, auth_password=None, submission_interval=None, max_batch_size=None, max_clients=10, base_tags=None, max_buffer_size=None, trigger_size=None, sample_probability=1.0)

Call this to install and set up the InfluxDB client collector. All arguments are optional.

Parameters:
  • url (str) – The InfluxDB API URL. If url is not specified, the INFLUXDB_SCHEME, INFLUXDB_HOST and INFLUXDB_PORT environment variables will be used to construct the base URL. Default: http://localhost:8086/write
  • auth_username (str) – A username to use for InfluxDB authentication. If not specified, the INFLUXDB_USER environment variable will be used. Default: None
  • auth_password (str) – A password to use for InfluxDB authentication. If not specified, the INFLUXDB_PASSWORD environment variable will be used. Default: None
  • submission_interval (int) – The maximum number of milliseconds to wait after the last batch submission before submitting a batch that is smaller than trigger_size. Default: 60000
  • max_batch_size (int) – The number of measurements to be submitted in a single HTTP request. Default: 10000
  • max_clients (int) – The number of simultaneous batch submissions that may be made at any given time. Default: 10
  • base_tags (dict) – Default tags that are to be submitted with each measurement. Default: None
  • max_buffer_size (int) – The maximum number of pending measurements in the buffer before new measurements are discarded. Default: 25000
  • trigger_size (int) – The minimum number of measurements that are in the buffer before a batch can be submitted. Default: 5000
  • sample_probability (float) – Value between 0 and 1.0 specifying the probability that a batch will be submitted (0.25 == 25%). Default: 1.0
Returns:

True if the client was installed by this call and False otherwise.

If INFLUXDB_PASSWORD is specified as an environment variable, it will be masked in the Python process.
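
As an illustration of how the base URL might be assembled from the environment when url is omitted, here is a minimal sketch. The function name build_base_url() is hypothetical, and the per-variable fallbacks (http, localhost, 8086) are an assumption inferred from the documented default of http://localhost:8086/write; the library's own logic may differ.

```python
import os


def build_base_url(environ=os.environ):
    """Build the write URL from INFLUXDB_* variables (assumed fallbacks)."""
    scheme = environ.get('INFLUXDB_SCHEME', 'http')
    host = environ.get('INFLUXDB_HOST', 'localhost')
    port = environ.get('INFLUXDB_PORT', '8086')
    return '{}://{}:{}/write'.format(scheme, host, port)


# With none of the variables set, the documented default is produced.
print(build_base_url({}))  # http://localhost:8086/write
```

Passing a plain dict instead of os.environ keeps the sketch easy to try without changing the process environment.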

sprockets_influxdb.add_measurement(measurement)

Add measurement data to the submission buffer for eventual writing to InfluxDB.

Example:

import sprockets_influxdb as influxdb

measurement = influxdb.Measurement('example', 'measurement-name')
measurement.set_tag('foo', 'bar')
measurement.set_field('baz', 1.05)

influxdb.add_measurement(measurement)

Parameters: measurement (Measurement) – The measurement to add to the buffer for submission to InfluxDB.

sprockets_influxdb.shutdown()

Invoke on shutdown of your application to stop the periodic callbacks and flush any remaining metrics.

Returns a future that is complete when all pending metrics have been submitted.

Return type: Future

Measurement Class

class sprockets_influxdb.Measurement(database, name)

The Measurement class represents what will become a single row in an InfluxDB database. Measurements are added to InfluxDB via the add_measurement() method.

Example:

import sprockets_influxdb as influxdb

measurement = influxdb.Measurement('database-name', 'measurement-name')
measurement.set_tag('foo', 'bar')
measurement.set_field('baz', 1.05)

influxdb.add_measurement(measurement)
Parameters:
  • database (str) – The database name to use when submitting
  • name (str) – The measurement name

duration(name)

Record the time it takes to run an arbitrary code block.

Parameters: name (str) – The field name to record the timing in

This method returns a context manager that records the amount of time spent inside of the context, adding the timing to the measurement.
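
The general pattern behind such a timing context manager can be sketched with the standard library. TimedFields is a hypothetical stand-in for a measurement, and both the unit (seconds) and the choice to record the timing even when the block raises are assumptions rather than documented behaviour.

```python
import contextlib
import time


class TimedFields:
    """Hypothetical stand-in for a measurement's field storage."""

    def __init__(self):
        self.fields = {}

    @contextlib.contextmanager
    def duration(self, name):
        """Record the time spent inside the with block under `name`."""
        start = time.monotonic()
        try:
            yield
        finally:
            # Runs on normal exit and on exceptions alike.
            self.fields[name] = time.monotonic() - start


timed = TimedFields()
with timed.duration('sleep'):
    time.sleep(0.01)
print(sorted(timed.fields))  # ['sleep']
```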

marshall()

Return the measurement in the line protocol format.

Return type: str
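
For readers unfamiliar with the InfluxDB line protocol, the following sketch shows roughly what a marshalled measurement looks like. It is not the library's marshall() implementation: to_line(), format_field() and escape() are hypothetical helpers, tags and fields are sorted by key as an assumption, and the timestamp is passed through unchanged because its precision is not stated here.

```python
def escape(value, chars):
    """Backslash-escape each character of `chars` that occurs in `value`."""
    for char in chars:
        value = value.replace(char, '\\' + char)
    return value


def format_field(value):
    """Render a field value using line protocol type conventions."""
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return 'true' if value else 'false'
    if isinstance(value, int):
        return '{}i'.format(value)
    if isinstance(value, float):
        return repr(value)
    if isinstance(value, str):
        return '"{}"'.format(value.replace('\\', '\\\\').replace('"', '\\"'))
    raise ValueError('unsupported field type: {!r}'.format(type(value)))


def to_line(name, tags, fields, timestamp):
    """Build 'name,tag=value field=value timestamp'."""
    key = escape(name, ', ')
    for tag, value in sorted(tags.items()):
        key += ',{}={}'.format(escape(tag, ',= '), escape(value, ',= '))
    field_set = ','.join(
        '{}={}'.format(escape(field, ',= '), format_field(value))
        for field, value in sorted(fields.items()))
    return '{} {} {}'.format(key, field_set, timestamp)


print(to_line('measurement-name', {'foo': 'bar'}, {'baz': 1.05},
              1465839830100400200))
# measurement-name,foo=bar baz=1.05 1465839830100400200
```

The database name is not part of the line itself; in InfluxDB's HTTP write API it is supplied separately with the request.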

set_field(name, value)

Set the value of a field in the measurement.

Parameters:
  • name (str) – The name of the field to set the value for
  • value (int|float|bool|str) – The value of the field
Raises:

ValueError

set_tag(name, value)

Set a tag on the measurement.

Parameters:
  • name (str) – name of the tag to set
  • value (str) – value to assign

This will overwrite the current value assigned to a tag if one exists.

set_tags(tags)

Set multiple tags for the measurement.

Parameters: tags (dict) – Tag key/value pairs to assign

This will overwrite the current value assigned to a tag if one exists with the same name.

set_timestamp(value)

Override the timestamp of a measurement.

Parameters: value (float) – The timestamp to assign to the measurement

Configuration Methods

sprockets_influxdb.set_auth_credentials(username, password)

Override the default authentication credentials obtained from the environment variable configuration.

Parameters:
  • username (str) – The username to use
  • password (str) – The password to use

sprockets_influxdb.set_base_url(url)

Override the default base URL value created from the environment variable configuration.

Parameters: url (str) – The base URL to use when submitting measurements

sprockets_influxdb.set_max_batch_size(limit)

Set the maximum number of measurements submitted in a single batch for each database.

Parameters: limit (int) – The maximum number of measurements per batch

sprockets_influxdb.set_max_buffer_size(limit)

Set the maximum number of pending measurements allowed in the buffer before new measurements are discarded.

Parameters: limit (int) – The maximum number of pending measurements allowed in the buffer

sprockets_influxdb.set_timeout(milliseconds)

Override the maximum duration to wait for submitting measurements to InfluxDB.

Parameters: milliseconds (int) – Maximum wait in milliseconds

sprockets_influxdb.set_trigger_size(limit)

Set the number of pending measurements that triggers the writing of data to InfluxDB.

Parameters: limit (int) – The minimum number of measurements to trigger a batch
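
To make the interaction of the three size limits concrete, here is a small model of a single-database buffer. MeasurementBuffer and its methods are hypothetical and only mirror the documented meaning of max_buffer_size, trigger_size and max_batch_size; the library's internal data structures are not described in this document.

```python
from collections import deque


class MeasurementBuffer:
    """Hypothetical single-database buffer applying the three limits."""

    def __init__(self, max_batch_size=10000, max_buffer_size=25000,
                 trigger_size=5000):
        self.max_batch_size = max_batch_size
        self.max_buffer_size = max_buffer_size
        self.trigger_size = trigger_size
        self.pending = deque()

    def add(self, line):
        """Buffer a measurement; return False if it was discarded."""
        if len(self.pending) >= self.max_buffer_size:
            return False
        self.pending.append(line)
        return True

    def ready(self):
        """True once enough measurements are pending to trigger a write."""
        return len(self.pending) >= self.trigger_size

    def next_batch(self):
        """Remove and return at most max_batch_size measurements."""
        size = min(len(self.pending), self.max_batch_size)
        return [self.pending.popleft() for _ in range(size)]


buf = MeasurementBuffer(max_batch_size=2, max_buffer_size=3, trigger_size=2)
accepted = [buf.add('m{}'.format(i)) for i in range(4)]
batch = buf.next_batch()
print(accepted)  # [True, True, True, False]
print(batch)     # ['m0', 'm1']
```

The fourth measurement is dropped because the buffer already holds max_buffer_size entries, and the batch is capped at max_batch_size even though three measurements were pending.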

Request Handler Mixin

class sprockets_influxdb.InfluxDBMixin(application, request, **kwargs)

Mixin that automatically submits per-request measurements to InfluxDB with the request duration.

The measurements will automatically add the following tags:

  • Request handler
  • Request endpoint (if enabled via a named URL)
  • Request method
  • Request correlation_id (if set)
  • Response status_code

To add additional tags and fields, use the set_field(), set_tag(), set_tags(), and duration() methods of the influxdb attribute of the RequestHandler.

Other

sprockets_influxdb.flush()

Flush all pending measurements to InfluxDB. This ensures that all measurements in the buffer for any database are written. If a request fails, the metrics are resubmitted until they are successfully written.

Return type: Future
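
The retry-until-written behaviour can be pictured with a short asyncio sketch. This is an illustration only: the flush() coroutine below is a hypothetical model rather than the library function (which returns a Future), write is an assumed callable that reports success or failure, and a real implementation would likely wait between attempts.

```python
import asyncio


async def flush(batches, write, retry_delay=0.0):
    """Write every batch, retrying each one until `write` succeeds."""
    for batch in batches:
        while not await write(batch):
            await asyncio.sleep(retry_delay)


attempts = []


async def flaky_write(batch):
    """Fail the first attempt at each batch, succeed on the second."""
    attempts.append(batch)
    return attempts.count(batch) > 1


asyncio.run(flush(['db1-batch', 'db2-batch'], flaky_write))
print(attempts)
# ['db1-batch', 'db1-batch', 'db2-batch', 'db2-batch']
```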