Version: 8.1

Custom Tag History Aggregates

Python Aggregation Functions

The Tag History system has many built-in aggregate functions, such as Average, Sum, and Count. However, a custom aggregate may also be defined via Python scripting. Aggregate functions are used for calculations across time frames: they reduce the multiple values in a “window” to a single result value.

For example, if a query defines a single row result but covers an hour of time (either by requesting a single row, or by using the Tag Calculations feature), the system must decide how to combine the values in that hour. The built-in functions cover the common cases; a custom Python aggregate lets you extend them and perform any type of calculation.

Description

As values come in, they will be delivered to this function. The interpolator will create and deliver values. For each window (or “data block”; the terms are used synonymously), the function will get a fresh copy of blockContext. The blockContext is a dictionary that can be used as a memory space. The function should not use global variables. If values must be persisted across blocks, they can be stored in the queryContext, which is also a dictionary.

The function can choose what data to include, such as allowing interpolation or not, and allowing bad quality or not.

The window will receive the following values, many of which are generally interpolated (unless a raw value happens to fall exactly at the time):

  • The start of the window
  • 1 ms before each raw value (this point exists because discrete and analog interpolation differ: a value equal to the previous raw value indicates discrete interpolation)
  • The raw value
  • The end of the window

At the end of the window, the function will be called with “finished=true”. The function should return the calculated value(s). The resulting value will have a timestamp that corresponds to the beginning of the block time frame.
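
The delivery sequence described above can be sketched outside the gateway with a small stand-in harness. This is only an illustration: Stub, makeQval, and simulateWindow are hypothetical helpers invented for local testing, a plain dict stands in for blockContext, and the simulated window omits the "1 ms before" points for brevity. The rawMax aggregate shows one way to exclude interpolated and bad-quality values, assuming the final call (finished is True) still carries the end-of-window value.

```python
class Stub(object):
    """Hypothetical stand-in for gateway objects, for local testing only."""
    def __init__(self, **fields):
        self.__dict__.update(fields)

def makeQval(value, good=True):
    # Mimics the documented qval shape: .value and .quality.isGood()
    return Stub(value=value, quality=Stub(isGood=lambda: good))

def rawMax(qval, interpolated, finished, blockContext, queryContext):
    # Ignore interpolated points and bad-quality values.
    if not interpolated and qval.quality.isGood():
        best = blockContext.get('max', None)
        if best is None or qval.value > best:
            blockContext['max'] = qval.value
    if finished:
        # None here means no good raw value arrived in this window.
        return blockContext.get('max', None)

def simulateWindow(func, calls, queryContext):
    # calls: list of (qval, interpolated); only the last call is "finished".
    blockContext = {}  # fresh per window, like the real blockContext
    result = None
    for i, (qval, interpolated) in enumerate(calls):
        finished = (i == len(calls) - 1)
        result = func(qval, interpolated, finished, blockContext, queryContext)
    return result

calls = [
    (makeQval(5.0), True),               # start of window (interpolated)
    (makeQval(3.0), False),              # raw value
    (makeQval(7.0, good=False), False),  # raw value, bad quality: skipped
    (makeQval(4.0), False),              # raw value
    (makeQval(9.0), True),               # end of window (interpolated)
]
windowResult = simulateWindow(rawMax, calls, {})
```

Only the return value of the last call is kept, which mirrors the rule that returns are ignored until the window is finished. How the gateway treats a None result is not covered here and should be checked before relying on it.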

Parameters

  • qval - The incoming QualifiedValue. This has:
    • value : Object
    • quality : Quality (which has ‘name’, ‘isGood()’)
    • timestamp : Date
  • interpolated - Boolean indicating if the value is interpolated (true) or raw (false)
  • finished - Boolean indicating that the window is finished. If true, the return of this particular call is what will be used for the results. If false, the return will be ignored.
  • blockContext - A dictionary created fresh for this particular window. The function may use this as temporary storage for calculations. This object also has:
    • blockId - Integer roughly indicating the row id (doesn’t take into account aggregates that return multiple rows)
    • blockStart - Long UTC time of the start of the window
    • blockEnd - Long UTC time of the end of the window
    • previousRawValue - QualifiedValue, the previous non-interpolated value received before this window
    • previousBlockResults - QualifiedValue[], the results of the previous window.
    • insideBlock(long) - Returns boolean indicating if the time is covered by this window.
    • get(key, default) - A helper function that conforms to python’s dictionary “get with default return”.
  • queryContext - A dictionary that is shared by all windows in a query. It also has:
    • queryId - String, an id that can be used to identify this query in logging
    • blockSize - Long, time in ms covered by each window
    • queryStart - Long, the start time of the query
    • queryEnd - Long, the end time of the query
    • logTrace(), logDebug(), logInfo() - all take (formatString, Object... args).
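
To illustrate the difference between the two contexts, the sketch below keeps a per-window count in blockContext and a running total in queryContext. It is a minimal example under stated assumptions: plain dicts stand in for both contexts, Stub and makeQval are hypothetical test helpers, and queryContext is assumed to support the same dictionary-style get as blockContext.

```python
class Stub(object):
    """Hypothetical stand-in for gateway objects, for local testing only."""
    def __init__(self, **fields):
        self.__dict__.update(fields)

def makeQval(value, good=True):
    return Stub(value=value, quality=Stub(isGood=lambda: good))

def runningCount(qval, interpolated, finished, blockContext, queryContext):
    # Per-window state lives in blockContext, which starts empty each window.
    if not interpolated and qval.quality.isGood():
        blockContext['cnt'] = blockContext.get('cnt', 0) + 1
    if finished:
        # Cross-window state lives in queryContext, shared by the whole query.
        total = queryContext.get('total', 0) + blockContext.get('cnt', 0)
        queryContext['total'] = total
        return total

queryContext = {}  # one per query
windows = [
    [(makeQval(1), False), (makeQval(2), False), (makeQval(2), True)],
    [(makeQval(3), False), (makeQval(3), True)],
]
results = []
for calls in windows:
    blockContext = {}  # one per window
    out = None
    for i, (qval, interpolated) in enumerate(calls):
        finished = (i == len(calls) - 1)
        out = runningCount(qval, interpolated, finished, blockContext, queryContext)
    results.append(out)
```

The first window sees two good raw values and the second sees one, so the running total grows from window to window while each window's own count starts from zero.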

Return Value

Your custom aggregate should return one of the following for each window:

  • Object - Turned into Good Quality qualified value
  • List - Used to return up to 2 values per window
  • Tuple - (value, quality_int)
  • List of quality tuples
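
The list and tuple return forms can be sketched with a min/max aggregate that returns two values per window. Treat this as an illustration only: Stub, makeQval, and runWindow are hypothetical test helpers, and GOOD_CODE = 192 is an assumed integer code for Good quality that should be verified against your gateway.

```python
GOOD_CODE = 192  # assumption: integer code for Good quality; verify before use

class Stub(object):
    """Hypothetical stand-in for gateway objects, for local testing only."""
    def __init__(self, **fields):
        self.__dict__.update(fields)

def makeQval(value, good=True):
    return Stub(value=value, quality=Stub(isGood=lambda: good))

def minMax(qval, interpolated, finished, blockContext, queryContext):
    # List form: two plain values per window.
    if qval.quality.isGood():
        v = qval.value
        blockContext['lo'] = min(blockContext.get('lo', v), v)
        blockContext['hi'] = max(blockContext.get('hi', v), v)
    if finished:
        # Both entries are None if no good value arrived in the window.
        return [blockContext.get('lo', None), blockContext.get('hi', None)]

def minMaxQualified(qval, interpolated, finished, blockContext, queryContext):
    # List-of-tuples form: each entry is (value, quality_int).
    pair = minMax(qval, interpolated, finished, blockContext, queryContext)
    if finished:
        return [(v, GOOD_CODE) for v in pair]

def runWindow(func, values):
    # Feeds raw values through one simulated window; the last call is finished.
    blockContext = {}
    out = None
    for i, v in enumerate(values):
        finished = (i == len(values) - 1)
        out = func(makeQval(v), False, finished, blockContext, {})
    return out

plain = runWindow(minMax, [4, 1, 8])
qualified = runWindow(minMaxQualified, [4, 1, 8])
```

Returning a bare object or a single (value, quality_int) tuple follows the same pattern with one result instead of two.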

Usage

Custom Python aggregates can be used in two ways:

  1. Defined as a Project Library script, where the full path to the function is passed to the query.
  2. Defined as a string, prefaced with “python:”, and passed to the query.

Currently, both options are available only through the system.tag.queryTagHistory and system.tag.queryTagCalculations functions.

Both of these options are used with the “aggregationMode” and “aggregationModes” parameters of system.tag.queryTagHistory, and the “calculations” parameter of system.tag.queryTagCalculations. If the value is not an Enum value from the defined AggregationModes, it is assumed to be a custom aggregate. The system first checks whether the value is the path to a Project Library script; if it is not, the system tries to compile it as a full function.

For performance reasons, it is generally recommended to use the Project Library script whenever possible. For more information, see Project Library.

Conditions

There are some key factors to keep in mind when calling a custom aggregate.

Return Size

When calling a custom tag history aggregate, the returnSize argument must be set to a number greater than 0, otherwise the custom aggregate will be ignored.

Aggregate Library Project

When using a Project Library, the library must reside in the Gateway's scripting project.

Library Name

The name of the project library must start with shared, in lowercase. However, additional characters may follow the word shared. For example: shared_myLib.

Examples

Creating a Custom Function Script

  1. Add a project script by right-clicking the Project Library item in the Project Browser and choosing the New Script option.


  2. Enter a name for the script and click Create Script (we named ours shared in the example).

  3. Enter the following code block:

    Example
    # This is a simple count function, called for each value in a time window.
    def myCount(qval, interpolated, finished, blockContext, queryContext):
        # Read the running count for this window (0 if not yet set).
        cnt = blockContext.getOrDefault('cnt', 0)
        if qval.quality.isGood():
            blockContext['cnt'] = int(cnt) + 1

        # Only the return value of the final call in the window is used.
        if finished:
            return blockContext.getOrDefault('cnt', 0)


    The custom function can then be passed to a query, as in the example below:

    Example
    # Return tag history using a custom aggregate function you wrote.

    system.tag.queryTagHistory(paths=['MyTag'], rangeHours=1, aggregationModes=['shared.myCount'], returnSize=100)

Creating an Aggregate Function on the Fly

Example
#Create a function on the fly to pass in as a custom aggregate.

wrapper = """\
python:def wrapper(qval, interpolated, finished, blockContext, queryContext):
    return shared.aggregates.customFunction(qval, interpolated, finished, blockContext, queryContext)
"""
system.tag.queryTagHistory(paths=['MyTag'], rangeHours=1, aggregationModes=[wrapper], returnSize = 100)