SignalFx

SignalFlow Overview

SignalFlow is the language used to describe computations for SignalFx's real-time analytics engine. It is syntactically similar to Python and is designed to be easy to understand and to make complex computations easy to express.

# Filter for app servers but not in qa environment
app_servers = filter('serverType', 'app') and not filter('env', 'qa')

# Sum cache hits and misses across selected app servers by datacenter
cache_hits = data('cache.hits', filter=app_servers).sum(by='datacenter')
cache_misses = data('cache.misses', filter=app_servers).sum(by='datacenter')
total = cache_hits + cache_misses

# Calculate and publish 1-hour moving averages of hits and misses as percent of total
((cache_hits / total) * 100).mean(over='1h').publish('hit_pct')
((cache_misses / total) * 100).mean(over='1h').publish('miss_pct')

In the SignalFlow program example above, you can see that:

  • SignalFlow has built-in functions like data() which produce stream objects
  • These functions take positional and keyword arguments, e.g. to filter the streams
  • Stream objects also have methods, such as sum() or mean(), that operate on, compute over, and transform those streams into other streams
  • Comments are supported with the # character. All characters from # to the end of a line will be ignored

SignalFlow does not support the full Python syntax. Specifically, while variables may be reassigned at any time in Python, a symbol may only be bound once in SignalFlow.
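
For example, a symbol can be bound and then referenced, but not rebound (a minimal illustration; the dimension and values are hypothetical):

# Bind a filter to a symbol once...
app_servers = filter('serverType', 'app')

# ...but rebinding the same symbol later in the program is not allowed:
# app_servers = filter('serverType', 'web')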

Computations

A SignalFlow computation is a running instance of a streaming calculation as defined by its SignalFlow program. A computation has a start time and a stop time, and operates at a particular resolution. The start time, if not provided, defaults to "now", and the stop time, if not provided, defaults to "infinity" (streams forever). The resolution determines the interval at which data is processed and output is generated by the computation. For each interval, each published stream in the computation will emit data.

If the start time is chosen such that historical data is used in the computation, output is produced as quickly as it is calculated by the SignalFx analytics engine. For computations making use of data as it arrives ("now"), the output is produced in real-time, at a cadence governed by the computation's resolution.

Resolution

In general, the computation's resolution is automatically determined by the system to match the reporting frequency of the input data. It is possible to ask for a minimum resolution, to force the computation to operate at a coarser resolution than has been automatically determined. (The converse is not true: It is not possible to force the system to operate at a resolution that is finer than the data's reporting interval, as this would create invalid results.)

Regardless of the chosen resolution, input data is rolled up to the compute resolution using a rollup algorithm that matches the metric type of the input data. This can be overridden by specifying a different rollup on the data() function.
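
For example, assuming the data() function accepts a rollup argument naming the desired rollup, an override might look like this:

# Roll up input data using the max of each interval instead of the
# default rollup for this metric type
data('cpu.utilization', rollup='max').publish()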

MaxDelay

Because SignalFx is based on a real-time streaming analytics system, timeliness of the output is a very important feature. In the face of late input data, it is not always possible to wait until all input data arrives, as there are reasons why it might never arrive (e.g. the source has been decommissioned), or why it might arrive very late. To account for this problem, SignalFx's analytics system evaluates, measures and tracks the lag of the input data to establish a deadline after which the computation will "move on" even in the face of missing input data. This allows the computation to continue to operate with all available data, while setting an upper bound of the maximum perceived lag in the computation's output.

If you expect your data to be reported with a high or variable lag, you can override MaxDelay to ensure that all of your data will be included in the computation.

Streams and Filters

Before reading this section, make sure you are familiar with the SignalFx Metrics Metadata Overview.

Streams

The core object in a SignalFlow computation is a stream of data. Streams are identified using queries that return data points, usually from multiple metric time series. As such, a good mental model is to think of a stream as an ongoing series of 1-by-n vectors of data points (where n is the number of time series returned by the query), rather than a series of single data points.

# Specify a stream
data('cpu.utilization')
data('cpu.*')

# Make the output visible
data('cpu.utilization').publish()
data('cpu.*').publish()

By default, a stream object will not output any datapoints from a computation. In order to make its output visible externally, you must use the publish() method.

Note that a data point contains not just the actual metric value (e.g., 5, 10.5, etc.), but also metadata that provides useful contextual information about the value (e.g. that it comes from the production environment, or from the datacenter in Santa Clara, CA). That metadata can be used in filter expressions or in analytical functions.

Filters

Filters are used to specify which metric time series to include in a stream, using dimensions or properties on the metric time series. To create a filter, use the built-in function filter() with the dimension or property name as the first argument and at least one query argument. The query string supports non-leading wildcarding via the * character. You can specify multiple query arguments to perform OR filtering on a single field.

# Filter on metric timeseries with a serverType of API
filter('serverType', 'API')

# Filter on metric timeseries with an aws_tag_Name that starts with api
filter('aws_tag_Name', 'api*')

# Filter on metric timeseries with an api of either 'login' or 'logout'
filter('api', 'login', 'logout')

Filter objects can be combined into filter expressions with the and, or and not keywords. Operator precedence is the same as Boolean expressions. Parentheses can be used to force precedence.

# Simple AND expression
filter('serverType', 'API') and filter('env', 'QA')

# A more complex example
(filter('serverType', 'db.core', 'db.staging*') and
 not filter('serverType', 'db.staging-qa')) and filter('api', 'login')

Stream labels

You can label the output of a stream by providing a label argument to publish(). While not required, doing so can be useful for a number of reasons, e.g. to distinguish between streams so that you can visualize them accordingly.

# Stream output without a label
data('cache.hits').sum(by='datacenter').mean(over='1h').percentile(90).publish()

# Stream output with the label 'misses'
data('cache.misses').sum(by='datacenter').mean(over='1h').percentile(90).publish('misses')

Analytical Functions

SignalFlow includes a large library of built-in analytical functions that take a stream as an input, perform computations on its data points, and output a modified stream that is the result of the computation. Generally speaking, there are two main classes of computations: aggregations and transformations.

Aggregation

Aggregations take all of the data points in a stream at a given instant of time -- the entire 1-by-n vector at time t -- and perform a calculation on them. Some examples of analytical functions that compute aggregations are shown below.

Click here for the full list of analytical functions that perform aggregations.

# Overall CPU utilization average
data('cpu.utilization').mean().publish()

# 95th percentile CPU utilization by AWS availability zone
data('cpu.utilization').percentile(95, by='aws_availability_zone').publish()

# CPU utilization average by service and instance type
data('cpu.utilization').mean(by=['service', 'aws_instance_type']).publish()

In the absence of additional arguments on the analytical function, the output of an aggregation on a 1-by-n vector (i.e., 1 data point from each of n time series at time t) is a single data point. For example, if you apply a sum() to an input stream that is composed of 5 time series, with values of 5, 10, 15, 20, and 10 at time t, then the output will be a single value of 60.
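
As a minimal sketch of the worked example above (the metric name is illustrative):

# Sum all input time series into a single output value per interval
data('cache.hits').sum().publish()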

Aggregation functions support the option of grouping the output, using the metadata that is associated with the metric values. The fields to group by are specified with the by keyword argument: either a single field as a string, e.g. by='datacenter', or multiple fields as a list of strings, e.g. by=['env', 'datacenter'].

If the by keyword is specified, the output stream will be grouped accordingly, with the number of groups equivalent to the number of unique combinations of the included arguments. For example, if the 5 values used above are associated with env=prod, env=prod, env=dev, env=prod and env=dev, respectively, and you perform a sum with by='env', then the output will be 35 for env=prod and 25 for env=dev.

Note: If parts of the input stream do not have the specified argument in their metadata, they do not participate in the aggregation and will be dropped from the rest of the calculation. In other words, the by also functions as an implicit filter.
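
Continuing the example, a grouped sum might look like this (a sketch; an env dimension on the input time series is assumed):

# One output time series per unique value of env (e.g. prod and dev);
# time series without an env dimension are dropped from the result
data('cache.hits').sum(by='env').publish()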

Transformation

The second class of computation that can be performed is a transformation. Instead of performing the computation across all input data points at a single instant of time, transformations perform the computation on each individual time series over a window of time. A common example of a transformation is a moving average, which is the mean of some set of values that fall within a sliding time window.

Some examples of transformations are shown below.

Click here for the full list of analytical functions that perform transformations.

# 5-minute moving average of the CPU utilization of each server
data('cpu.utilization').mean(over='5m').publish()

# 15-second moving average of the 95th percentile CPU utilization
# by AWS availability zone
data('cpu.utilization').percentile(95, by='aws_availability_zone').mean(over='15s').publish()

The time range over which to do the transformation is specified with the over keyword argument with a duration value, e.g. mean(over='1h30m').

Note that many of the analytical functions in SignalFlow can be applied as an aggregation or a transformation, and if you don't specify the over argument, the language assumes you are doing an aggregation. For example, mean() is an aggregation applied across multiple time series, and mean(over='1h30m') is a transformation.
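
The following sketch contrasts the two forms on the same stream:

# Aggregation: one value per interval, averaged across all time series
data('cpu.utilization').mean().publish()

# Transformation: a 1.5-hour moving average of each individual time series
data('cpu.utilization').mean(over='1h30m').publish()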

Other analytical functions

In addition to aggregations and transformations, SignalFlow includes a variety of analytical functions that perform other actions on streams. These include timeshift() (which retrieves a data point from a specified time offset in the past) and delta() (which performs actions on the current and previous data point, irrespective of the time interval between them), among others.
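
As a rough sketch of both (metric names are illustrative, and it is assumed that timeshift() accepts a duration offset while delta() takes no arguments):

# Emit the value each time series had one week ago
data('cpu.utilization').timeshift('1w').publish('cpu_last_week')

# Emit the difference between the current and previous data point of each time series
data('requests.count').delta().publish()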

map() and lambda functions

SignalFlow includes the ability to perform midstream calculations using the map() function and lambda functions. These are most useful for simple operations, such as multiplying or dividing all values in a stream, or performing other algebraic calculations on them. SignalFlow currently only supports single-argument lambdas and only allows references to that argument.

# Calculate the maximum mean cpu.user by datacenter over an hour. Use the floor function to "round" the mean.

# With two variables
A = data('cpu.user').mean(by='datacenter')
floor(A).max(over='1h').publish()

# Use the map function
data('cpu.user').mean(by='datacenter').map(lambda x: floor(x)).max(over='1h').publish()

# Multiply the cpu.utilization by 2
data('cpu.utilization').map(lambda x: x*2).publish()

# Get the floor of cpu.utilization raised to the power of 4
data('cpu.utilization').map(lambda x: floor(pow(x, 4))).publish()

In addition to functions, lambda expressions also support an if-else expression. The format of these expressions is <value if condition is true> if <condition> else <value if condition is false>. By returning the None value, you can use this to filter values out of the output.

# Only output values of cpu.utilization between 50 and 60; other values are dropped
data('cpu.utilization').map(lambda x: x if x > 50 and x < 60 else None).publish()

Note that this mechanism differs from the SignalFx web application, which includes both a scale function and an exclude function to perform actions that lambda handles in a more generic fashion.

Calculations using streams

It is possible to use streams as inputs to mathematical expressions, just as you would use a variable in an algebraic formula. To do so, assign a variable name to any stream that you want to reference in downstream calculations. The output of a calculation using one or more stream objects is also a stream object; therefore, you can perform aggregations or transformations on it. In order to apply further analytics to a stream calculation (or to publish its results), the calculation must be wrapped in parentheses.

# Inline stream calculation
(data('cache.hits') / data('cache.total') * 100).mean(over='1h').publish()

# Using named streams
cache_hits = data('cache.hits').sum(by='datacenter')
cache_misses = data('cache.misses').sum(by='datacenter')
hit_pct = (cache_hits / (cache_hits + cache_misses) * 100).mean(over='1h')
hit_pct.publish('hit_pct')

# Stream calculation can be used in other stream calculations
used_pct = (data('cpu.used') / data('cpu.*').sum()) * 100
cpu_per_thread = (used_pct / data('server.threadcount')).publish()

Putting It Together

Create a stream

Streams are instantiated via the data() built-in function with a query to select the metric time series you are interested in. The query string supports non-leading wildcarding via the '*' character. An optional filter object is also supported.

data('cpu.utilization').publish()
data('cpu.*').publish()

# With a filter
data('cpu.utilization', filter=filter('env', 'QA') and filter('serverType', 'API')).publish()

# The filter can also be bound beforehand
qa_api_servers = filter('env', 'QA') and filter('serverType', 'API')
data('cpu.utilization', filter=qa_api_servers).publish()

Apply analytics to a stream

Analytics are applied to a stream by chaining one or more calls to analytics methods on a created stream object:

# Find the p90 of the 1h mean of total cache hits by datacenter
data('cache.hits').sum(by='datacenter').mean(over='1h').percentile(90)

Detectors

SignalFlow builds on top of its streaming and computational abilities to enable sophisticated alerting, by recognizing when a condition has been met, and then by generating events accordingly. Among other things, the events can be used to trigger notifications in incident management platforms (e.g. PagerDuty) or messaging systems (e.g. Slack or email).

# Send events when cpu.utilization is above 50 (and when it falls below again)
detect(data('cpu.utilization') > 50).publish('cpu_too_high')

Conditions

Conditions are specified with predicate objects. A predicate is an expression that evaluates to true or false. It takes a stream (or streams) as its input, and then compares each value in each 1-by-n vector in the stream against a threshold. If the comparison evaluates to true, then the value in that vector is replaced with a Boolean True; otherwise it is replaced with False. Any metadata associated with the input value will continue to be associated with the True output value.

Predicates can be composed using the and, or, and not keywords along with parentheses to force precedence.

Examples of valid predicates in SignalFlow include:

# True when any cpu.utilization timeseries' value is greater than 50
data('cpu.utilization') > 50

# True when cpu.utilization greater than 50 and memory.utilization less than 40
data('cpu.utilization') > 50 and data('memory.utilization') < 40

# Complex example of a condition. Checking if the moving average of the memory
# utilization is greater than 2 times the stddev 
mem = data('memory.utilization')
mem_mean = mem.mean(over='1h')
mem_stddev = mem.stddev()
mem_mean > 2 * mem_stddev

By default, the state of the condition changes as soon as the predicate's value changes (from False to True, or from True to False). However, a when() function can be used to require that the condition holds for a certain amount of time, or for a percentage of a certain amount of time. These duration controls are typically used to create detectors that ignore temporary spikes.

# Will generate True the instant cpu.utilization goes above 50; it will generate False the instant cpu.utilization falls below 50.
when(data('cpu.utilization') > 50)

# Will generate True when cpu.utilization has been above 50 for 5 minutes
# continuously; it will generate False again once cpu.utilization stays below
# 50 continuously for 5 minutes
when(data('cpu.utilization') > 50, '5m')

# Will generate True when cpu.utilization has been above 50 for 75% of a
# 5-minute window; it will generate False again once cpu.utilization stays
# below 50 for 75% of a 5-minute window
when(data('cpu.utilization') > 50, '5m', .75)

Conditions behave like filters in the sense that it is possible to define one without actually making use of it. To make use of it, you need the detect() function.

Sending events conditionally

The detect() function takes the output of predicate objects as its input, and generates events. There are two categories of events created by detect() objects:

  • Anomalous
  • Ok

The events that are generated depend on whether only the required on parameter is specified, or if the optional off parameter is also specified. (In either case, the parameter is a predicate.)

If only an on parameter is specified:

  • An Anomalous event is fired when the state of the predicate changes from False to True. This corresponds to the case when the threshold condition is met.
  • Ok is generated when the state of the predicate changes from True to False. This corresponds to the case when the threshold condition is no longer being met.

If both on and off parameters are specified:

  • An Anomalous event is fired when the state of the predicate passed as the on parameter changes from False to True.
  • Ok is generated when the state of the predicate passed as the off parameter changes from False to True and the predicate passed as on is False.

In order to publish the events created by a detect(), you must invoke its publish() method.

# Send events when cpu.utilization is above 50 (and when it falls below again)
detect(data('cpu.utilization') > 50).publish('cpu_too_high')

# Send events when cpu.utilization is above 50 and "clear" events when the
# cpu.utilization is below 40 and memory.utilization is below 20 for 5 minutes
cpu = data('cpu.utilization')
mem = data('memory.utilization')
detect(cpu > 50, when(cpu < 40 and mem < 20, '5m')).publish('cpu_too_high')
