SignalFlow is the language used to describe computations for SignalFx's real-time analytics engine. It is syntactically similar to Python and is designed to be easy to understand and to make complex computations easy to express.
    # Filter for app servers but not in qa environment
    app_servers = filter('serverType', 'app') and not filter('env', 'qa')

    # Sum cache hits and misses across selected app servers by datacenter
    cache_hits = data('cache.hits', filter=app_servers).sum(by='datacenter')
    cache_misses = data('cache.misses', filter=app_servers).sum(by='datacenter')
    total = cache_hits + cache_misses

    # Calculate and publish 1-hour moving averages of hits and misses as percent of total
    ((cache_hits / total) * 100).mean(over='1h').publish('hit_pct')
    ((cache_misses / total) * 100).mean(over='1h').publish('miss_pct')
In the SignalFlow program example above, you can see that:
- SignalFlow has built-in functions like data() which produce stream objects
- The functions take positional and keyword arguments, e.g. to filter the streams
- Stream objects also have methods, e.g. sum() or mean(), that operate on those streams and turn them into other streams
- Comments are supported with the # character. All characters from # to the end of a line are ignored
SignalFlow does not support the full Python syntax. Specifically, while variables may be reassigned at any time in Python, a symbol may only be bound once in SignalFlow.
A SignalFlow computation is a running instance of a streaming calculation as defined by its SignalFlow program. A computation has a start time and a stop time, and operates at a particular resolution. The start time, if not provided, defaults to "now", and the stop time, if not provided, defaults to "infinity" (streams forever). The resolution determines the interval at which data is processed and output is generated by the computation. For each interval, each published stream in the computation will emit data.
If the start time is chosen such that historical data is used in the computation, output is produced as quickly as it is calculated by the SignalFx analytics engine. For computations making use of data as it arrives ("now"), the output is produced in real-time, at a cadence governed by the computation's resolution.
In general, the computation's resolution is automatically determined by the system to match the reporting frequency of the input data. It is possible to ask for a minimum resolution, to force the computation to operate at a coarser resolution than has been automatically determined. (The converse is not true: It is not possible to force the system to operate at a resolution that is finer than the data's reporting interval, as this would create invalid results.)
Regardless of the chosen resolution, input data is rolled up to the compute resolution using a rollup algorithm that matches the metric type of the input data. This can be overridden by using the rollup argument of the data() function.
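The idea of rolling input data up to a coarser compute resolution can be sketched in plain Python. This is a minimal model, not SignalFlow code: the `rollup` helper is hypothetical, and pairing an average with gauges and a sum with counters is an assumption made for illustration.

```python
from statistics import mean


def rollup(points, resolution_ms, how):
    """Group (timestamp_ms, value) pairs into buckets of resolution_ms and reduce each bucket."""
    buckets = {}
    for ts, value in points:
        bucket_start = ts - ts % resolution_ms
        buckets.setdefault(bucket_start, []).append(value)
    return [(start, how(values)) for start, values in sorted(buckets.items())]


# Data reported every second, computed at a 5-second resolution.
points = [(t * 1000, float(t)) for t in range(10)]  # values 0.0 .. 9.0

gauge_rollup = rollup(points, 5000, mean)   # assumed rollup for a gauge
counter_rollup = rollup(points, 5000, sum)  # assumed rollup for a counter
```

Each output bucket carries one value per 5-second interval, which is the cadence at which a computation running at that resolution would process data.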
Because SignalFx is based on a real-time streaming analytics system, timeliness of the output is a very important feature. In the face of late input data, it is not always possible to wait until all input data arrives, as there are reasons why it might never arrive (e.g. the source has been decommissioned), or why it might arrive very late. To account for this problem, SignalFx's analytics system evaluates, measures and tracks the lag of the input data to establish a deadline after which the computation will "move on" even in the face of missing input data. This allows the computation to continue to operate with all available data, while setting an upper bound on the maximum perceived lag in the computation's output.
If you expect your data to be reported with unpredictable variance in lag, you can override MaxDelay to ensure that all of your data is included in the computation.
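The effect of a deadline can be illustrated with a small plain-Python model. It assumes a fixed delay per interval; the real engine tracks lag adaptively, which is not modelled here, and the `emit_at_deadline` helper and host names are hypothetical.

```python
def emit_at_deadline(arrivals, bucket_ts, max_delay_ms):
    """Sum the values for bucket_ts that arrived no later than bucket_ts + max_delay_ms.

    arrivals: list of (source, arrival_ts_ms, value) for one logical timestamp.
    Returns the sum of the on-time values and the list of late sources.
    """
    deadline = bucket_ts + max_delay_ms
    on_time = [value for _, arrived, value in arrivals if arrived <= deadline]
    late = [source for source, arrived, _ in arrivals if arrived > deadline]
    return sum(on_time), late


arrivals = [("host-a", 1_200, 5), ("host-b", 1_900, 10), ("host-c", 9_000, 15)]
total, late_sources = emit_at_deadline(arrivals, bucket_ts=1_000, max_delay_ms=2_000)
```

In this model the output for the interval is produced from the two sources that met the deadline; raising `max_delay_ms` would include the third source at the cost of later output.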
Before reading this section, make sure you are familiar with the SignalFx Metrics Metadata Overview.
The core object in a SignalFlow computation is a stream of data. Streams are identified using queries that return data points, usually from multiple metric time series. As such, a good mental model is to think of a stream as an ongoing series of 1-by-n vectors of data points (where n is the number of time series returned by the query), rather than a series of single data points.
    # Specify a stream
    data('cpu.utilization')
    data('cpu.*')

    # Make the output visible
    data('cpu.utilization').publish()
    data('cpu.*').publish()
By default, a stream object will not output any data points from a computation. In order to make its output visible externally, you must use the publish() method.
Note that a data point contains not just the actual metric value (e.g. 5 or 10.5), but also metadata that provides useful contextual information about the value (e.g. that it comes from the production environment, or from the datacenter in Santa Clara, CA). That metadata can be used in filter expressions or in analytical functions.
Filters are used to specify which metric time series to include in a stream, using dimensions or properties on the metric time series. To create a filter, use the built-in function filter() with the dimension or property name as the first argument and at least one query argument. The query string supports non-leading wildcarding via the * character. You can specify many query arguments to do OR filtering for a single field.
    # Filter on metric timeseries with a serverType of API
    filter('serverType', 'API')

    # Filter on metric timeseries with an aws_tag_Name that starts with api
    filter('aws_tag_Name', 'api*')

    # Filter on metric timeseries with an api of either 'login' or 'logout'
    filter('api', 'login', 'logout')
Filter objects can be combined into filter expressions with the and, or and not keywords. Operator precedence is the same as in Boolean expressions. Parentheses can be used to force precedence.
    # Simple AND expression
    filter('serverType', 'API') and filter('env', 'QA')

    # A more complex example
    (filter('serverType', 'db.core', 'db.staging*') and not filter('serverType', 'db.staging-qa')) and filter('api', 'login')
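The matching rules described above (wildcards, OR across several query arguments, and Boolean composition) can be modelled in plain Python. This is a sketch, not SignalFlow: `make_filter` is a hypothetical helper, and it relies on `fnmatch.fnmatchcase`, which also interprets `?` and `[...]` and does not enforce the non-leading-wildcard restriction.

```python
from fnmatch import fnmatchcase


def make_filter(field, *patterns):
    """Return a predicate over a metadata dict: true if the field matches ANY pattern."""
    def matches(metadata):
        value = metadata.get(field)
        return value is not None and any(fnmatchcase(value, p) for p in patterns)
    return matches


is_api = make_filter("aws_tag_Name", "api*")
is_login = make_filter("api", "login")
login_or_logout = make_filter("api", "login", "logout")

series = [
    {"aws_tag_Name": "api-frontend", "api": "login"},
    {"aws_tag_Name": "db-core", "api": "logout"},
    {"aws_tag_Name": "api-backend", "api": "search"},
]

api_names = [m["aws_tag_Name"] for m in series if is_api(m)]
auth_calls = [m["api"] for m in series if login_or_logout(m)]
# AND / NOT composition of two filters
api_not_login = [m["aws_tag_Name"] for m in series if is_api(m) and not is_login(m)]
```

A time series is selected only when its metadata satisfies the whole expression, which mirrors how a combined filter narrows the set of series in a stream.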
You can label the output of a stream by passing a label argument to publish(). While not required, doing so can be useful for a number of reasons, e.g. to distinguish between streams so as to be able to visualize them accordingly.
    # Stream output without a label
    data('cache.hits').sum(by='datacenter').mean(over='1h').percentile(90).publish()

    # Stream output with the label 'misses'
    data('cache.misses').sum(by='datacenter').mean(over='1h').percentile(90).publish('misses')
SignalFlow includes a large library of built-in analytical functions that take a stream as an input, perform computations on its data points, and output a modified stream that is the result of the computation. Generally speaking, there are two main classes of computations: aggregations and transformations.
Aggregations take all of the data points in a stream at a given instant in time (the entire 1-by-n vector at time t) and perform a calculation on them. Some examples of analytical functions that compute aggregations include:
    # Overall CPU utilization average
    data('cpu.utilization').mean().publish()

    # 95th percentile CPU utilization by AWS availability zone
    data('cpu.utilization').percentile(95, by='aws_availability_zone').publish()

    # CPU utilization average by service and instance type
    data('cpu.utilization').mean(by=['service', 'aws_instance_type']).publish()
In the absence of additional arguments on the analytical function, the output of an aggregation on a 1-by-n vector (i.e., 1 data point from each of n time series at time t) is a single data point. For example, if you apply a sum() to an input stream that is composed of 5 time series, with values of 5, 10, 15, 20, and 10 at time t, then the output will be a single value of 60.
Aggregation functions support the option of grouping the output, using the metadata that is associated with the metric values. The fields to group by are specified with the by keyword argument: either a single field as a string, e.g. by='datacenter', or multiple fields as a list of strings, e.g. by=['service', 'aws_instance_type']. When the by keyword is specified, the output stream is grouped accordingly, with the number of groups equal to the number of unique combinations of values of the specified fields. For example, if the 5 values used above are associated with env=prod, env=prod, env=dev, env=prod and env=dev, respectively, and you perform a sum with by='env', then the output will be 35 for env=prod and 25 for env=dev.
Note: If parts of the input stream do not have the specified field in their metadata, they do not participate in the aggregation and are dropped from the rest of the calculation. In other words, by also functions as an implicit filter.
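The worked numbers above, including the implicit filtering, can be reproduced with a short plain-Python model of a single 1-by-n vector. The `aggregate_sum` helper is hypothetical, and the sixth series without an env dimension is added here purely to show the dropping behaviour.

```python
from collections import defaultdict


def aggregate_sum(vector, by=None):
    """Sum one 1-by-n vector of (value, metadata) pairs, optionally grouped by a field."""
    if by is None:
        return sum(value for value, _ in vector)
    groups = defaultdict(int)
    for value, metadata in vector:
        if by not in metadata:
            continue  # series without the field are dropped (implicit filter)
        groups[metadata[by]] += value
    return dict(groups)


five_series = [
    (5, {"env": "prod"}),
    (10, {"env": "prod"}),
    (15, {"env": "dev"}),
    (20, {"env": "prod"}),
    (10, {"env": "dev"}),
]
overall = aggregate_sum(five_series)

# One more series that has no 'env' dimension (illustrative addition).
with_untagged = five_series + [(7, {})]
by_env = aggregate_sum(with_untagged, by="env")
```

Without `by`, the five values collapse into one data point; with `by="env"`, the untagged series contributes to neither group.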
The second class of computation that can be performed is a transformation. Instead of performing the computation across all input data points at a single instant in time, transformations perform the computation on each individual time series over a window of time. A common example of a transformation is a moving average, which is the mean of some set of values that fall within a sliding time window.
Some examples of transformations include:
    # 5-minute moving average of the CPU utilization of each server
    data('cpu.utilization').mean(over='5m').publish()

    # 15-second moving average of the 95th percentile CPU utilization
    # by AWS availability zone
    data('cpu.utilization').percentile(95, by='aws_availability_zone').mean(over='15s').publish()
The time range over which to do the transformation is specified with the over keyword argument with a duration value, e.g. over='5m'.
Note that many of the analytical functions in SignalFlow can be applied as an aggregation or a transformation, and if you don't specify the over argument, the language assumes you are doing an aggregation. For example, mean() is an aggregation applied across multiple time series, and mean(over='1h30m') is a transformation.
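To contrast a transformation with an aggregation, here is a minimal plain-Python moving average over one time series. The `moving_mean` helper is hypothetical, and the window convention (points newer than `ts - window_ms`, up to and including `ts`) is an assumption, not a statement about how the engine defines its windows.

```python
from collections import deque


def moving_mean(points, window_ms):
    """Rolling mean of ONE time series; points is a list of (timestamp_ms, value)."""
    window = deque()
    out = []
    for ts, value in points:
        window.append((ts, value))
        # Drop points that are no longer inside (ts - window_ms, ts].
        while window[0][0] <= ts - window_ms:
            window.popleft()
        out.append((ts, sum(v for _, v in window) / len(window)))
    return out


series = [(0, 10.0), (1000, 20.0), (2000, 30.0), (3000, 40.0)]
smoothed = moving_mean(series, window_ms=2000)
```

The output has one value per input timestamp and stays a single time series, whereas an aggregation would combine values from different series at the same timestamp.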
In addition to aggregations and transformations, SignalFlow includes a variety of analytical functions that perform other actions on streams. These include timeshift() (which retrieves a data point from a specified time offset in the past) and delta() (which performs actions on the current and previous data point, irrespective of the time interval between them), among others.
SignalFlow includes the ability to perform midstream calculations using the map() and lambda functions. These are most useful when you want to do simple things like perform a multiplication or division of all stream values or do algebraic calculations on all values in a stream. SignalFlow currently only supports single argument lambdas and only allows references to that argument.
    # Calculate the maximum mean cpu.user by datacenter over an hour.
    # Use the floor function to "round" the mean.

    # Using an intermediate variable
    A = data('cpu.user').mean(by='datacenter')
    floor(A).max(over='1h').publish()

    # Using the map function
    data('cpu.user').mean(by='datacenter').map(lambda x: floor(x)).max(over='1h').publish()

    # Multiply the cpu.utilization by 2
    data('cpu.utilization').map(lambda x: x * 2).publish()

    # Get the floor of cpu.utilization raised to the power of 4
    data('cpu.utilization').map(lambda x: floor(pow(x, 4))).publish()
In addition to function calls, lambda functions also allow an if-else expression. The format of these expressions is <value if condition is true> if <condition> else <value if condition is false>. By returning the None value from one branch, you can use this to filter values from the output.
    # Keep only values between 50 and 60; all other values are dropped
    data('cpu.utilization').map(lambda x: x if x > 50 and x < 60 else None).publish()
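Because the lambda syntax matches Python's, the same expressions can be tried on a plain list to see which values survive. This is only a model of the behaviour: `map_stream` is a hypothetical helper, and treating a None result as "nothing is published for that point" is how the text above describes filtering.

```python
def map_stream(values, fn):
    """Apply fn to every value; a None result stands for 'emit nothing' for that point."""
    return [fn(v) for v in values]


cpu = [42.0, 55.5, 61.0, 58.2]

doubled = map_stream(cpu, lambda x: x * 2)
in_band = map_stream(cpu, lambda x: x if x > 50 and x < 60 else None)
published = [v for v in in_band if v is not None]
```

Only the values strictly between 50 and 60 remain in `published`; the positions that mapped to None produce no output.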
Note that this mechanism differs from the SignalFx web application, which includes both a scale function and an exclude function to perform actions that lambda handles in a more generic fashion.
It is possible to use streams as inputs to mathematical expressions, just as you would use a variable in an algebraic formula. To do so, assign each stream that you want to reference in downstream calculations to a variable. The output of a calculation using one or more stream objects is also a stream object; therefore, you can perform aggregations or transformations on it. In order to perform further analytics on a stream calculation (or to stream the results), the calculation must be wrapped in parentheses.
    # Inline stream calculation
    (data('cache.hits') / data('cache.total') * 100).mean(over='1h').publish()

    # Using named streams
    cache_hits = data('cache.hits').sum(by='datacenter')
    cache_misses = data('cache.misses').sum(by='datacenter')
    hit_pct = (cache_hits / (cache_hits + cache_misses) * 100).mean(over='1h')
    hit_pct.publish('hit_pct')

    # Stream calculation can be used in other stream calculations
    used_pct = (data('cpu.used') / data('cpu.*').sum()) * 100
    cpu_per_thread = (used_pct / data('server.threadcount')).publish()
Streams are instantiated via the data() built-in function with a query to select the metric time series you are interested in. The query string supports non-leading wildcarding via the '*' character. An optional filter object is also supported.
    data('cpu.utilization').publish()
    data('cpu.*').publish()

    # With a filter
    data('cpu.utilization', filter=filter('env', 'QA') and filter('serverType', 'API')).publish()

    # The filter can also be bound beforehand
    qa_api_servers = filter('env', 'QA') and filter('serverType', 'API')
    data('cpu.utilization', filter=qa_api_servers).publish()
Analytics are applied to a stream by chaining one or more calls to analytics methods on a created stream object:
    # Find the p90 of the 1h mean of total cache hits by datacenter
    data('cache.hits').sum(by='datacenter').mean(over='1h').percentile(90)
SignalFlow builds on top of its streaming and computational abilities to enable sophisticated alerting, by recognizing when a condition has been met, and then by generating events accordingly. Among other things, the events can be used to trigger notifications in incident management platforms (e.g. PagerDuty) or messaging systems (e.g. Slack or email).
    # Send events when cpu.utilization is above 50 (and when it falls below again)
    detect(data('cpu.utilization') > 50).publish('cpu_too_high')
Conditions are specified with predicate objects. A predicate is an expression that evaluates to true or false. It takes a stream (or streams) as its input, and then compares each value in each 1-by-n vector in the stream against a threshold. If the comparison evaluates to true, then the value in that vector is replaced with a Boolean True; otherwise it is replaced with False. Any metadata associated with the input value continues to be associated with the True or False output value.
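As a rough plain-Python picture of a predicate applied to one 1-by-n vector, each value is compared against the threshold and replaced with a Boolean, while its metadata is carried along unchanged. The `greater_than` helper and the host dimension are hypothetical.

```python
def greater_than(vector, threshold):
    """Replace each value with True/False while keeping its metadata."""
    return [(value > threshold, metadata) for value, metadata in vector]


vector_at_t = [
    (72.0, {"host": "a"}),
    (31.5, {"host": "b"}),
    (50.0, {"host": "c"}),
]
result = greater_than(vector_at_t, 50)
```

The output vector has the same length and the same metadata as the input; only the values have become Booleans, and a value exactly at the threshold is not "greater than" it.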
Predicates can be composed using the and, or and not keywords, along with parentheses to force precedence.
Examples of valid predicates in SignalFlow include:
    # True when any cpu.utilization timeseries' value is greater than 50
    data('cpu.utilization') > 50

    # True when cpu.utilization is greater than 50 and memory.utilization is less than 40
    data('cpu.utilization') > 50 and data('memory.utilization') < 40

    # Complex example of a condition. Checking if the moving average of the memory
    # utilization is greater than 2 times the stddev
    mem = data('memory.utilization')
    mem_mean = mem.mean(over='1h')
    mem_stddev = mem.stddev()
    mem_mean > 2 * mem_stddev
By default, the state of the predicate changes when its predicate value changes (from False to True, or from True to False). However, the when() function can be used to require that the condition holds for a certain amount of time, or for a percentage of a certain amount of time. These duration controls are typically used to create detectors that ignore temporary spikes.
    # Will generate True the instant when cpu.utilization is above 50; will
    # generate False the instant cpu.utilization falls below 50.
    when(data('cpu.utilization') > 50)

    # Will generate True when cpu.utilization has been above 50 for 5 minutes
    # continuously; will generate False if cpu.utilization drops below 50 at
    # any point during the next 5 minutes, i.e., even if it dips instantaneously
    # below 50.
    when(data('cpu.utilization') > 50, '5m')

    # Will generate True when cpu.utilization has been above 50 for at least 75%
    # of 5 minutes; will generate False if cpu.utilization drops below 50 for
    # more than 25% of 5 minutes.
    when(data('cpu.utilization') > 50, '5m', .75)
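The duration and percentage controls can be approximated with a sliding window over Boolean samples. This is a simplified plain-Python model under stated assumptions: the duration is expressed as a number of evenly spaced samples rather than a time string, and the hypothetical `when_model` function is not the SignalFlow when() implementation.

```python
def when_model(samples, duration, fraction=1.0):
    """samples: Booleans at a fixed interval; duration: window length in samples.

    The output at each step is True once a full window is available and at least
    `fraction` of the samples in it are True.
    """
    out = []
    for i in range(len(samples)):
        window = samples[max(0, i - duration + 1): i + 1]
        full = len(window) == duration
        out.append(full and sum(window) >= fraction * duration)
    return out


above_50 = [True, True, True, False, True, True, True, True]

strict = when_model(above_50, duration=4)               # must hold for the whole window
lenient = when_model(above_50, duration=4, fraction=0.75)  # may dip for up to 25% of it
```

With the strict setting, the single dip delays the True output until four consecutive samples are above the threshold; with the 75% setting, the same dip is tolerated.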
Conditions behave like filters in the sense that it is possible to define one without actually making use of it. To make use of it, you need the detect() function.
The detect() function takes the output of predicate objects as its input and generates events. There are two categories of events created by detect(): Anomalous and Ok. The events that are generated depend on whether only the required on parameter is specified, or whether the optional off parameter is also specified. (In either case, the parameter is a predicate.)
If only an on parameter is specified:
- An Anomalous event is fired when the state of the predicate changes from False to True. This corresponds to the case when the threshold condition is met.
- An Ok event is generated when the state of the predicate changes from True to False. This corresponds to the case when the threshold condition is no longer being met.
If both on and off parameters are specified:
- An Anomalous event is fired when the state of the predicate passed as the on parameter changes from False to True.
- An Ok event is generated when the state of the predicate passed as the off parameter changes from emitting False to True and when the predicate passed to on is False.
    # Send events when cpu.utilization is above 50 (and when it falls below again)
    detect(data('cpu.utilization') > 50).publish('cpu_too_high')

    # Send events when cpu.utilization is above 50 and "clear" events when the
    # cpu.utilization is below 40 and memory.utilization is below 20 for 5 minutes
    cpu = data('cpu.utilization')
    mem = data('memory.utilization')
    detect(cpu > 50, when(cpu < 40 and mem < 20, '5m')).publish('cpu_too_high')
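The event logic can be pictured as a small state machine over the predicate states. The sketch below is a plain-Python simplification, not the detect() implementation: `detect_model` is hypothetical, it is level-triggered rather than edge-triggered, and for the two-predicate case it assumes that clearing requires the off predicate to be True while the on predicate is False.

```python
def detect_model(on, off=None):
    """on/off: equal-length lists of Booleans (predicate states over time).

    Returns (index, event) tuples, where event is "Anomalous" or "Ok".
    """
    events = []
    anomalous = False
    for i, on_state in enumerate(on):
        if not anomalous and on_state:
            anomalous = True
            events.append((i, "Anomalous"))
        elif anomalous:
            if off is None:
                clear = not on_state
            else:
                clear = off[i] and not on_state  # assumed clearing rule
            if clear:
                anomalous = False
                events.append((i, "Ok"))
    return events


on_states = [False, True, True, False, False, False]
off_states = [False, False, False, False, True, True]

on_only = detect_model(on_states)
on_and_off = detect_model(on_states, off_states)
```

With only `on`, the Ok event follows as soon as the condition stops holding; with a separate `off` predicate, the Ok event waits until the clear condition is met.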