SignalFx Developers Guide

Using SignalFlow

SignalFlow, the SignalFx statistical computation engine, provides the following features in support of SignalFx monitoring:

SignalFlow programming language

Object-oriented language with a syntax similar to that of Python.

SignalFlow library

Functions and methods that you call from a SignalFlow program.

SignalFlow background computation engine

Runs SignalFlow programs in the background and streams results to monitoring objects.

Using the API, you can also run SignalFlow programs in the background and receive the results asynchronously in your client.

Introduction

The web UI for your company’s existing SignalFx instance contains a good introduction to SignalFlow. When you run Chart Builder for existing charts, you can see the SignalFlow program that provides the data for the chart. This program has the same form as a program that you run in the background using the SignalFlow API.

To learn more about SignalFlow programs in the web UI, see the topic SignalFx Analytics Overview in the product documentation.

SignalFlow programming language

SignalFlow programs ingest streams of metrics from systems you’re monitoring, perform statistical analysis on the data, and send out the results as streams of metrics or data. The SignalFlow language is object-oriented and has a syntax that’s similar to Python.

The following program demonstrates the features of the language:

app_servers = filter('serverType', 'app') and not filter('env', 'qa')

#For the selected app servers, sums cache hits and misses by datacenter
cache_hits = data('cache.hits', filter=app_servers).sum(by='datacenter')
cache_misses = data('cache.misses', filter=app_servers).sum(by='datacenter')
total = cache_hits + cache_misses

#Calculates and publishes 1-hour moving averages of hits and misses as percent of the
#total hits and misses
((cache_hits / total) * 100).mean(over='1h').publish('hit_pct')
((cache_misses/ total) * 100).mean(over='1h').publish('miss_pct')

This example shows you these SignalFlow programming features:

  • Built-in functions like data() that return real-time incoming data as stream objects.

  • Positional and keyword arguments for functions. For example, the first line of the program uses filter() with positional arguments to select app servers, and the cache_hits and cache_misses assignments pass that filter to the data() function with the filter keyword argument.

  • Functions for stream objects that control, compute and modify the data.

  • Support for comments delimited from program code by the # character. All characters from # to the end of a line are ignored.

Computation behavior

SignalFlow computations are running instances of a streaming calculation defined in a SignalFlow program. Computations have start times and stop times, and operate at a specific resolution. If you don’t provide a start time, SignalFlow defaults to the current time. If you don’t provide a stop time, the computation continues indefinitely. The resolution of a computation specifies the time interval at which SignalFlow processes data and generates output from the computation. At the completion of each interval, each published stream in the computation emits results.

If you choose a start time that tells SignalFlow to include historical data, you receive output as quickly as the SignalFlow analytics engine can calculate it. Computations that use a start time of "now" send out data in real time at intervals controlled by the resolution of the computation.

Resolution of computations

The resolution of a computation is normally set by SignalFlow to match the incoming frequency of the data. You can ask for a minimum resolution, which forces the computation to operate at a coarser resolution than the one that SignalFlow assigns. You can’t force SignalFlow to make computations at a resolution that’s finer than the incoming frequency of the data, which protects against invalid results. For example, if you could force SignalFlow to calculate a sum at twice the incoming data resolution, the operation would add each datapoint to the result twice.
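The double-counting risk can be seen in a toy sketch (plain Python, not the SignalFlow engine). Assume datapoints arrive every 10 seconds; computing a 10-second sum at a forced 5-second resolution counts every datapoint twice:

```python
# Toy illustration (plain Python, not the SignalFlow engine) of why a
# resolution finer than the data's native frequency double-counts a sum.
# Datapoints arrive every 10 seconds:
datapoints = {10: 4, 20: 6, 30: 8}  # timestamp (s) -> value

def windowed_sums(points, resolution, window=10):
    """Sum the points whose timestamps fall in (t - window, t],
    emitting one result per `resolution` seconds."""
    t = min(points)
    sums = []
    while t < max(points) + window:
        sums.append(sum(v for ts, v in points.items() if t - window < ts <= t))
        t += resolution
    return sums

# At the native 10s resolution, each datapoint is counted exactly once:
native = windowed_sums(datapoints, resolution=10)  # [4, 6, 8]
# At a finer 5s resolution, each datapoint falls into two windows,
# so the emitted sums add up to twice the input total:
finer = windowed_sums(datapoints, resolution=5)    # [4, 4, 6, 6, 8, 8]
```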

Regardless of the resolution you choose, SignalFlow rolls up data to the resolution of the computation using an algorithm determined by the metric type of the input data. You can override this with the rollup parameter of the data() built-in function.

Input lag (MaxDelay)

Because SignalFx is a real-time system, its timeliness is a key feature. The system can’t wait for all the input data for a computation to arrive, because conditions can prevent it from ever arriving. For example, the source of the metric can go offline.

In addition, data may also arrive after a significant delay, depending on the latency of the connection between the source and SignalFx.

To account for these delays, the analytics system continually measures, tracks, and evaluates data input lag. From the results of this evaluation, the system establishes a deadline after which the computation moves on, even if input data is missing. This deadline lets the computation continue for all the available data, while ensuring that it keeps the perceived lag in results to a minimum.

If you expect your data to arrive with an unpredictable variance in lag, you can override the system-defined maximum delay to ensure that all your data is included in the computation.
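The deadline mechanism described above can be sketched in plain Python. This is a hypothetical model, not the SignalFx engine: the result for an interval is finalized once the clock passes the interval's deadline, using only the datapoints that arrived in time.

```python
# Hypothetical sketch (not the SignalFx engine) of a max-delay deadline:
# the result for interval t is finalized once the clock passes
# t + max_delay, using only the datapoints that arrived by that deadline.

def finalize_intervals(arrivals, now, max_delay):
    """arrivals: list of (interval_start, arrival_time, value) tuples.
    Returns {interval_start: [included values]} for closed intervals."""
    finalized = {}
    for interval, arrived_at, value in arrivals:
        deadline = interval + max_delay
        if deadline <= now:                # deadline passed: interval is closed
            finalized.setdefault(interval, [])
            if arrived_at <= deadline:     # arrived in time: included
                finalized[interval].append(value)
            # a datapoint that arrives after the deadline is simply dropped
    return finalized

arrivals = [(0, 1, 10.0),   # on time for interval 0
            (0, 20, 99.0),  # 20s of lag: misses interval 0's deadline
            (10, 12, 5.0)]  # on time for interval 10
result = finalize_intervals(arrivals, now=30, max_delay=15)
# result == {0: [10.0], 10: [5.0]}
```

Raising max_delay in this model trades timeliness for completeness, which is exactly the trade-off the override lets you make.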

The topic Working with Metrics Metadata describes input lag in more detail.

Input streams

SignalFlow computations operate on a stream of data coming from the programs and systems you’re monitoring. In SignalFlow, you specify streams as queries that return data, usually from more than one metric time series.

SignalFx creates a metric time series for each combination of metric and dimension-value pair. For example, the metric cpu.utilization and dimension hostname=server1 defines a unique metric time series.

Because the query for a stream usually matches multiple metric time series, the stream itself consists of one or more arrays of datapoints, one datapoint for each metric time series and one array for each unit of time.
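As a toy model of this data shape (illustrative only, not the real data model), an MTS can be identified by its metric name plus its dimension name-value pairs, and a stream carries one array of datapoints per unit of time:

```python
# Toy model (not the real data model): a metric time series (MTS) is
# identified by its metric name plus its dimension name-value pairs.
def mts_id(metric, dimensions):
    return (metric, tuple(sorted(dimensions.items())))

a = mts_id('cpu.utilization', {'hostname': 'server1'})
b = mts_id('cpu.utilization', {'hostname': 'server2'})
c = mts_id('cpu.utilization', {'hostname': 'server1'})

# A stream that matches both hosts carries one array of datapoints per
# unit of time, with one entry per matching MTS (values here are made up):
stream_at_t = [(a, 42.0), (b, 17.5)]
```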

The following examples show you how to specify and publish streams:

#Specify a stream
data('cpu.utilization')
data('cpu.*')

#Make the output visible
data('cpu.utilization').publish()
data('cpu.*').publish()

By default, a stream doesn’t output datapoints from a computation. To make its output visible, you have to publish the output using the publish() method.

The datapoints in a stream contain both the actual metric value and metadata with contextual information about the value. You can use this metadata to filter datapoints or examine the data with SignalFlow functions.

Filters

Filters specify the metric time series to include in a stream, based on dimensions or properties of the metric time series. To create a filter, use the built-in filter() function. The first argument to this function is the dimension or property name. The rest of the arguments are one or more values that form the search criteria for the filter. If you specify multiple values, SignalFlow joins them with an OR condition.

You can specify as many query arguments as you want. For example:

#Filter on metric timeseries that have a serverType of API
filter('serverType', 'API')

#Filter on metric timeseries that have an aws_tag_Name that starts with api
filter('aws_tag_Name', 'api*')

#Filter on metric timeseries with an api of either 'login' or 'logout'
#Notice that SignalFlow joins 'login' and 'logout' with an OR
filter('api','login','logout')

You can explicitly combine filter function calls into expressions using the and, or, and not keywords. Operator precedence is the same as for Boolean expressions, and you can use parentheses to force a different precedence.

For example:

#Simple AND expression that selects data with serverType of API and environment of QA
filter('serverType', 'API') and filter('env', 'QA')

#A more complex example. This filter expression selects data that matches all of the following criteria:
#1. serverType is db.core or any serverType that starts with db.staging AND serverType is *not* db.staging-qa
#2. api is login
(filter('serverType', 'db.core', 'db.staging*') and
 not filter('serverType', 'db.staging-qa')) and filter('api', 'login')
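The semantics of these filter expressions can be modeled in plain Python. This is an illustrative model, not the real implementation: each filter is a predicate over an MTS's dimensions, multiple values are joined with OR, and a trailing * acts as a wildcard (here via fnmatch):

```python
from fnmatch import fnmatch

# Illustrative model (not the real implementation) of filter() semantics:
# a filter is a predicate over an MTS's dimensions; multiple values are
# joined with OR, and '*' acts as a wildcard.
def make_filter(key, *values):
    return lambda dims: any(fnmatch(dims.get(key, ''), v) for v in values)

db_servers = make_filter('serverType', 'db.core', 'db.staging*')
staging_qa = make_filter('serverType', 'db.staging-qa')
login_api = make_filter('api', 'login')

# Mirrors: (filter('serverType', 'db.core', 'db.staging*') and
#           not filter('serverType', 'db.staging-qa')) and filter('api', 'login')
def selected(dims):
    return (db_servers(dims) and not staging_qa(dims)) and login_api(dims)
```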

Stream labels

You can label the output of a stream using the label argument of the publish() stream method. Although you don’t have to supply a stream label, using one helps you distinguish between streams when you visualize them.

#Publish stream output without a label
data('cache.hits').sum(by='datacenter').mean(over='1h').percentile(90).publish()

#Publish stream output with the label 'misses'
data('cache.misses').sum(by='datacenter').mean(over='1h').percentile(90).publish('misses')

To learn more about a specific function, refer to its reference documentation. To find the reference documentation for a function, see the topic SignalFlow Functions and Methods Index.

SignalFlow functions and methods

SignalFlow has a large library of built-in analytical functions that take a stream as an input, perform computations on its data points, and output a modified stream that is the result of the computation. The library has two classes of computational functions:

Aggregations

Apply a calculation across all of the data in the array of datapoints for the incoming stream at one point in time. For example, you can calculate the average (mean) cpu utilization for a set of servers at a point in time by using the mean() stream method without arguments.

Transformations

Apply a calculation on a metric time series within a window of time. For example, you can calculate the moving average of cpu utilization over a specific time window by using the mean() stream method and providing it with the over argument to specify a time range.

Calendar window transformations let you specify calculations based on specific dates or times. To learn more about this type of transformation, see Calendar window transformations.

See SignalFlow Functions and Methods Index for an alphabetical list of the SignalFlow functions and methods.

Aggregations

Aggregation methods include:

Aggregation examples

The following code demonstrates how to use aggregation functions:

#Overall CPU utilization average
data('cpu.utilization').mean().publish()

#95th percentile CPU utilization by AWS availability zone
data('cpu.utilization').percentile(95, by='aws_availability_zone').publish()

#CPU utilization average by service and instance type
data('cpu.utilization').mean(by=['service', 'aws_instance_type']).publish()

Aggregation results

If you don’t specify arguments for an aggregation function, the output for the 1-by-n array of datapoints at time t is a single value. For example, if you apply sum() to an input stream that has five time series with the values 5, 10, 15, 20, and 10 at time t, the output is the single value 60.

Grouping

Grouping the output from an aggregation function organizes the results into separate sets based on metadata values for the datapoints in the stream. To specify the metadata on which to group, use the by keyword argument. You specify a single metadata field as a string, for example by='datacenter'. To specify multiple grouping fields, use an array of strings, for example by=['env', 'datacenter'].

For example, if the datapoints in the input stream have the dimension datacenter, and each datapoint has one of three different values datacenter1, datacenter2, or datacenter3, then calling data('cpu.utilization').mean(by='datacenter').publish() produces 3 values, each in a separate group. Each value represents the mean cpu utilization for data series that have the corresponding dimension value; that is, one value is mean cpu utilization for datacenter1, another value is mean cpu utilization for datacenter2, and the third value is mean cpu utilization for datacenter3.
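The grouping behavior can be sketched in plain Python (illustrative only; not the SignalFlow engine), with each datapoint carrying its value and its dimensions:

```python
from collections import defaultdict

# Plain-Python sketch of mean(by='datacenter') at a single point in time:
# datapoints are grouped by a dimension value, then averaged per group.
datapoints = [
    {'value': 40.0, 'datacenter': 'datacenter1'},
    {'value': 60.0, 'datacenter': 'datacenter1'},
    {'value': 30.0, 'datacenter': 'datacenter2'},
    {'value': 90.0, 'datacenter': 'datacenter3'},
]

def mean_by(points, key):
    groups = defaultdict(list)
    for p in points:
        groups[p[key]].append(p['value'])
    return {g: sum(vs) / len(vs) for g, vs in groups.items()}

means = mean_by(datapoints, 'datacenter')
# Three dimension values produce three grouped results:
# {'datacenter1': 50.0, 'datacenter2': 30.0, 'datacenter3': 90.0}
```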

Transformations

Rolling window transformations

Some methods have an option to perform a computation on a metric time series (MTS) over a period of time. For example, mean(over='10s') is the moving average of an MTS over 10 seconds.

The over parameter supports units of seconds ('s'), minutes ('m'), and hours ('h').

SignalFlow provides rolling window transformations for the following stream methods:

Calendar window transformations

Some methods also have an option to perform a computation over calendar intervals or windows, such as days, weeks, and months. For example, to calculate a daily average, use mean(cycle='day'). Just after midnight each day, this calculation returns the mean of all datapoints from the previous day.

Notice that, unlike mean(over='24h'), this is not a moving average.
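The difference can be sketched in plain Python with hourly toy data (illustrative only; not the SignalFlow engine). The rolling mean always looks back 24 hours from "now", while the calendar-day mean covers exactly midnight to midnight:

```python
# Plain-Python sketch of the difference between a rolling 24-hour mean
# and a calendar-day mean, using hourly toy data (hour -> value).
values = {h: float(h) for h in range(48)}  # two days of hourly datapoints

def rolling_mean(ts, now, window=24):
    pts = [v for h, v in ts.items() if now - window < h <= now]
    return sum(pts) / len(pts)

def calendar_day_mean(ts, day):
    pts = [v for h, v in ts.items() if day * 24 <= h < (day + 1) * 24]
    return sum(pts) / len(pts)

# At hour 36, mean(over='24h') covers hours 13..36 (always looking back
# 24 hours), while the cycle='day' result reported after the first
# midnight covers exactly hours 0..23 of day 0.
rolling = rolling_mean(values, now=36)    # 24.5
day0 = calendar_day_mean(values, day=0)   # 11.5
```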

SignalFlow provides calendar window transformations for the following stream methods:

Specifying calendar window transformations

You control calendar window transformations with these arguments:

cycle

Duration of the window

For example, sum(cycle='day') tells SignalFlow to calculate the sum over each calendar day: the calculation starts at 00:00:00 hours of the current day and returns the value at 00:00:00 of the next calendar day.

To do a calendar window transformation, you must at least specify cycle.

cycle_start

The offset from the default start of the cycle

For example, sum(cycle='day', cycle_start="8h") starts calculating the sum at 08:00:00 of the current day, and returns the value at 08:00:00 the next calendar day.

The expected values and defaults depend on the value of cycle; for example, the cycle_start default for 'day' is '00h'. The defaults for all values of cycle are listed in the table Cycle and cycle_start values.

NOTE: For cycle='hour' the only valid value for cycle_start is '0m'.

shift_cycles

A number of cycles to shift backwards when reporting the value at the end of the current cycle.

For example, with sum(cycle='day', shift_cycles='1'), the value reported at the end of the current day is the sum calculated over the previous day.

One way to use this option is to compare results between cycles. Use sum(cycle='day') without shift_cycles to get the current cycle’s sum, and sum(cycle='day', shift_cycles='1') to get the previous cycle’s sum. You can then compare the two to get a day-over-day comparison.

NOTE: If you specify shift_cycles, you must set partial_values=False.
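The day-over-day comparison described above can be sketched in plain Python (illustrative only; not the SignalFlow engine), where shifting by one cycle reports the previous day's result at the end of the current day:

```python
# Illustrative sketch (not the SignalFlow engine) of shift_cycles: with
# daily cycles, a shift of 1 reports the previous day's result at the
# end of the current day, enabling a day-over-day comparison.
daily_sums = {'2019-01-01': 120, '2019-01-02': 150, '2019-01-03': 90}
days = sorted(daily_sums)

def shifted(day, cycles=1):
    i = days.index(day) - cycles
    return daily_sums[days[i]] if i >= 0 else None

current = daily_sums['2019-01-03']   # 90  (like sum(cycle='day'))
previous = shifted('2019-01-03')     # 150 (like shift_cycles='1')
day_over_day = current - previous    # -60
```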

partial_values

Flag that controls the return of values during the cycle.

If partial_values=False, SignalFlow only returns a result at the end of the cycle, but if partial_values=True, SignalFlow also returns results during the cycle. The interval at which SignalFlow returns results depends on the resolution of the background job that SignalFlow uses to run the program. The default is False.

NOTE: If you specify partial_values=True, you can’t specify shift_cycles.

Cycle and cycle_start values

The value of cycle_start is related to the value of cycle. If you specify a value for cycle_start that isn’t permitted for the value of cycle you’re using, your request returns an error.

The permitted cycle and cycle_start values are summarized in the following table:

Table 1. SignalFlow cycle and cycle_start values
For each cycle value, the entries below list the permitted cycle_start values and describe what they mean.

'quarter'

Starting month of the first quarter of the year. The value is a month between 'january' and 'december', inclusive. The default is 'january'.

For example, if you use sum(cycle='quarter', cycle_start='march'), then Q1 is 'march' to 'may', Q2 is 'june' to 'august', and so forth.

'month'

Starting day of the monthly cycle. The value is a day number between '1d' and '28d' inclusive. The default is '1d'.

Specifies the starting day of a cycle of one calendar month. The actual number of days in the cycle depends on the month in which the calculation is initiated; for example, if cycle_start='3d', you always get a value at the end of the 3rd day, regardless of the number of days in the month.

'week'

Starting day name of the weekly cycle. The value is a day name between 'monday' and 'sunday' inclusive. 'monday' is the default.

Specifies the starting day of a cycle of 7 consecutive days.

'day'

Starting hour of the daily cycle. The value is a two-digit number with leading zeros, followed by the letter h. The default is '00h'.

'00h' represents the first moment in the day. '23h' is the last hour of the day.

'hour'

Starting minute of the hourly cycle. The default and only allowed value is '0m'.

If you specify cycle='hour', the only value you can specify for cycle_start is '0m'. As an alternative, you can omit cycle_start, because it defaults to '0m'. If you specify any other value for cycle_start, the request returns an error.

Time zone for calendar window transformations

The calendar time zone controls how SignalFlow interprets calendar intervals and associates datapoints with them.

For example, consider a datapoint with a timestamp that’s 11 PM December 31 UTC:

  • You leave the calendar time zone set to UTC: SignalFlow includes the datapoint in the December 31 cycle.

  • You set your calendar time zone to Los Angeles: 11 PM UTC is 3 PM December 31 in Los Angeles. SignalFlow includes the datapoint in the December 31 cycle.

  • You set your calendar time zone to Tokyo: 11 PM UTC is 8 AM January 1 in Tokyo. SignalFlow includes the datapoint in the January 1 cycle.
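The examples above can be reproduced with Python's standard zoneinfo module (requires Python 3.9+; illustrative, not SignalFlow code):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

# Reproduce the examples above: which calendar day does a datapoint
# timestamped 11 PM December 31 (UTC) fall into in each time zone?
ts = datetime(2018, 12, 31, 23, 0, tzinfo=timezone.utc)

def cycle_day(dt, tz):
    return dt.astimezone(ZoneInfo(tz)).date().isoformat()

utc_day = cycle_day(ts, 'UTC')                  # '2018-12-31'
la_day = cycle_day(ts, 'America/Los_Angeles')   # '2018-12-31' (3 PM local)
tokyo_day = cycle_day(ts, 'Asia/Tokyo')         # '2019-01-01' (8 AM local)
```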

Setting the timezone has no effect on a SignalFlow program that doesn’t use calendar window transformations.

Supported SignalFlow time zones

The default SignalFlow time zone is Coordinated Universal Time (UTC). The time zones that SignalFlow accepts are a subset of the IANA Time Zone Database; the following table shows the accepted zones:


Setting the SignalFlow time zone

To set the SignalFlow time zone for a program using the API:

  • For a chart’s SignalFlow program: Set the options.programOptions.timezone property in the request body for the Create Chart or Update Chart operation. To learn more, see the topic Charts API.

  • For a detector’s SignalFlow program: Set the timezone property at the top level of the request body for the Create Detector or Update Detector operation. To learn more, see the topic Detectors API.

  • For a SignalFlow program that you run directly using the operation POST https://stream.{REALM}.signalfx.com/v2/signalflow/execute: Specify timezone=<TIMEZONE_VALUE> as a query parameter for the operation.

Transformations and aggregations

Many analytical functions can act as both aggregations and transformations. If you don’t specify the over keyword argument in the function call, SignalFlow assumes that you want an aggregation. For example, mean() is an aggregation and mean(over='1h30m') is a transformation.

Transformation examples

The following SignalFlow programs show you how to use transformations:

#5-minute moving average of the CPU utilization of all servers
data('cpu.utilization').mean(over='5m').publish()

#15-second moving average of the 95th percentile CPU utilization
#grouped by AWS availability zone
data('cpu.utilization').percentile(95,by='aws_availability_zone').mean(over='15s').publish()

#Sum of CPU utilization over one week, starting on Monday
data('cpu.utilization').sum(cycle='week',cycle_start='monday').publish()

#Quarterly maximum value of the MTS 'cpu.utilization'
data('cpu.utilization').max(cycle='quarter').publish()

#Daily sum, starting at 6 AM, issuing incremental results, of the MTS 'cpu.utilization'.
data('cpu.utilization').sum(cycle='day',cycle_start='6h', partial_values=True).publish()

Other analytical functions

In addition to aggregations and transformations, SignalFlow has analytical functions that perform other actions on streams. These include timeshift(), which retrieves a datapoint from a specified time offset in the past, and delta(), which operates on the current and previous datapoint, regardless of the time interval between them.

map() and lambda functions

SignalFlow can apply a calculation to every value in an incoming stream. To do this, you call the map() stream method with a lambda function, or apply a built-in function such as floor() directly to a stream. In most cases, you use these to perform arithmetic or algebraic calculations on all the values in a stream. SignalFlow only supports single-argument lambdas, and the lambda body can only reference that argument.

#Calculate the maximum mean cpu.user by datacenter over an hour. Use the floor
#function to "round" the mean.

#With two variables
A = data('cpu.user').mean(by='datacenter')
floor(A).max(over='1h').publish()

#Use the map function
data('cpu.user').mean(by='datacenter').map(lambda x: floor(x)).max(over='1h').publish()

#Multiply the cpu.utilization by 2
data('cpu.utilization').map(lambda x: x*2).publish()

#Get the floor of cpu.utilization raised to the power of 4
data('cpu.utilization').map(lambda x: floor(pow(x,4))).publish()

In addition to calculations, lambda functions allow if-else expressions, using the syntax

<TRUE_VALUE> if <TRUE_CONDITION> else <FALSE_VALUE>

Using the value None, you can use this type of lambda function to filter values from the output. For example:

#Output x only if it's between 50 and 60; otherwise, filter it out
data('cpu.utilization').map(lambda x: x if x > 50 and x < 60 else None).publish()

Using streams as variables

You can use streams as inputs to mathematical expressions, just as you’d use a variable in a formula. To do this, assign a stream to a variable name at the beginning of the program. Because the result of a calculation on one or more stream objects is also a stream object, you can perform aggregations or transformations on it as well. To perform further analytics on a stream calculation (or to stream the results), wrap the calculation in parentheses.

#Inline stream calculation
(data('cache.hits') / data('cache.total') * 100).mean(over='1h').publish()

#Using a stream as a variable
cache_hits = data('cache.hits').sum(by='datacenter')
cache_misses = data('cache.misses').sum(by='datacenter')
hit_pct = (cache_hits / (cache_hits + cache_misses) * 100).mean(over='1h')
hit_pct.publish('hit_pct')

#A stream calculation can be used in other stream calculations
used_pct = (data('cpu.used') / data('cpu.*').sum()) * 100
cpu_per_thread = (used_pct / data('server.threadcount')).publish()

SignalFlow API

SignalFlow programs run asynchronously in background jobs. By default, SignalFx immediately starts jobs for SignalFlow programs in charts and detectors. The job automatically publishes its results back to the chart or detector.

SignalFx also lets you run SignalFlow programs as background jobs using REST HTTP endpoints or a WebSocket protocol.

Whenever possible, you should use WebSocket, because it offers these advantages:

  • Multiple data streams and computations using a single HTTP connection

  • Encoding protocol that’s more efficient than the REST HTTP protocol

  • Overall lower latency

The HTTP endpoints offer the same functionality, and they’re more straightforward to add to your programs, but they require one HTTP connection per job.

Regardless of the request format you use, the SignalFlow API returns results in a message stream that contains:

  • Execution information

  • Metadata for the time series being generated

  • Generated data and events

The format of this message stream depends on the request API you use:

  • REST API: SignalFlow returns Server-Sent Event (SSE) messages.

  • WebSocket API: SignalFlow returns WebSocket JSON messages.

The following topics provide reference information for the APIs:

Using the SignalFlow API

Connecting (WebSocket API only)

If you’re using the WebSocket API, establish a WebSocket connection with SignalFx before you authenticate. To do this, use the REST API operation GET https://stream.{REALM}.signalfx.com/v2/signalflow/connect.

Authenticating

To use the SignalFlow API, you need to have an org token or a user access token.

If you’re using the REST API, send the token in the header for the operation POST https://stream.{REALM}.signalfx.com/v2/signalflow/execute that starts your computation job.

If you’re using the WebSocket API, send an authenticate message. You must do this within 5 seconds of connecting to SignalFx; otherwise your connection is dropped.
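The authenticate message is a small JSON object carrying your token. The following sketch builds it; the field names follow the SignalFlow WebSocket protocol, but treat the exact shape as illustrative and check the API reference for your realm:

```python
import json

# Sketch of the authenticate message sent as the first WebSocket frame.
# Field names follow the SignalFlow WebSocket protocol; treat the exact
# shape as illustrative and confirm against the API reference.
def authenticate_message(token):
    return json.dumps({'type': 'authenticate', 'token': token})

msg = authenticate_message('MY_SFX_TOKEN')
# ws.send(msg)  # must be sent within 5 seconds of connecting
```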

To learn more about authentication and tokens, refer to the topic Authentication Tokens.

Executing background computations

To run a SignalFlow program in the background:

  • REST API: Use the operation Start SignalFlow Computation. When you use the REST HTTP API, you receive results as Server-Sent Event (SSE) messages.

  • WebSocket API: Send an execute message. You receive results as JSON messages.

Detaching from background jobs

To stop receiving output from a computation, detach from the background job:

  • REST API: Close the connection.

  • WebSocket API: Send a detach request.

While WebSocket remains connected, SignalFx keeps your computation alive. If you disconnect using an end_of_channel message, SignalFx stops the computation. In addition, if SignalFx detects an error associated with the computation or connection, it sends you an abort_channel message and closes the computation.

Client library support

To simplify the use of the APIs, SignalFx provides client libraries for several languages. Click a name to navigate to the library repository in GitHub:

Examples

This section contains examples that show you how to use the SignalFlow API.

Command-line example

The following curl command demonstrates how to start a computation that calculates average CPU utilization over all servers.

$ curl \
    --request POST \
    --header "Content-Type: text/plain" \
    --header "X-SF-Token: YOUR_ACCESS_TOKEN" \
    --data "data(\"cpu.utilization\").mean().publish()" \
    https://stream.signalfx.com/v2/signalflow/execute

The result is a stream of SSE messages.
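The SSE stream arrives as `event:`/`data:` line pairs separated by blank lines. The following minimal parser sketches that framing; the event name and payload shown are illustrative placeholders, not guaranteed message contents:

```python
# Minimal parser for the Server-Sent Event framing that the REST API
# streams back; the event name and payload below are illustrative.
def parse_sse(text):
    events, current = [], {}
    for line in text.splitlines():
        if line.startswith('event:'):
            current['event'] = line[len('event:'):].strip()
        elif line.startswith('data:'):
            current['data'] = line[len('data:'):].strip()
        elif line == '' and current:   # blank line terminates one event
            events.append(current)
            current = {}
    return events

sample = 'event: message\ndata: {"type": "control-message"}\n\n'
parsed = parse_sse(sample)
# parsed == [{'event': 'message', 'data': '{"type": "control-message"}'}]
```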

Python client library example

The Python client library for the SignalFlow API is available in pip. You can also view the library code in the GitHub signalfx-python client library repo. The following code shows you how to write the previous example using the Python library:

import signalfx

program = "data('cpu.utilization').mean().publish('cpu')"
signalflow_client = signalfx.SignalFx().signalflow('MY_SFX_TOKEN')
try:
    print('Executing {0} ...'.format(program))
    computation = signalflow_client.execute(program)
    for msg in computation.stream():
        if isinstance(msg, signalfx.signalflow.messages.DataMessage):
            print('{0}: {1}'.format(msg.logical_timestamp_ms, msg.data))
        if isinstance(msg, signalfx.signalflow.messages.EventMessage):
            print('{0}: {1}'.format(msg.timestamp_ms, msg.properties))
finally:
    signalflow_client.close()

To learn more about the Python client library, see the signalfx-python GitHub repository.

Node.js client library example

The Node.js client library for the SignalFlow API is available in npm. You can also view the library code in the GitHub signalfx-nodejs client library repo.

The following code shows you how to write the previous examples using the Node.js library:

var signalfx = require('signalfx');

var flow = signalfx.SignalFlow('MY_SFX_TOKEN');

var computation = flow.execute({
    program: "data('cpu.utilization').publish('cpu')",
    start: Date.now() - 60000,
    stop: Date.now() + 600000,
    resolution: 10000,
    maxDelay: 1000
  });

computation.stream(function (err, data) {
  if(err) {
    // handle error
  } else {
    console.log(data);
  }
});

© Copyright 2019 SignalFx.

Third-party license information