SignalFx Developers Guide

Using SignalFlow

SignalFlow, the SignalFx statistical computation engine, provides the following features in support of SignalFx monitoring:

SignalFlow programming language

Object-oriented language with a syntax similar to that of Python.

SignalFlow library

Functions and methods that you call from a SignalFlow program.

SignalFlow background computation engine

Runs SignalFlow programs in the background and streams results to monitoring objects.

Using the API, you can also run SignalFlow programs in the background and stream the results back to your client.

Introduction

The web UI for your company’s existing SignalFx instance provides a good introduction to SignalFlow. When you run Chart Builder for existing charts, you can see the SignalFlow program that provides the statistics the chart displays. These programs are the same as what you specify to run an analysis in the SignalFlow API.

SignalFlow programming language

The SignalFlow programming language specifies operations you want SignalFlow to do on data coming into SignalFx. The language is object-oriented and has a syntax that’s similar to Python.

The following program demonstrates the features of the language:

#Selects app servers that aren't in the qa environment
app_servers = filter('serverType', 'app') and not filter('env', 'qa')

#For the selected app servers, sums cache hits and misses by datacenter
cache_hits = data('cache.hits', filter=app_servers).sum(by='datacenter')
cache_misses = data('cache.misses', filter=app_servers).sum(by='datacenter')
total = cache_hits + cache_misses

#Calculates and publishes 1-hour moving averages of hits and misses as percent of the
#total hits and misses
((cache_hits / total) * 100).mean(over='1h').publish('hit_pct')
((cache_misses / total) * 100).mean(over='1h').publish('miss_pct')

This example shows you these SignalFlow programming features:

  • Built-in functions like data() that represent real-time incoming data as stream objects.

  • Positional and keyword arguments for functions. For example, the program calls filter() with positional arguments to select app servers, then passes the resulting filter expression to the data() calls through the filter keyword argument.

  • Functions for stream objects that control, compute and modify the data.

  • Support for comments delimited from program code by the # character. All characters from # to the end of a line are ignored.

Computation behavior

SignalFlow computations are running instances of a streaming calculation defined in a SignalFlow program. Computations have start times and stop times, and operate at a specific resolution. If you don’t provide a start time, SignalFlow defaults to "now". If you don’t provide a stop time, SignalFlow defaults to "infinity", and the computation continues forever. The resolution of a computation specifies the time interval at which SignalFlow processes data and generates output from the computation. At the completion of each interval, each published stream in the computation emits results.

If you choose a start time that tells SignalFlow to include historical data, you receive output as quickly as the SignalFlow analytics engine can calculate it. Computations that use a start time of "now" send out data in real time at intervals controlled by the resolution of the computation.

Resolution of computations

The resolution of a computation is normally set by SignalFlow to match the incoming frequency of the data. You can ask for a minimum resolution, which forces the computation to operate at a coarser resolution than the one that SignalFlow assigns. You can’t force SignalFlow to make computations at a resolution that’s finer than the incoming frequency of the data, to protect against invalid results. For example, if you calculate a sum at twice the incoming data resolution, the result adds each datapoint to the result twice.
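To see why this protection matters, here is a small Python sketch of the double-counting problem (the data and the tick_sum helper are hypothetical, not the SignalFx engine): if the computation interval is half the data interval, each datapoint contributes to two output intervals.

```python
# Hypothetical illustration (not the SignalFx API): why a computation
# resolution finer than the data resolution double-counts values.

# One datapoint of value 3 arrives every 10 time units
datapoints = {0: 3, 10: 3, 20: 3}

def tick_sum(comp_interval, span=30):
    """Sum the latest known value at each computation tick."""
    total = 0
    latest = None
    for t in range(0, span, comp_interval):
        if t in datapoints:
            latest = datapoints[t]
        if latest is not None:
            total += latest
    return total

# At the data's own resolution, each datapoint is counted once
assert tick_sum(10) == 9
# At a twice-as-fine resolution, each datapoint is counted twice
assert tick_sum(5) == 18
```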

Regardless of the resolution you choose, SignalFlow rolls up data to the resolution of the computation using an algorithm determined by the metric type of the input data. You can override this behavior with the rollup parameter of the data() built-in function.

Input lag (MaxDelay)

Because SignalFx is a real-time streaming analytics system, timeliness of its output is a crucial feature. The system can’t wait for all the input data for a computation to arrive, because several conditions may prevent it from ever arriving (for example, the source has been decommissioned). Data may also arrive after a significant delay.

To account for these delays, the analytics system continually measures, tracks, and evaluates data input lag. From the results of the evaluation, the system establishes a deadline after which the computation moves on even if input data is missing. The deadline lets the computation continue with all the available data, while keeping the perceived lag in results to a minimum.

If you expect your data to arrive with an unpredictable variance in lag, you can override the system-defined "maximum delay" to ensure that all your data is included in the computation.

The topic Working with Metrics Metadata describes input lag in more detail.

Input streams

SignalFlow computations operate on a stream of data coming from the programs and systems you’re monitoring. In SignalFlow, you specify streams as queries that return data, usually from more than one metric time series.

Remember that SignalFx creates a metric time series for each combination of metric and dimension-value pair. For example, the metric cpu.utilization and dimension hostname=server1 define a unique metric time series.

Because the query for a stream matches multiple metric time series, the stream itself consists of one or more 1-by-n arrays of datapoints, one datapoint for each metric time series and one array for each unit of time.
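As a mental model only (the MetricTimeSeries class below is hypothetical, not part of any SignalFx library), a metric time series can be viewed as a metric name plus a fixed set of dimension values, and the stream at any instant as one datapoint per matched MTS:

```python
# Hypothetical model of a metric time series; not a SignalFx API.
from dataclasses import dataclass

@dataclass(frozen=True)
class MetricTimeSeries:
    metric: str
    dimensions: frozenset   # frozenset of (dimension, value) pairs

mts_a = MetricTimeSeries('cpu.utilization', frozenset({('hostname', 'server1')}))
mts_b = MetricTimeSeries('cpu.utilization', frozenset({('hostname', 'server2')}))

# A different dimension value yields a distinct metric time series
assert mts_a != mts_b

# At each unit of time, the stream carries one datapoint per matched MTS
stream_at_t = {mts_a: 71.2, mts_b: 64.9}
assert len(stream_at_t) == 2
```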

The following examples show you how to specify and publish streams:

#Specify a stream
data('cpu.utilization')
data('cpu.*')

#Make the output visible
data('cpu.utilization').publish()
data('cpu.*').publish()

By default, a stream doesn’t output datapoints from a computation. To make its output visible, you have to publish the output using the publish() method.

The data points in a stream contain both the actual metric value and metadata that contains contextual information about the value. You can use this metadata to filter data points or examine the data with SignalFlow functions.

Filters

Filters specify the metric time series to include in a stream, based on dimensions or properties of the metric time series. To create a filter, use the built-in filter() function. The first argument to this function is the dimension or property name. The rest of the arguments are one or more values that form the search criteria for the filter. If you specify multiple values, SignalFlow joins them with an OR condition.

You can specify as many query arguments as you want. For example:

#Filter on metric timeseries that have a serverType of API
filter('serverType', 'API')

#Filter on metric timeseries that have an aws_tag_Name that starts with api
filter('aws_tag_Name', 'api*')

#Filter on metric timeseries with an api of either 'login' or 'logout'
#Notice that SignalFlow joins 'login' and 'logout' with an OR
filter('api','login','logout')

You can explicitly combine filter function calls into expressions using the and, or and not keywords. Operator precedence is the same as Boolean expressions. You can also use parentheses to force precedence.

For example:

#Simple AND expression that selects data with serverType of API and environment of QA
filter('serverType', 'API') and filter('env', 'QA')

#A more complex example. This filter expression selects data that matches all of the following criteria:
#1. serverType is db.core or any serverType that starts with db.staging AND serverType is *not* db.staging-qa
#2. api is login
(filter('serverType', 'db.core', 'db.staging*') and
 not filter('serverType', 'db.staging-qa')) and filter('api', 'login')
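As an illustration of how these expressions evaluate, a filter can be modeled in plain Python as a predicate over a time series' dimensions (a sketch of the semantics only; make_filter and its wildcard handling are hypothetical, not the SignalFlow implementation):

```python
# Hypothetical model of SignalFlow filter semantics; not the real engine.
def make_filter(name, *values):
    """Match if the dimension equals any value; a trailing '*' is a
       prefix wildcard. Multiple values are joined with OR."""
    def predicate(dims):
        actual = dims.get(name)
        if actual is None:
            return False
        for v in values:
            if v.endswith('*'):
                if actual.startswith(v[:-1]):
                    return True
            elif actual == v:
                return True
        return False
    return predicate

f = make_filter('serverType', 'db.core', 'db.staging*')

def not_qa(dims):
    # 'not filter(...)' negates the predicate
    return not make_filter('serverType', 'db.staging-qa')(dims)

assert f({'serverType': 'db.core'})
assert f({'serverType': 'db.staging-1'}) and not_qa({'serverType': 'db.staging-1'})
# db.staging-qa matches db.staging* but is excluded by the NOT clause
assert not (f({'serverType': 'db.staging-qa'}) and not_qa({'serverType': 'db.staging-qa'}))
```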

Stream labels

You can label the output of a stream using the label argument of the publish() stream method. Although you don’t have to supply a stream label, using one helps you distinguish between streams when you visualize them.

#Publish stream output without a label
data('cache.hits').sum(by='datacenter').mean(over='1h').percentile(90).publish()

#Publish stream output with the label 'misses'
data('cache.misses').sum(by='datacenter').mean(over='1h').percentile(90).publish('misses')

SignalFlow has a large library of built-in analytical functions that take a stream as an input, perform computations on its data points, and output a modified stream that is the result of the computation. To learn more about a specific function, refer to its reference documentation in the SignalFlow Functions and Methods Index.

The library has two classes of computational functions:

Aggregations

Apply a calculation across all of the data in the 1-by-n array of datapoints for the incoming stream at time t. For example, you can calculate the average (mean) cpu utilization for a set of servers at a point in time by using the mean() stream method without arguments.

Transformations

Apply a calculation on a metric time series within a window of time. For example, you can calculate the moving average of cpu utilization over a specific time window by using the mean() stream method and providing it with the over argument to specify a time range.

SignalFlow functions and methods

See SignalFlow Functions and Methods Index for an alphabetical list of the SignalFlow functions and methods.

Aggregations

See the SignalFlow Functions and Methods Index for the list of aggregation methods.

Aggregation examples

The following code demonstrates how to use aggregation functions:

#Overall CPU utilization average
data('cpu.utilization').mean().publish()

#95th percentile CPU utilization by AWS availability zone
data('cpu.utilization').percentile(95, by='aws_availability_zone').publish()

#CPU utilization average by service and instance type
data('cpu.utilization').mean(by=['service', 'aws_instance_type']).publish()

Aggregation results

If you don’t specify arguments for an aggregation function, the output for the 1-by-n array of datapoints at time t is a single value. For example, if you apply sum() to an input stream that has 5 time series having the values of 5, 10, 15, 20, and 10 at time t, the output is a single value of 60.

Grouping

Grouping the output from an aggregation function organizes the results into separate sets based on metadata values for the datapoints in the stream. To specify the metadata on which to group, use the by keyword argument. You specify a single metadata field with a string value, for example by='datacenter'. To specify multiple grouping fields, use a string array, for example by=['env', 'datacenter'].

For example, if the datapoints in the input stream have the dimension datacenter, and each datapoint has one of three different values datacenter1, datacenter2, or datacenter3, then calling data('cpu.utilization').mean(by='datacenter').publish() produces 3 values, each in a separate group. Each value represents the mean cpu utilization for data series that have the corresponding dimension value; that is, one value is mean cpu utilization for datacenter1, another value is mean cpu utilization for datacenter2, and the third value is mean cpu utilization for datacenter3.
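The grouping behavior described above works like a group-by over stream metadata. A minimal Python sketch with hypothetical datapoints (mean_by stands in for mean(by=...); it is not the SignalFlow implementation):

```python
# Hypothetical sketch of grouped aggregation; not the SignalFlow engine.
from collections import defaultdict

# (dimension value, metric value) pairs for one instant in time
datapoints = [
    ('datacenter1', 40.0), ('datacenter1', 60.0),
    ('datacenter2', 30.0),
    ('datacenter3', 10.0), ('datacenter3', 20.0), ('datacenter3', 30.0),
]

def mean_by(points):
    """Group datapoints by dimension value and average each group."""
    groups = defaultdict(list)
    for dim_value, value in points:
        groups[dim_value].append(value)
    return {k: sum(v) / len(v) for k, v in groups.items()}

result = mean_by(datapoints)
# One output value per group, as with mean(by='datacenter')
assert result == {'datacenter1': 50.0, 'datacenter2': 30.0, 'datacenter3': 20.0}
```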

Transformations

Transformation functions perform a computation on time series over a window of time. A common example of a transformation is a moving average, which is the mean over increments of a particular time period.

To specify the time period for a transformation, use the over keyword argument in the function call. SignalFlow calculates the transformation over the time period you specify. At the end of the period, SignalFlow sends the results to the output stream.

Many analytical functions calculate both aggregations and transformations. If you don’t specify the over keyword argument in the function call, SignalFlow assumes that you want an aggregation. For example, mean() is an aggregation and mean(over='1h30m') is a transformation.

Transformation examples

The following SignalFlow programs show you how to use transformations:

#5-minute moving average of the CPU utilization of all servers
data('cpu.utilization').mean(over='5m').publish()

#15-second moving average of the 95th percentile CPU utilization
#grouped by AWS availability zone
data('cpu.utilization').percentile(95, by='aws_availability_zone').mean(over='15s').publish()
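For intuition, the difference between an aggregation and a transformation can be sketched in plain Python (the helpers below are hypothetical, not SignalFlow itself): an aggregation collapses the values of many time series at one instant, while a transformation slides over time within a single series.

```python
# Hypothetical sketch; not the SignalFlow implementation.
def aggregate_mean(values_at_t):
    """Aggregation: mean across all series at one instant."""
    return sum(values_at_t) / len(values_at_t)

def moving_mean(series, window):
    """Transformation: mean of one series over a sliding window,
       like mean(over=...)."""
    out = []
    for i in range(len(series)):
        chunk = series[max(0, i - window + 1):i + 1]
        out.append(sum(chunk) / len(chunk))
    return out

assert aggregate_mean([10, 20, 30]) == 20.0
assert moving_mean([10, 20, 30, 40], window=2) == [10.0, 15.0, 25.0, 35.0]
```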

Other analytical functions

In addition to aggregations and transformations, SignalFlow has analytical functions that perform other actions on streams. Examples include timeshift(), which retrieves a datapoint from a specified time offset in the past, and delta(), which operates on the current and previous datapoint, regardless of the time interval between them.
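As a rough model of these two behaviors over a single series (the helpers below are hypothetical Python, not the SignalFlow implementation):

```python
# Hypothetical sketch of delta() and timeshift() semantics.
def delta(series):
    """Difference between each datapoint and the previous one,
       regardless of how far apart in time they arrived."""
    return [curr - prev for prev, curr in zip(series, series[1:])]

def timeshift(series, offset):
    """Value from `offset` positions in the past, aligned with now."""
    return [None] * offset + series[:-offset]

assert delta([100, 103, 101, 110]) == [3, -2, 9]
assert timeshift([1, 2, 3, 4], offset=1) == [None, 1, 2, 3]
```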

map() and lambda functions

SignalFlow can calculate values in the midst of an incoming stream. To access this functionality, you call the map() method with a "lambda" function. In most cases, you use these to perform arithmetic or algebraic calculations on all the values in a stream. SignalFlow only supports single-argument lambdas, and the lambda body can only reference that argument.

#Calculate the maximum mean cpu.user by datacenter over an hour. Use the floor function to "round" the mean.

#With two variables
A = data('cpu.user').mean(by='datacenter')
floor(A).max(over='1h').publish()

#Use the map function
data('cpu.user').mean(by='datacenter').map(lambda x: floor(x)).max(over='1h').publish()

#Multiply the cpu.utilization by 2
data('cpu.utilization').map(lambda x: x*2).publish()

#Get the floor of cpu.utilization raised to the power of 4
data('cpu.utilization').map(lambda x: floor(pow(x,4))).publish()

In addition to calculations, lambda functions allow if-else expressions, using the syntax <value if condition is true> if <condition> else <value if condition is false>. Using the value None, you can use this type of lambda function to filter values out of the output. For example:

#Output values only when they're between 50 and 60; filter out everything else
data('cpu.utilization').map(lambda x: x if x > 50 and x < 60 else None).publish()

Using streams as variables

You can use streams as inputs to mathematical expressions, just as you would use a variable in an algebraic formula. To do so, assign a stream name to a variable name at the beginning of your SignalFlow program. The output of a calculation using one or more stream objects is also a stream object, so you can perform aggregations or transforms on it as well. To perform further analytics on a stream calculation (or to stream the results) the calculation must be wrapped in parentheses.

#Inline stream calculation
(data('cache.hits') / data('cache.total') * 100).mean(over='1h').publish()

#Using a stream as a variable
cache_hits = data('cache.hits').sum(by='datacenter')
cache_misses = data('cache.misses').sum(by='datacenter')
hit_pct = (cache_hits / (cache_hits + cache_misses) * 100).mean(over='1h')
hit_pct.publish('hit_pct')

#A stream calculation can be used in other stream calculations
used_pct = (data('cpu.used') / data('cpu.*').sum()) * 100
cpu_per_thread = (used_pct / data('server.threadcount')).publish()

SignalFlow API

The SignalFx SignalFlow streaming analytics service provides REST HTTP endpoints and a WebSocket connection.

Whenever possible, you should use WebSocket, because it offers these advantages:

  • Multiple data streams and computations over a single HTTP connection

  • An encoding protocol that's more efficient than the REST HTTP protocol

  • Overall lower latency

The HTTP endpoints offer the same functionality, and they’re more straightforward to add to your programs, but they require one HTTP connection per stream.

Regardless of the connection you use, the SignalFlow API lets you execute SignalFlow programs as background jobs and receive real-time, streaming output. The API returns your results in a stream of messages that contain information about the execution of the computation, metadata for the output timeseries, and the generated data and events.

For more information on the format of those messages, see the SignalFlow Stream Messages Reference.

Authentication

To use the SignalFlow API, you must authenticate with SignalFx. The steps differ according to the type of connection you use:

  • REST HTTP connection: Pass your access token in the X-SF-Token header of each request.

  • WebSocket connection: The initial connection doesn't require authentication:

    1. Connect using the operation GET https://stream.{REALM}.signalfx.com/v2/signalflow/connect.

    2. Within 5 seconds of connecting, send a WebSocket JSON message containing a session access token (referred to in the web UI as a User API Access Token):

{
  "type": "authenticate",
  "token": "TOKEN"
}

To learn more about authentication tokens, see Authentication Tokens.

Client library support

To simplify the use of the SignalFlow API, SignalFx provides client libraries for several languages, each hosted in its own repository on GitHub.

These libraries provide high-level access to SignalFlow API streaming and execution.

Executing background computations

Once you're authenticated, you can execute SignalFlow programs as background computations. Running a computation starts a job on SignalFx servers that performs the computation and streams the results back to you in a channel.

When you use the REST HTTP API, this channel is the HTTP connection itself. When you use the WebSocket connection, multiple channel outputs go to your single WebSocket connection. The channel name identifies the computation that sends the messages and data.

To stop receiving output from a computation, detach from the channel:

REST HTTP

Close the connection.

WebSocket

Send a detach request (see SignalFlow Connect API for the WebSocket connection protocol details).

While the WebSocket remains connected, SignalFx keeps your computation alive. When a computation finishes, SignalFx sends an end_of_channel message and stops the computation. In addition, if SignalFx detects an error associated with the computation or connection, it sends you an abort_channel message and closes the computation.

Examples

This section contains examples that show you how to use the SignalFlow API.

Command-line example

The following curl command demonstrates how to start a computation that calculates the average CPU utilization over all the servers in your infrastructure and returns the results in real time.

$ curl \
    --request POST \
    --header "Content-Type: text/plain" \
    --header "X-SF-Token: YOUR_SFX_TOKEN" \
    --data "data('cpu.utilization').mean().publish()" \
    "https://stream.signalfx.com/v2/signalflow/execute"

The result is streaming HTTP responses containing Server-Sent Events (SSE). For more information, see the endpoint documentation and the topic SignalFlow Stream Messages Reference.

Python client library example

The Python client library for the SignalFlow API is available in pip. You can also view the library code in the GitHub signalfx-python client library repo. The following code shows you how to write the previous example using the Python library:

import signalfx

program = "data('cpu.utilization').mean().publish('cpu')"
signalflow_client = signalfx.SignalFx().signalflow('MY_SFX_TOKEN')
try:
    print('Executing {0} ...'.format(program))
    computation = signalflow_client.execute(program)
    for msg in computation.stream():
        if isinstance(msg, signalfx.signalflow.messages.DataMessage):
            print('{0}: {1}'.format(msg.logical_timestamp_ms, msg.data))
        if isinstance(msg, signalfx.signalflow.messages.EventMessage):
            print('{0}: {1}'.format(msg.timestamp_ms, msg.properties))
finally:
    signalflow_client.close()

To learn more about the Python client library, see the signalfx-python GitHub repository.

Node.js client library example

The node.js client library for the SignalFlow API is available in npm. You can also view the library code in the GitHub signalfx-nodejs client library repo.

The following code shows you how to write the same example as above using the signalfx-nodejs library:

var signalfx = require('signalfx');

var flow = new signalfx.SignalFlow('MY_SFX_TOKEN');

var computation = flow.execute({
    program: "data('cpu.utilization').publish('cpu')",
    start: Date.now() - 60000,
    stop: Date.now() + 600000,
    resolution: 10000,
    maxDelay: 1000
  });

computation.stream(function (err, data) {
  if(err) {
    // handle error
  } else {
    console.log(data);
  }
});

© Copyright 2019 SignalFx.

Third-party license information